Spark 2.0 on HDP

2016-10-27 Thread Deenar Toraskar
Hi

Has anyone tried running Spark 2.0 on HDP? I have managed to get around the
issues with the timeline service (by turning it off), but am now stuck because
YARN cannot find org.apache.spark.deploy.yarn.ExecutorLauncher.

Error: Could not find or load main class
org.apache.spark.deploy.yarn.ExecutorLauncher

I have verified that both spark.driver.extraJavaOptions and
spark.yarn.am.extraJavaOptions have hdp.version set correctly. Is there
anything else I am missing?
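
For reference, the settings referred to above typically look like this in
spark-defaults.conf (the hdp.version value is only an example; use your
cluster's version):

spark.driver.extraJavaOptions    -Dhdp.version=2.5.0.0-1245
spark.yarn.am.extraJavaOptions   -Dhdp.version=2.5.0.0-1245
# the timeline-service workaround mentioned above
spark.hadoop.yarn.timeline-service.enabled  false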

Regards
Deenar



On 10 May 2016 at 13:48, Steve Loughran  wrote:

>
> On 9 May 2016, at 21:24, Jesse F Chen  wrote:
>
> I had been running fine until builds around 05/07/2016
>
> If I used the "--master yarn" in builds after 05/07, I got the following
> error...sounds like something jars are missing.
>
> I am using YARN 2.7.2 and Hive 1.2.1.
>
> Do I need something new to deploy related to YARN?
>
> bin/spark-sql --driver-memory 10g --verbose --master yarn --packages
> com.databricks:spark-csv_2.10:1.3.0 --executor-memory 4g --num-executors
> 20 --executor-cores 2
>
> 16/05/09 13:15:21 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 16/05/09 13:15:21 INFO server.AbstractConnector: Started
> SelectChannelConnector@0.0.0.0:4041
> 16/05/09 13:15:21 INFO util.Utils: Successfully started service 'SparkUI'
> on port 4041.
> 16/05/09 13:15:21 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started
> at http://bigaperf116.svl.ibm.com:4041
> Exception in thread "main" java.lang.NoClassDefFoundError:
> com/sun/jersey/api/client/config/ClientConfig
> at
> org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:45)
> at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.
> serviceInit(YarnClientImpl.java:163)
> at org.apache.hadoop.service.AbstractService.init(
> AbstractService.java:163)
> at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:150)
> at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(
> YarnClientSchedulerBackend.scala:56)
> at org.apache.spark.scheduler.TaskSchedulerImpl.start(
> TaskSchedulerImpl.scala:148)
>
>
>
> Looks like Jersey client isn't on the classpath.
>
> 1. Consider filing a JIRA
> 2. set  spark.hadoop.yarn.timeline-service.enabled false to turn off ATS
>
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:502)
> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2246)
> at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.
> scala:762)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.
> init(SparkSQLEnv.scala:57)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.<init>(
> SparkSQLCLIDriver.scala:281)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(
> SparkSQLCLIDriver.scala:138)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(
> SparkSQLCLIDriver.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
> deploy$SparkSubmit$$runMain(SparkSubmit.scala:727)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:122)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException: com.sun.jersey.api.client.
> config.ClientConfig
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 22 more
> 16/05/09 13:15:21 INFO storage.DiskBlockManager: Shutdown hook called
> 16/05/09 13:15:21 INFO util.ShutdownHookManager: Shutdown hook called
> 16/05/09 13:15:21 INFO util.ShutdownHookManager: Deleting directory
> /tmp/spark-ac33b501-b9c3-47a3-93c8-fa02720bf4bb
> 16/05/09 13:15:21 INFO util.ShutdownHookManager: Deleting directory
> /tmp/spark-65cb43d9-c122-4106-a0a8-ae7d92d9e19c
> 16/05/09 13:15:21 INFO util.ShutdownHookManager: Deleting directory
> /tmp/spark-65cb43d9-c122-4106-a0a8-ae7d92d9e19c/userFiles-
> 46dde536-29e5-46b3-a530-e5ad6640f8b2
>
>
>
>
>
> *JESSE CHEN*
> Big Data Performance | IBM Analytics
>
> Office: 408 463 2296
> Mobile: 408 828 9068
> Email: jfc...@us.ibm.com
>
>
>


Re: Spark SQL partitioned tables - check for partition

2016-02-25 Thread Deenar Toraskar
Kevin

I meant the partitions on disk/HDFS, not the in-memory RDD/DataFrame
partitions. If I am right, mapPartitions or foreachPartition would identify
and operate on the in-memory partitions.

Deenar

On 25 February 2016 at 15:28, Kevin Mellott <kevin.r.mell...@gmail.com>
wrote:

> Once you have loaded information into a DataFrame, you can use the
> *mapPartitions or foreachPartition* operations to both identify the
> partitions and operate against them.
>
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame
>
>
> On Thu, Feb 25, 2016 at 9:24 AM, Deenar Toraskar <
> deenar.toras...@gmail.com> wrote:
>
>> Hi
>>
>> How does one check for the presence of a partition in a Spark SQL
>> partitioned table (saved using dataframe.write.partitionBy("partCol"), not
>> Hive-compatible tables), other than physically checking the directory on
>> HDFS or doing a count(*) with the partition cols in the where clause?
>>
>>
>> Regards
>> Deenar
>>
>
>


Spark SQL partitioned tables - check for partition

2016-02-25 Thread Deenar Toraskar
Hi

How does one check for the presence of a partition in a Spark SQL
partitioned table (saved using dataframe.write.partitionBy("partCol"), not
Hive-compatible tables), other than physically checking the directory on
HDFS or doing a count(*) with the partition cols in the where clause?
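
Short of a metadata API, the cheapest check I can think of is a pruned query
(a sketch only; the path, partition column and value are illustrative):

val table = sqlContext.read.parquet("/data/events")
// partition pruning should restrict the scan to files under partCol=2016-02-25
val exists = table.where(table("partCol") === "2016-02-25").limit(1).count() > 0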


Regards
Deenar


Re: How to control the number of files for dynamic partition in Spark SQL?

2016-01-30 Thread Deenar Toraskar
The following should work as long as your tables are created using Spark SQL

event_wk.repartition(2).write.partitionBy("eventDate").format("parquet")
  .insertInto("event")

If you want to stick to using "insert overwrite" for Hive compatibility,
then you can repartition a second time instead of setting the global
spark.sql.shuffle.partitions parameter:

val eventwk = sqlContext.sql("some joins") // this runs with the global shuffle partition setting
val eventwkRepartitioned = eventwk.repartition(2)
eventwkRepartitioned.registerTempTable("event_wk_repartitioned")

and use this temporary table in your insert statement.

Registering a temp table is cheap.
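
Putting it together, a sketch of the full flow (table and column names follow
the example quoted below):

val eventwk = sqlContext.sql("some joins") // joins run with the global shuffle partition setting
eventwk.repartition(2).registerTempTable("event_wk_repartitioned")

sqlContext.sql(
  """insert overwrite table event
    |partition(eventDate)
    |select user, detail, eventDate
    |from event_wk_repartitioned
  """.stripMargin)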

HTH


On 29 January 2016 at 20:26, Benyi Wang  wrote:

> I want to insert into a partition table using dynamic partition, but I
> don’t want to have 200 files for a partition because the files will be
> small for my case.
>
> sqlContext.sql(  """
> |insert overwrite table event
> |partition(eventDate)
> |select
> | user,
> | detail,
> | eventDate
> |from event_wk
>   """.stripMargin)
>
> the table “event_wk” is created from a dataframe by registerTempTable,
> which is built with some joins. If I set spark.sql.shuffle.partition=2, the
> join’s performance will be bad because that property seems global.
>
> I can do something like this:
>
> event_wk.repartition(2).write.partitionBy("eventDate").format("parquet").save(path)
>
> but I have to handle adding partitions by myself.
>
> Is there a way you can control the number of files just for this last
> insert step?
> ​
>


Re: Spark 2.0.0 release plan

2016-01-29 Thread Deenar Toraskar
A related question: are there plans to move the default Spark builds to Scala
2.11 with Spark 2.0?

Regards
Deenar

On 27 January 2016 at 19:55, Michael Armbrust 
wrote:

> We do maintenance releases on demand when there is enough to justify doing
> one.  I'm hoping to cut 1.6.1 soon, but have not had time yet.
>
> On Wed, Jan 27, 2016 at 8:12 AM, Daniel Siegmann <
> daniel.siegm...@teamaol.com> wrote:
>
>> Will there continue to be monthly releases on the 1.6.x branch during the
>> additional time for bug fixes and such?
>>
>> On Tue, Jan 26, 2016 at 11:28 PM, Koert Kuipers 
>> wrote:
>>
>>> thanks thats all i needed
>>>
>>> On Tue, Jan 26, 2016 at 6:19 PM, Sean Owen  wrote:
>>>
 I think it will come significantly later -- or else we'd be at code
 freeze for 2.x in a few days. I haven't heard anyone discuss this
 officially but had batted around May or so instead informally in
 conversation. Does anyone have a particularly strong opinion on that?
 That's basically an extra 3 month period.

 https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage

 On Tue, Jan 26, 2016 at 10:00 PM, Koert Kuipers 
 wrote:
 > Is the idea that spark 2.0 comes out roughly 3 months after 1.6? So
 > quarterly release as usual?
 > Thanks

>>>
>>>
>>
>


spark-xml data source (com.databricks.spark.xml) not working with spark 1.6

2016-01-28 Thread Deenar Toraskar
Hi

Has anyone tried using spark-xml with Spark 1.6? I cannot even get the sample
books.xml file (wget
https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
) working:
https://github.com/databricks/spark-xml

scala> val df =
sqlContext.read.format("com.databricks.spark.xml").load("books.xml")


scala> df.count

res4: Long = 0


Anyone else facing the same issue?
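
One thing to check is the rowTag option (a sketch; spark-xml picks rows by
element name, and in books.xml the row element is <book>, so without it the
default tag will not match, which would explain the count of 0):

val df = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "book")
  .load("books.xml")
df.count()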


Deenar


Re: how to run latest version of spark in old version of spark in cloudera cluster ?

2016-01-27 Thread Deenar Toraskar
Sri

Look at the instructions here. They are for 1.5.1, but should also work for
1.6:

https://www.linkedin.com/pulse/running-spark-151-cdh-deenar-toraskar-cfa
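
In short, the flow is (a sketch; paths and the class name are illustrative):

export SPARK_HOME=/opt/spark-1.6.0-bin-hadoop2.6   # the 1.6 build unpacked on the proxy/edge node
export HADOOP_CONF_DIR=/etc/hadoop/conf            # the cluster's client configs (hdfs and yarn)
$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client \
  --class com.example.MyApp my-app-assembly.jar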

Deenar


On 27 January 2016 at 20:16, Koert Kuipers <ko...@tresata.com> wrote:

> you need to build spark 1.6 for your hadoop distro, and put that on the
> proxy node and configure it correctly to find your cluster (hdfs and yarn).
> then use the spark-submit script for that spark 1.6 version to launch your
> application on yarn
>
> On Wed, Jan 27, 2016 at 3:11 PM, sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Hi Koert,
>>
>> I am submitting my code (spark jar ) using spark-submit in proxy node , I
>> checked the version of the cluster and node its says 1.2 I dint really
>> understand what you mean.
>>
>> can I ask yarn to use different version of spark ? or should I say
>> override the spark_home variables to look at 1.6 spark jar ?
>>
>> Thanks
>> Sri
>>
>> On Wed, Jan 27, 2016 at 7:45 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> If you have yarn you can just launch your spark 1.6 job from a single
>>> machine with spark 1.6 available on it and ignore the version of spark
>>> (1.2) that is installed
>>> On Jan 27, 2016 11:29, "kali.tumm...@gmail.com" <kali.tumm...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Just realized cloudera version of spark on my cluster is 1.2, the jar
>>>> which
>>>> I built using maven is version 1.6 which is causing issue.
>>>>
>>>> Is there a way to run spark version 1.6 in 1.2 version of spark ?
>>>>
>>>> Thanks
>>>> Sri
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-latest-version-of-spark-in-old-version-of-spark-in-cloudera-cluster-tp26087.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>
>


Re: hivethriftserver2 problems on upgrade to 1.6.0

2016-01-27 Thread Deenar Toraskar
James

The problem you are facing is due to a feature introduced in Spark 1.6 -
multi-session mode. If you want to see temporary tables across sessions,
*set spark.sql.hive.thriftServer.singleSession=true*.


   - From Spark 1.6, by default the Thrift server runs in multi-session
   mode, which means each JDBC/ODBC connection owns its own copy of the SQL
   configuration and temporary function registry. Cached tables are still
   shared, though. If you prefer to run the Thrift server in the old
   single-session mode, please set the option
   spark.sql.hive.thriftServer.singleSession to true. You may either add
   this option to spark-defaults.conf, or pass it to start-thriftserver.sh
   via --conf:

./sbin/start-thriftserver.sh \
 --conf spark.sql.hive.thriftServer.singleSession=true \
 ...
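
or equivalently in spark-defaults.conf:

spark.sql.hive.thriftServer.singleSession  true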


On 25 January 2016 at 15:06, james.gre...@baesystems.com <
james.gre...@baesystems.com> wrote:

> On upgrade from 1.5.0 to 1.6.0 I have a problem with the
> hivethriftserver2, I have this code:
>
>
>
> val hiveContext = new HiveContext(SparkContext.getOrCreate(conf));
>
> val thing =
> hiveContext.read.parquet("hdfs://dkclusterm1.imp.net:8020/user/jegreen1/ex208")
>
> thing.registerTempTable("thing")
>
>
>
> HiveThriftServer2.startWithContext(hiveContext)
>
>
>
>
>
> When I start things up on the cluster my hive-site.xml is found – I can
> see that the metastore connects:
>
>
>
>
>
> INFO  metastore - Trying to connect to metastore with URI thrift://
> dkclusterm2.imp.net:9083
>
> INFO  metastore - Connected to metastore.
>
>
>
>
>
> But then later on the thrift server seems not to connect to the remote
> hive metastore but to start a derby instance instead:
>
>
>
> INFO  AbstractService - Service:CLIService is started.
>
> INFO  ObjectStore - ObjectStore, initialize called
>
> INFO  Query - Reading in results for query
> "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used
> is closing
>
> INFO  MetaStoreDirectSql - Using direct SQL, underlying DB is DERBY
>
> INFO  ObjectStore - Initialized ObjectStore
>
> INFO  HiveMetaStore - 0: get_databases: default
>
> INFO  audit - ugi=jegreen1  ip=unknown-ip-addr  cmd=get_databases:
> default
>
> INFO  HiveMetaStore - 0: Shutting down the object store...
>
> INFO  audit - ugi=jegreen1  ip=unknown-ip-addr  cmd=Shutting down
> the object store...
>
> INFO  HiveMetaStore - 0: Metastore shutdown complete.
>
> INFO  audit - ugi=jegreen1  ip=unknown-ip-addr  cmd=Metastore
> shutdown complete.
>
> INFO  AbstractService - Service:ThriftBinaryCLIService is started.
>
> INFO  AbstractService - Service:HiveServer2 is started.
>
>
>
>
>
> So if I connect to this with JDBC I can see all the tables on the hive
> server – but not anything temporary – I guess they are going to derby.
>
>
>
> I see someone on the databricks website is also having this problem.
>
>
>
>
>
> Thanks
>
>
>
> James
>
>
>
>
>
>
>
>
>
>
>
>
>
> *From:* patcharee [mailto:patcharee.thong...@uni.no]
> *Sent:* 25 January 2016 14:31
> *To:* user@spark.apache.org
> *Cc:* Eirik Thorsnes
> *Subject:* streaming textFileStream problem - got only ONE line
>
>
>
> Hi,
>
> My streaming application is receiving data from file system and just
> prints the input count every 1 sec interval, as the code below:
>
> val sparkConf = new SparkConf()
> val ssc = new StreamingContext(sparkConf, Milliseconds(interval_ms))
> val lines = ssc.textFileStream(args(0))
> lines.count().print()
>
> The problem is sometimes the data received from scc.textFileStream is ONLY
> ONE line. But in fact there are multiple lines in the new file found in
> that interval. See log below which shows three intervals. In the 2nd
> interval, the new file is:
> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt. This file
> contains 6288 lines. The ssc.textFileStream returns ONLY ONE line (the
> header).
>
> Any ideas/suggestions what the problem is?
>
>
> -
> SPARK LOG
>
> -
>
> 16/01/25 15:11:11 INFO FileInputDStream: Cleared 1 old files that were
> older than 1453731011000 ms: 145373101 ms
> 16/01/25 15:11:11 INFO FileInputDStream: Cleared 0 old files that were
> older than 1453731011000 ms:
> 16/01/25 15:11:12 INFO FileInputDStream: Finding new files took 4 ms
> 16/01/25 15:11:12 INFO FileInputDStream: New files at time 1453731072000
> ms:
> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19616.txt
> ---
> Time: 1453731072000 ms
> ---
> 6288
>
> 16/01/25 15:11:12 INFO FileInputDStream: Cleared 1 old files that were
> older than 1453731012000 ms: 1453731011000 ms
> 16/01/25 15:11:12 INFO FileInputDStream: Cleared 0 old files that were
> older than 1453731012000 ms:
> 

Sharing HiveContext in Spark JobServer / getOrCreate

2016-01-25 Thread Deenar Toraskar
Hi

I am using a shared sparkContext for all of my Spark jobs. Some of the jobs
use HiveContext, but there isn't a getOrCreate method on HiveContext which
will allow reuse of an existing HiveContext. Such a method exists on
SQLContext only (def getOrCreate(sparkContext: SparkContext): SQLContext).

Is there any reason that a HiveContext cannot be shared amongst multiple
threads within the same Spark driver process?

In addition, I cannot seem to cast a HiveContext to a SQLContext, but this
works fine in the spark shell; am I doing something wrong here?

scala> sqlContext

res19: org.apache.spark.sql.SQLContext =
org.apache.spark.sql.hive.HiveContext@383b3357

scala> import org.apache.spark.sql.SQLContext

import org.apache.spark.sql.SQLContext

scala> SQLContext.getOrCreate(sc)

res18: org.apache.spark.sql.SQLContext =
org.apache.spark.sql.hive.HiveContext@383b3357
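
For now, a getOrCreate-style wrapper can be sketched like this (illustrative
only; SharedHiveContext is not a Spark API):

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object SharedHiveContext {
  @volatile private var instance: HiveContext = _

  // Lazily create a single HiveContext per driver and reuse it across threads.
  def getOrCreate(sc: SparkContext): HiveContext = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = new HiveContext(sc)
        }
      }
    }
    instance
  }
}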



Regards
Deenar


Re: Sharing HiveContext in Spark JobServer / getOrCreate

2016-01-25 Thread Deenar Toraskar
On 25 January 2016 at 21:09, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> No I hadn't. This is useful, but in some cases we do want to share the
> same temporary tables between jobs so really wanted a getOrCreate
> equivalent on HIveContext.
>
> Deenar
>
>
>
> On 25 January 2016 at 18:10, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Have you noticed the following method of HiveContext ?
>>
>>* Returns a new HiveContext as new session, which will have separated
>> SQLConf, UDF/UDAF,
>>* temporary tables and SessionState, but sharing the same
>> CacheManager, IsolatedClientLoader
>>* and Hive client (both of execution and metadata) with existing
>> HiveContext.
>>*/
>>   override def newSession(): HiveContext = {
>>
>> Cheers
>>
>> On Mon, Jan 25, 2016 at 7:22 AM, Deenar Toraskar <
>> deenar.toras...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I am using a shared sparkContext for all of my Spark jobs. Some of the
>>> jobs use HiveContext, but there isn't a getOrCreate method on HiveContext
>>> which will allow reuse of an existing HiveContext. Such a method exists on
>>> SQLContext only (def getOrCreate(sparkContext: SparkContext):
>>> SQLContext).
>>>
>>> Is there any reason that a HiveContext cannot be shared amongst multiple
>>> threads within the same Spark driver process?
>>>
>>> In addition I cannot seem to be able to cast a HiveContext to a
>>> SQLContext, but this works fine in the spark shell, I am doing something
>>> wrong here?
>>>
>>> scala> sqlContext
>>>
>>> res19: org.apache.spark.sql.SQLContext =
>>> org.apache.spark.sql.hive.HiveContext@383b3357
>>>
>>> scala> import org.apache.spark.sql.SQLContext
>>>
>>> import org.apache.spark.sql.SQLContext
>>>
>>> scala> SQLContext.getOrCreate(sc)
>>>
>>> res18: org.apache.spark.sql.SQLContext =
>>> org.apache.spark.sql.hive.HiveContext@383b3357
>>>
>>>
>>>
>>> Regards
>>> Deenar
>>>
>>
>>
>


Generic Dataset Aggregator

2016-01-25 Thread Deenar Toraskar
Hi All

https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html

I have been converting my UDAFs to Dataset Aggregators (Datasets are cool,
BTW). I have an ArraySum aggregator that does an element-wise sum of arrays.
I have got the simple version working, but the generic version fails with the
following error; I am not sure what I am doing wrong.

scala> import sqlContext.implicits._

scala> def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N]
= new GenericArraySumAggregator(f).toColumn

<console>:34: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing sqlContext.implicits._  Support for serializing
other types will be added in future releases.

 def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I,
N] = new GenericArraySumAggregator(f).toColumn


  ^

object ArraySumAggregator extends  Aggregator[Seq[Float], Seq[Float],
Seq[Float]] with Serializable {
  def zero: Seq[Float] = Nil
  // The initial value.
  def reduce(currentSum: Seq[Float], currentRow: Seq[Float]) =
sumArray(currentSum, currentRow)
  def merge(sum: Seq[Float], row: Seq[Float]) = sumArray(sum, row)
  def finish(b: Seq[Float]) = b // Return the final result.
  def sumArray(a: Seq[Float], b: Seq[Float]): Seq[Float] = {
(a, b) match {
  case (Nil, Nil) => Nil
  case (Nil, row) => row
  case (sum, Nil) => sum
  case (sum, row) => (a, b).zipped.map { case (a, b) => a + b }
}
  }
}

class GenericArraySumAggregator[I, N : Numeric](f: I => N) extends
Aggregator[Seq[I], Seq[N], Seq[N]] with Serializable {
  val numeric = implicitly[Numeric[N]]
  override def zero: Seq[N] = Nil
  override def reduce(b: Seq[N], a: Seq[I]): Seq[N] = sumArray(b,
a.map( x => f(x))) //numeric.plus(b, f(a))
  override def merge(b1: Seq[N],b2: Seq[N]): Seq[N] = sumArray(b1, b2)
  override def finish(reduction: Seq[N]): Seq[N] = reduction
  def sumArray(a: Seq[N], b: Seq[N]): Seq[N] = {
(a, b) match {
  case (Nil, Nil) => Nil
  case (Nil, row) => row
  case (sum, Nil) => sum
  case (sum, row) => (a, b).zipped.map { case (a, b) => numeric.plus(a, b) }
}
  }
}

Regards

Deenar


Re: Concatenating tables

2016-01-23 Thread Deenar Toraskar
On 23 Jan 2016 9:18 p.m., "Deenar Toraskar" <
deenar.toras...@thinkreactive.co.uk> wrote:

> df.unionAll(df2).unionAll(df3)
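>
> Spelled out (a sketch; df, df2 and df3 stand for the three tables shown
> below and must have the same schema):
>
> val combined = df.unionAll(df2).unionAll(df3)
> combined.show()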
> On 23 Jan 2016 9:02 p.m., "Andrew Holway" <andrew.hol...@otternetworks.de>
> wrote:
>
>> Is there a data frame operation to do this?
>>
>> +-+
>> | A B C D |
>> +-+
>> | 1 2 3 4 |
>> | 5 6 7 8 |
>> +-+
>> +-+
>> | A B C D |
>> +-+
>> | 3 5 6 8 |
>> | 0 0 0 0 |
>> +-+
>> +-+
>> | A B C D |
>> +-+
>> | 8 8 8 8 |
>> | 1 1 1 1 |
>> +-+
>>
>> Concatenated together to make this.
>>
>> +-+
>> | A B C D |
>> +-+
>> | 1 2 3 4 |
>> | 5 6 7 8 |
>> | 3 5 6 8 |
>> | 0 0 0 0 |
>> | 8 8 8 8 |
>> | 1 1 1 1 |
>> +-+
>>
>> Thanks,
>>
>> Andrew
>>
>


Error connecting to temporary derby metastore used by Spark, when running multiple jobs on the same SparkContext

2016-01-13 Thread Deenar Toraskar
Hi

I am using the spark-jobserver and see the following messages when a lot of
jobs are submitted simultaneously to the same SparkContext. Any ideas as to
what might cause this?

[2016-01-13 13:12:11,753] ERROR com.jolbox.bonecp.BoneCP []
[akka://JobServer/user/context-supervisor/ingest-context] - Failed to
acquire connection to
jdbc:derby:;databaseName=/tmp/spark-0b6014b0-96b8-419e-9f9f-10fa9392d9f4/metastore;create=true.
Sleeping for 7000 ms. Attempts left: 1
java.sql.SQLException: No suitable driver found for
jdbc:derby:;databaseName=/tmp/spark-0b6014b0-96b8-419e-9f9f-10fa9392d9f4/metastore;create=true

Regards
Deenar


distributeBy using advantage of HDFS or RDD partitioning

2016-01-13 Thread Deenar Toraskar
Hi

I have data in HDFS partitioned by a logical key and would like to preserve
the partitioning when creating a dataframe for the same. Is it possible to
create a dataframe that preserves partitioning from HDFS or the underlying
RDD?

Regards
Deenar


Re: Spark SQL UDF with Struct input parameters

2016-01-13 Thread Deenar Toraskar
I have raised a JIRA to cover this
https://issues.apache.org/jira/browse/SPARK-12809

On 13 January 2016 at 16:05, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> Frank
>
> Sorry, I got my wires crossed; I had come across another issue. Now I
> remember this one: I got around it by splitting the structure into two
> arrays and then zipping them in the UDF. So
>
> def effectiveExpectedExposure(expectedExposures: Seq[(Seq[Float],
> Seq[Float])])=expectedExposures.map(x=> x._1 *
> x._2).sum/expectedExposures.map(x=>x._1).sum
>
>
> became
>
>
> def expectedPositiveExposureSeq(expectedExposures: Seq[Float],
> timeIntervals : Seq[Float])= timeIntervals.zip(expectedExposures).map(x=>
> (x._1 * x._2)).sum/timeIntervals.sum
>
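> To use it (a sketch; the registered name is illustrative and collect is the
> aggregate already used earlier in this thread):
>
> sqlContext.udf.register("effectiveExpectedExposureSeq",
>   expectedPositiveExposureSeq _)
>
> sqlContext.sql(
>   """select effectiveExpectedExposureSeq(collect(ee), collect(noOfMonths))
>     |from netExposureCpty
>     |where counterparty = 'xyz'""".stripMargin)
>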
> Deenar
>
>
>
>
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 13 January 2016 at 15:42, Rosner, Frank (Allianz SE) <
> frank.ros...@allianz.com> wrote:
>
>> The problem is that I cannot use a UDF that has a structtype as input
>> (which seems to be the same problem that you were facing). Which key and
>> value are you talking about? They are both Seq[Float] in your example.
>>
>>
>>
>> In my example when I try to call a udf that takes a struct type I get
>>
>>
>>
>> cannot resolve 'UDF(myColumn)' due to data type mismatch: argument 1
>> requires array<struct<_1:bigint,_2:string>> type, however, 'myColumn' is of
>> array<struct<index:bigint,value:string>> type.
>>
>>
>>
>> When I then created a case class instead of using a tuple (so not to have
>> _1 but the correct name) it compiles. But when I execute it, it cannot cast
>> it to the case class because obviously the data does not contain the case
>> class inside.
>>
>>
>>
>> How would rewriting collect as a Spark UDAF help there?
>>
>>
>>
>> Thanks for your quick response!
>>
>> Frank
>>
>>
>>
>> *From:* Deenar Toraskar [mailto:deenar.toras...@thinkreactive.co.uk]
>> *Sent:* Mittwoch, 13. Januar 2016 15:56
>> *To:* Rosner, Frank (Allianz SE)
>> *Subject:* Re: Spark SQL UDF with Struct input parameters
>>
>>
>>
>> Frank
>>
>>
>>
>> I did not find a solution, as a work around I made both the key and value
>> to be of the same data type. I am going to rewrite collect as a Spark UDAF
>> when I have some spare time. You may want to do this if this is a show
>> stopper for you.
>>
>>
>>
>> Regards
>>
>> Deenar
>>
>>
>>
>>
>> *Think Reactive Ltd*
>>
>> deenar.toras...@thinkreactive.co.uk
>>
>> 07714140812
>>
>>
>>
>>
>>
>> On 13 January 2016 at 13:50, Rosner, Frank (Allianz SE) <
>> frank.ros...@allianz.com> wrote:
>>
>> Hey!
>>
>> Did you solve the issue? I am facing the same issue and cannot find a
>> solution.
>>
>> Thanks
>> Frank
>>
>> Hi
>>
>>
>>
>> I am trying to define an UDF that can take an array of tuples as input
>>
>>
>>
>> def effectiveExpectedExposure(expectedExposures: Seq[(Seq[Float],
>>
>> Seq[Float])])=
>>
>> expectedExposures.map(x=> x._1 * x._2).sum/expectedExposures.map(x=>
>>
>> x._1).sum
>>
>>
>>
>> sqlContext.udf.register("expectedPositiveExposure",
>>
>> expectedPositiveExposure _)
>>
>>
>>
>> I get the following message when I try calling this function, where
>>
>> noOfMonths and ee are both floats
>>
>>
>>
>> val df = sqlContext.sql(s"select (collect(struct(noOfMonths, ee))) as eee
>>
>> from netExposureCpty where counterparty = 'xyz'")
>>
>> df.registerTempTable("test")
>>
>> sqlContext.sql("select effectiveExpectedExposure(eee)  from test")
>>
>>
>>
>> Error in SQL statement: AnalysisException: cannot resolve 'UDF(eee)' due
>> to
>>
>> data type mismatch: argument 1 requires array<struct<_1:float,_2:float>>
>>
>> type, however, 'eee' is of array<struct<noofmonths:float,ee:float>> type.;
>>
>> line 1 pos 33
>>
>>
>>
>> Deenar
>>
>>
>>
>>
>>
>>
>>
>
>


Re: 101 question on external metastore

2016-01-07 Thread Deenar Toraskar
I sorted this out. There were two different versions of Derby on the
classpath, and ensuring the metastore and Spark used the same version of
Derby made the problem go away.
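
A quick way to see which Derby jar is actually being loaded (a sketch to run
from spark-shell; a class may legitimately fail to resolve, hence the Try):

import scala.util.Try

Seq("org.apache.derby.jdbc.EmbeddedDriver", "org.apache.derby.jdbc.ClientDriver")
  .foreach { cls =>
    // prints the jar each Derby driver class comes from
    val location = Try(Class.forName(cls).getProtectionDomain.getCodeSource.getLocation)
    println(s"$cls -> $location")
  }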

Deenar

On 6 January 2016 at 02:55, Yana Kadiyska <yana.kadiy...@gmail.com> wrote:

> Deenar, I have not resolved this issue. Why do you think it's from
> different versions of Derby? I was playing with this as a fun experiment
> and my setup was on a clean machine -- no other versions of
> hive/hadoop/etc...
>
> On Sun, Dec 20, 2015 at 12:17 AM, Deenar Toraskar <
> deenar.toras...@gmail.com> wrote:
>
>> apparently it is down to different versions of derby in the classpath,
>> but i am unsure where the other version is coming from. The setup worked
>> perfectly with spark 1.3.1.
>>
>> Deenar
>>
>> On 20 December 2015 at 04:41, Deenar Toraskar <deenar.toras...@gmail.com>
>> wrote:
>>
>>> Hi Yana/All
>>>
>>> I am getting the same exception. Did you make any progress?
>>>
>>> Deenar
>>>
>>> On 5 November 2015 at 17:32, Yana Kadiyska <yana.kadiy...@gmail.com>
>>> wrote:
>>>
>>>> Hi folks, trying experiment with a minimal external metastore.
>>>>
>>>> I am following the instructions here:
>>>> https://cwiki.apache.org/confluence/display/Hive/HiveDerbyServerMode
>>>>
>>>> I grabbed Derby 10.12.1.1 and started an instance, verified I can
>>>> connect via ij tool and that process is listening on 1527
>>>>
>>>> put the following hive-site.xml under conf
>>>> ```
>>>> <?xml version="1.0"?>
>>>> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
>>>> <configuration>
>>>> <property>
>>>>   <name>javax.jdo.option.ConnectionURL</name>
>>>>   <value>jdbc:derby://localhost:1527/metastore_db;create=true</value>
>>>>   <description>JDBC connect string for a JDBC metastore</description>
>>>> </property>
>>>> <property>
>>>>   <name>javax.jdo.option.ConnectionDriverName</name>
>>>>   <value>org.apache.derby.jdbc.ClientDriver</value>
>>>>   <description>Driver class name for a JDBC metastore</description>
>>>> </property>
>>>> </configuration>
>>>> ```
>>>>
>>>> I then try to run spark-shell thusly:
>>>> bin/spark-shell --driver-class-path
>>>> /home/yana/db-derby-10.12.1.1-bin/lib/derbyclient.jar
>>>>
>>>> and I get an ugly stack trace like so...
>>>>
>>>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
>>>> org.apache.derby.jdbc.EmbeddedDriver
>>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>> at
>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>> at
>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>> at java.lang.Class.newInstance(Class.java:379)
>>>> at
>>>> org.datanucleus.store.rdbms.connectionpool.AbstractConnectionPoolFactory.loadDriver(AbstractConnectionPoolFactory.java:47)
>>>> at
>>>> org.datanucleus.store.rdbms.connectionpool.DBCPConnectionPoolFactory.createConnectionPool(DBCPConnectionPoolFactory.java:50)
>>>> at
>>>> org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:238)
>>>> at
>>>> org.datanucleus.store.rdbms.ConnectionFactoryImpl.initialiseDataSources(ConnectionFactoryImpl.java:131)
>>>> at
>>>> org.datanucleus.store.rdbms.ConnectionFactoryImpl.<init>(ConnectionFactoryImpl.java:85)
>>>> ... 114 more
>>>>
>>>> <console>:10: error: not found: value sqlContext
>>>>import sqlContext.implicits._
>>>>
>>>>
>>>> What am I doing wrong -- not sure why it's looking for Embedded
>>>> anything, I'm specifically trying to not use the embedded server...but I
>>>> know my hive-site is being read as starting witout --driver-class-path does
>>>> say it can't load org.apache.derby.jdbc.ClientDriver
>>>>
>>>
>>>
>>
>


Re: [Spark on YARN] Multiple Auxiliary Shuffle Service Versions

2016-01-06 Thread Deenar Toraskar
Hi guys


   1. >> Add this jar to the classpath of all NodeManagers in your cluster.


A related question on configuration of the auxiliary shuffle service. *How
do I find the classpath for the NodeManager?* I tried finding all the places
where the existing mapreduce shuffle jars are present and placed the spark
yarn shuffle jar in the same locations, but with no success.

$ find . -name *shuffle*.jar
./hadoop/client/hadoop-mapreduce-client-shuffle.jar
./hadoop/client/hadoop-mapreduce-client-shuffle-2.7.1.2.3.2.0-2950.jar
./hadoop/client/spark-1.6.0-SNAPSHOT-yarn-shuffle.jar
./hadoop-mapreduce/hadoop-mapreduce-client-shuffle.jar
./hadoop-mapreduce/hadoop-mapreduce-client-shuffle-2.7.1.2.3.2.0-2950.jar
./falcon/client/lib/hadoop-mapreduce-client-shuffle-2.7.1.2.3.2.0-2950.jar
./oozie/libserver/hadoop-mapreduce-client-shuffle-2.7.1.2.3.2.0-2950.jar
./oozie/libtools/hadoop-mapreduce-client-shuffle-2.7.1.2.3.2.0-2950.jar
./spark/lib/spark-1.4.1.2.3.2.0-2950-yarn-shuffle.jar
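
For reference, the yarn-site.xml wiring I believe the shuffle service needs,
in addition to having the spark-*-yarn-shuffle jar on the NodeManager
classpath (a sketch):

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
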
Regards
Deenar

On 7 October 2015 at 01:27, Alex Rovner  wrote:

> Thank you all for your help.
>
> *Alex Rovner*
> *Director, Data Engineering *
> *o:* 646.759.0052
>
> * *
>
> On Tue, Oct 6, 2015 at 11:17 AM, Steve Loughran 
> wrote:
>
>>
>> On 6 Oct 2015, at 01:23, Andrew Or  wrote:
>>
>> Both the history server and the shuffle service are backward compatible,
>> but not forward compatible. This means as long as you have the latest
>> version of history server / shuffle service running in your cluster then
>> you're fine (you don't need multiple of them).
>>
>>
>> FWIW I've just created a JIRA on tracking/reporting version mismatch on
>> history server playback better:
>> https://issues.apache.org/jira/browse/SPARK-10950
>>
>> Even though the UI can't be expected to playback later histories, it
>> could be possible to report the issue in a way that users can act on "run a
>> later version", rather than raise support calls.
>>
>>
>


Re: Spark SQL dataframes explode /lateral view help

2016-01-05 Thread Deenar Toraskar
val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Dataframe Test")

val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)

val dataframe = sql.read.json("orders.json")

val expanded = dataframe
  .explode[::[Long], Long]("items", "item1")(row => row)
  .explode[::[Long], Long]("items", "item2")(row => row)

val grouped = expanded
  .where(expanded("item1") !== expanded("item2"))
  .groupBy("item1", "item2")
  .count()

val recs = grouped
  .groupBy("item1")

I found another example above, but I can't seem to figure out what this does:

val expanded = dataframe
  .explode[::[Long], Long]("items", "item1")(row => row)
  .explode[::[Long], Long]("items", "item2")(row => row)



On 5 January 2016 at 20:00, Deenar Toraskar <deenar.toras...@gmail.com>
wrote:

> Hi All
>
> I have the following Spark SQL query and would like to convert it to use
> the DataFrames API (Spark 1.6). The eee, eep and pfep columns are all maps
> of (int -> float).
>
>
> select e.counterparty, epe, mpfe, eepe, noOfMonthseep, teee as
> effectiveExpectedExposure, teep as expectedExposure , tpfep as pfe
> |from exposureMeasuresCpty e
>   |  lateral view explode(eee) dummy1 as noOfMonthseee, teee
>   |  lateral view explode(eep) dummy2 as noOfMonthseep, teep
>   |  lateral view explode(pfep) dummy3 as noOfMonthspfep, tpfep
>   |where e.counterparty = '$cpty' and noOfMonthseee = noOfMonthseep and
> noOfMonthseee = noOfMonthspfep
>   |order by noOfMonthseep""".stripMargin
>
> Any guidance or samples would be appreciated. I have seen code snippets
> that handle arrays, but havent come across how to handle maps
>
> case class Book(title: String, words: String)
>val df: RDD[Book]
>
>case class Word(word: String)
>val allWords = df.explode('words) {
>  case Row(words: String) => words.split(" ").map(Word(_))
>}
>
>val bookCountPerWord = allWords.groupBy("word").agg(countDistinct("title"))
>
>
> Regards
> Deenar
>


Spark SQL dataframes explode /lateral view help

2016-01-05 Thread Deenar Toraskar
Hi All

I have the following Spark SQL query and would like to convert it to use the
DataFrames API (Spark 1.6). The eee, eep and pfep columns are all maps of
(int -> float).


select e.counterparty, epe, mpfe, eepe, noOfMonthseep, teee as
effectiveExpectedExposure, teep as expectedExposure , tpfep as pfe
|from exposureMeasuresCpty e
  |  lateral view explode(eee) dummy1 as noOfMonthseee, teee
  |  lateral view explode(eep) dummy2 as noOfMonthseep, teep
  |  lateral view explode(pfep) dummy3 as noOfMonthspfep, tpfep
  |where e.counterparty = '$cpty' and noOfMonthseee = noOfMonthseep and
noOfMonthseee = noOfMonthspfep
  |order by noOfMonthseep""".stripMargin

Any guidance or samples would be appreciated. I have seen code snippets
that handle arrays, but havent come across how to handle maps

case class Book(title: String, words: String)
   val df: RDD[Book]

   case class Word(word: String)
   val allWords = df.explode('words) {
 case Row(words: String) => words.split(" ").map(Word(_))
   }

   val bookCountPerWord = allWords.groupBy("word").agg(countDistinct("title"))
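
For the map columns specifically, a sketch of one way this might look with
functions.explode (the exposures DataFrame and column names follow the query
above; the key/value output column names reflect my understanding of how
explode behaves on a map column):

import org.apache.spark.sql.functions.explode

val exposures = sqlContext.table("exposureMeasuresCpty")
val exploded = exposures
  .where(exposures("counterparty") === "xyz")
  .select(exposures("counterparty"), explode(exposures("eee")))
  .withColumnRenamed("key", "noOfMonthseee")
  .withColumnRenamed("value", "effectiveExpectedExposure")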


Regards
Deenar


Re: Spark SQL UDF with Struct input parameters

2015-12-25 Thread Deenar Toraskar
I have found that this does not even work with a struct as an input
parameter:

def testUDF(expectedExposures: (Float, Float))= {
(expectedExposures._1 * expectedExposures._2 /expectedExposures._1)
  }
sqlContext.udf.register("testUDF", testUDF _)

sqlContext.sql("select testUDF(struct(noofmonths,ee)) from netExposureCpty")

The full stacktrace is given below

com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException:
org.apache.spark.sql.AnalysisException: cannot resolve
'UDF(struct(noofmonths,ee))' due to data type mismatch: argument 1 requires
struct<_1:float,_2:float> type, however, 'struct(noofmonths,ee)' is of
struct<noofmonths:float,ee:float> type.; line 1 pos 33 at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:318)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
scala.collection.Iterator$class.foreach(Iterator.scala:727) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at
scala.collection.AbstractIterator.to(Iterator.scala:1157) at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)



On 26 December 2015 at 02:42, Deenar Toraskar <deenar.toras...@gmail.com>
wrote:

> Hi
>
> I am trying to define an UDF that can take an array of tuples as input
>
> def effectiveExpectedExposure(expectedExposures: Seq[(Seq[Float], 
> Seq[Float])])=
> expectedExposures.map(x=> x._1 * x._2).sum/expectedExposures.map(x=>
> x._1).sum
>
> sqlContext.udf.register("expectedPositiveExposure",
> expectedPositiveExposure _)
>
> I get the following message when I try calling this function, where
> noOfMonths and ee are both floats
>
> val df = sqlContext.sql(s"select (collect(struct(noOfMonths, ee))) as eee
>  from netExposureCpty where counterparty = 'xyz'")
> df.registerTempTable("test")
> sqlContext.sql("select effectiveExpectedExposure(eee)  from test")
>
> Error in SQL statement: AnalysisException: cannot resolve 'UDF(eee)' due
> to data type mismatch: argument 1 requires array<struct<_1:float,_2:float>>
> type, however, 'eee' is of array<struct<noofmonths:float,ee:float>> type.;
> line 1 pos 33
>
> Deenar
>


Spark SQL UDF with Struct input parameters

2015-12-25 Thread Deenar Toraskar
Hi

I am trying to define an UDF that can take an array of tuples as input

def effectiveExpectedExposure(expectedExposures: Seq[(Seq[Float],
Seq[Float])])=
expectedExposures.map(x=> x._1 * x._2).sum/expectedExposures.map(x=>
x._1).sum

sqlContext.udf.register("expectedPositiveExposure",
expectedPositiveExposure _)

I get the following message when I try calling this function, where
noOfMonths and ee are both floats

val df = sqlContext.sql(s"select (collect(struct(noOfMonths, ee))) as eee
 from netExposureCpty where counterparty = 'xyz'")
df.registerTempTable("test")
sqlContext.sql("select effectiveExpectedExposure(eee)  from test")

Error in SQL statement: AnalysisException: cannot resolve 'UDF(eee)' due to
data type mismatch: argument 1 requires array<struct<_1:float,_2:float>>
type, however, 'eee' is of array<struct<noofmonths:float,ee:float>> type.;
line 1 pos 33

Deenar


Re: 101 question on external metastore

2015-12-19 Thread Deenar Toraskar
Apparently it is down to different versions of Derby in the classpath, but
I am unsure where the other version is coming from. The setup worked
perfectly with Spark 1.3.1.

Deenar

On 20 December 2015 at 04:41, Deenar Toraskar <deenar.toras...@gmail.com>
wrote:

> Hi Yana/All
>
> I am getting the same exception. Did you make any progress?
>
> Deenar
>
> On 5 November 2015 at 17:32, Yana Kadiyska <yana.kadiy...@gmail.com>
> wrote:
>
>> Hi folks, trying experiment with a minimal external metastore.
>>
>> I am following the instructions here:
>> https://cwiki.apache.org/confluence/display/Hive/HiveDerbyServerMode
>>
>> I grabbed Derby 10.12.1.1 and started an instance, verified I can connect
>> via ij tool and that process is listening on 1527
>>
>> put the following hive-site.xml under conf
>> ```
>> <?xml version="1.0"?>
>> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
>> <configuration>
>> <property>
>>   <name>javax.jdo.option.ConnectionURL</name>
>>   <value>jdbc:derby://localhost:1527/metastore_db;create=true</value>
>>   <description>JDBC connect string for a JDBC metastore</description>
>> </property>
>> <property>
>>   <name>javax.jdo.option.ConnectionDriverName</name>
>>   <value>org.apache.derby.jdbc.ClientDriver</value>
>>   <description>Driver class name for a JDBC metastore</description>
>> </property>
>> </configuration>
>> ```
>>
>> I then try to run spark-shell thusly:
>> bin/spark-shell --driver-class-path
>> /home/yana/db-derby-10.12.1.1-bin/lib/derbyclient.jar
>>
>> and I get an ugly stack trace like so...
>>
>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
>> org.apache.derby.jdbc.EmbeddedDriver
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>> at java.lang.Class.newInstance(Class.java:379)
>> at
>> org.datanucleus.store.rdbms.connectionpool.AbstractConnectionPoolFactory.loadDriver(AbstractConnectionPoolFactory.java:47)
>> at
>> org.datanucleus.store.rdbms.connectionpool.DBCPConnectionPoolFactory.createConnectionPool(DBCPConnectionPoolFactory.java:50)
>> at
>> org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:238)
>> at
>> org.datanucleus.store.rdbms.ConnectionFactoryImpl.initialiseDataSources(ConnectionFactoryImpl.java:131)
>> at
>> org.datanucleus.store.rdbms.ConnectionFactoryImpl.<init>(ConnectionFactoryImpl.java:85)
>> ... 114 more
>>
>> <console>:10: error: not found: value sqlContext
>>import sqlContext.implicits._
>>
>>
>> What am I doing wrong -- not sure why it's looking for Embedded anything,
>> I'm specifically trying to not use the embedded server...but I know my
>> hive-site is being read as starting witout --driver-class-path does say it
>> can't load org.apache.derby.jdbc.ClientDriver
>>
>
>


Re: 101 question on external metastore

2015-12-19 Thread Deenar Toraskar
Hi Yana/All

I am getting the same exception. Did you make any progress?

Deenar

On 5 November 2015 at 17:32, Yana Kadiyska  wrote:

> Hi folks, trying experiment with a minimal external metastore.
>
> I am following the instructions here:
> https://cwiki.apache.org/confluence/display/Hive/HiveDerbyServerMode
>
> I grabbed Derby 10.12.1.1 and started an instance, verified I can connect
> via ij tool and that process is listening on 1527
>
> put the following hive-site.xml under conf
> ```
> <?xml version="1.0"?>
> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
> <configuration>
> <property>
>   <name>javax.jdo.option.ConnectionURL</name>
>   <value>jdbc:derby://localhost:1527/metastore_db;create=true</value>
>   <description>JDBC connect string for a JDBC metastore</description>
> </property>
> <property>
>   <name>javax.jdo.option.ConnectionDriverName</name>
>   <value>org.apache.derby.jdbc.ClientDriver</value>
>   <description>Driver class name for a JDBC metastore</description>
> </property>
> </configuration>
> ```
>
> I then try to run spark-shell thusly:
> bin/spark-shell --driver-class-path
> /home/yana/db-derby-10.12.1.1-bin/lib/derbyclient.jar
>
> and I get an ugly stack trace like so...
>
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.derby.jdbc.EmbeddedDriver
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at java.lang.Class.newInstance(Class.java:379)
> at
> org.datanucleus.store.rdbms.connectionpool.AbstractConnectionPoolFactory.loadDriver(AbstractConnectionPoolFactory.java:47)
> at
> org.datanucleus.store.rdbms.connectionpool.DBCPConnectionPoolFactory.createConnectionPool(DBCPConnectionPoolFactory.java:50)
> at
> org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:238)
> at
> org.datanucleus.store.rdbms.ConnectionFactoryImpl.initialiseDataSources(ConnectionFactoryImpl.java:131)
> at
> org.datanucleus.store.rdbms.ConnectionFactoryImpl.<init>(ConnectionFactoryImpl.java:85)
> ... 114 more
>
> <console>:10: error: not found: value sqlContext
>import sqlContext.implicits._
>
>
> What am I doing wrong -- not sure why it's looking for Embedded anything,
> I'm specifically trying to not use the embedded server...but I know my
> hive-site is being read as starting witout --driver-class-path does say it
> can't load org.apache.derby.jdbc.ClientDriver
>


Re: [Spark 1.5]: Exception in thread "broadcast-hash-join-2" java.lang.OutOfMemoryError: Java heap space -- Work in 1.4, but 1.5 doesn't

2015-12-15 Thread Deenar Toraskar
On 16 December 2015 at 06:19, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> Hi
>
> I had the same problem. There is a query with a lot of small tables (5x)
> all below the broadcast threshold and Spark is broadcasting all these
> tables together without checking if there is sufficient memory available.
>
> I got around this issue by reducing the
> *spark.sql.autoBroadcastJoinThreshold* to stop broadcasting the bigger
> tables in the query.
>
> This looks like an issue to me. A fix would be to ensure that, in addition
> to the per-table threshold, there is a total broadcast size per query, so
> that only data up to that limit is broadcast, preventing executors from
> running out of memory.
>
> Shall I raise a JIRA for this?
>
> Regards
> Deenar
>
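> Concretely, the workaround amounts to something like this (a sketch; the
> threshold value is illustrative):
>
> // lower the limit (in bytes) so the larger "small" tables are no longer broadcast
> sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)
> // or disable broadcast joins entirely for the problem query
> sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")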
>
> On 4 November 2015 at 22:55, Shuai Zheng <szheng.c...@gmail.com> wrote:
>
>> And an update is: this ONLY happen in Spark 1.5, I try to run it under
>> Spark 1.4 and 1.4.1, there are no issue (the program is developed under
>> Spark 1.4 last time, and I just re-test it, it works). So this is proven
>> that there is no issue on the logic and data, it is caused by the new
>> version of Spark.
>>
>>
>>
>> So I want to know any new setup I should set in Spark 1.5 to make it
>> work?
>>
>>
>>
>> Regards,
>>
>>
>>
>> Shuai
>>
>>
>>
>> *From:* Shuai Zheng [mailto:szheng.c...@gmail.com]
>> *Sent:* Wednesday, November 04, 2015 3:22 PM
>> *To:* user@spark.apache.org
>> *Subject:* [Spark 1.5]: Exception in thread "broadcast-hash-join-2"
>> java.lang.OutOfMemoryError: Java heap space
>>
>>
>>
>> Hi All,
>>
>>
>>
>> I have a program which actually run a bit complex business (join) in
>> spark. And I have below exception:
>>
>>
>>
>> I running on Spark 1.5, and with parameter:
>>
>>
>>
>> spark-submit --deploy-mode client --executor-cores=24 --driver-memory=2G
>> --executor-memory=45G –class …
>>
>>
>>
>> Some other setup:
>>
>>
>>
>> sparkConf.set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max",
>> "2047m");
>>
>> sparkConf.set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails
>> -XX:+PrintGCTimeStamps").set("spark.sql.autoBroadcastJoinThreshold",
>> "104857600");
>>
>>
>>
>> This is running on AWS c3*8xlarge instance. I am not sure what kind of
>> parameter I should set if I have below OutOfMemoryError exception.
>>
>>
>>
>> #
>>
>> # java.lang.OutOfMemoryError: Java heap space
>>
>> # -XX:OnOutOfMemoryError="kill -9 %p"
>>
>> #   Executing /bin/sh -c "kill -9 10181"...
>>
>> Exception in thread "broadcast-hash-join-2" java.lang.OutOfMemoryError:
>> Java heap space
>>
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>> Source)
>>
>> at
>> org.apache.spark.sql.execution.joins.UnsafeHashedRelation$.apply(HashedRelation.scala:380)
>>
>> at
>> org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:123)
>>
>> at
>> org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:95)
>>
>> at
>> org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:85)
>>
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:100)
>>
>> at
>> org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:85)
>>
>> at
>> org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:85)
>>
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> Any hint will be very helpful.
>>
>>
>>
>> Regards,
>>
>>
>>
>> Shuai
>>
>
>


Re: [Spark 1.5]: Exception in thread "broadcast-hash-join-2" java.lang.OutOfMemoryError: Java heap space -- Work in 1.4, but 1.5 doesn't

2015-12-15 Thread Deenar Toraskar
Hi

I have created an issue for this
https://issues.apache.org/jira/browse/SPARK-12358

Regards
Deenar

On 16 December 2015 at 06:21, Deenar Toraskar <deenar.toras...@gmail.com>
wrote:

>
>
> On 16 December 2015 at 06:19, Deenar Toraskar <
> deenar.toras...@thinkreactive.co.uk> wrote:
>
>> Hi
>>
>> I had the same problem. There is a query with a lot of small tables (5x)
>> all below the broadcast threshold and Spark is broadcasting all these
>> tables together without checking if there is sufficient memory available.
>>
>> I got around this issue by reducing the
>> *spark.sql.autoBroadcastJoinThreshold* to stop broadcasting the bigger
>> tables in the query.
>>
>> This looks like an issue to me. A fix would be to ensure that, in addition
>> to the per-table threshold, there is a total broadcast size per query, so
>> that only data up to that limit is broadcast, preventing executors from
>> running out of memory.
>>
>> Shall I raise a JIRA for this?
>>
>> Regards
>> Deenar
>>
>>
>> On 4 November 2015 at 22:55, Shuai Zheng <szheng.c...@gmail.com> wrote:
>>
>>> And an update is: this ONLY happen in Spark 1.5, I try to run it under
>>> Spark 1.4 and 1.4.1, there are no issue (the program is developed under
>>> Spark 1.4 last time, and I just re-test it, it works). So this is proven
>>> that there is no issue on the logic and data, it is caused by the new
>>> version of Spark.
>>>
>>>
>>>
>>> So I want to know any new setup I should set in Spark 1.5 to make it
>>> work?
>>>
>>>
>>>
>>> Regards,
>>>
>>>
>>>
>>> Shuai
>>>
>>>
>>>
>>> *From:* Shuai Zheng [mailto:szheng.c...@gmail.com]
>>> *Sent:* Wednesday, November 04, 2015 3:22 PM
>>> *To:* user@spark.apache.org
>>> *Subject:* [Spark 1.5]: Exception in thread "broadcast-hash-join-2"
>>> java.lang.OutOfMemoryError: Java heap space
>>>
>>>
>>>
>>> Hi All,
>>>
>>>
>>>
>>> I have a program which actually run a bit complex business (join) in
>>> spark. And I have below exception:
>>>
>>>
>>>
>>> I running on Spark 1.5, and with parameter:
>>>
>>>
>>>
>>> spark-submit --deploy-mode client --executor-cores=24 --driver-memory=2G
>>> --executor-memory=45G –class …
>>>
>>>
>>>
>>> Some other setup:
>>>
>>>
>>>
>>> sparkConf.set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max",
>>> "2047m");
>>>
>>> sparkConf.set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails
>>> -XX:+PrintGCTimeStamps").set("spark.sql.autoBroadcastJoinThreshold",
>>> "104857600");
>>>
>>>
>>>
>>> This is running on AWS c3*8xlarge instance. I am not sure what kind of
>>> parameter I should set if I have below OutOfMemoryError exception.
>>>
>>>
>>>
>>> #
>>>
>>> # java.lang.OutOfMemoryError: Java heap space
>>>
>>> # -XX:OnOutOfMemoryError="kill -9 %p"
>>>
>>> #   Executing /bin/sh -c "kill -9 10181"...
>>>
>>> Exception in thread "broadcast-hash-join-2" java.lang.OutOfMemoryError:
>>> Java heap space
>>>
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>>> Source)
>>>
>>> at
>>> org.apache.spark.sql.execution.joins.UnsafeHashedRelation$.apply(HashedRelation.scala:380)
>>>
>>> at
>>> org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:123)
>>>
>>> at
>>> org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:95)
>>>
>>> at
>>> org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:85)
>>>
>>> at
>>> org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:100)
>>>
>>> at
>>> org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:85)
>>>
>>> at
>>> org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:85)
>>>
>>> at
>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>
>>> at
>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>>
>>> Any hint will be very helpful.
>>>
>>>
>>>
>>> Regards,
>>>
>>>
>>>
>>> Shuai
>>>
>>
>>
>


Re: hive thriftserver and fair scheduling

2015-12-08 Thread Deenar Toraskar
> Thanks Michael, I'll try it out. Another quick/important question: How do I
> make udfs available to all of the hive thriftserver users? Right now, when
> I launch a spark-sql client, I notice that it reads the ~/.hiverc file and
> all udfs get picked up but this doesn't seem to be working in hive
> thriftserver. Is there a way to make it work in a similar way for all users
> in hive thriftserver?

+1 for this request
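
For the fair-scheduling part, a per-session pool assignment can be sketched
like this from a beeline session (assumes spark.scheduler.mode=FAIR and a
pool named interactive defined in fairscheduler.xml; the property name
reflects my understanding of the thrift server's setting):

SET spark.sql.thriftserver.scheduler.pool=interactive;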


On 20 October 2015 at 23:49, Sadhan Sood  wrote:

> Thanks Michael, I'll try it out. Another quick/important question: How do
> I make udfs available to all of the hive thriftserver users? Right now,
> when I launch a spark-sql client, I notice that it reads the ~/.hiverc file
> and all udfs get picked up but this doesn't seem to be working in hive
> thriftserver. Is there a way to make it work in a similar way for all users
> in hive thriftserver?
>
> Thanks again!
>
> On Tue, Oct 20, 2015 at 12:34 PM, Michael Armbrust  > wrote:
>
>> Not the most obvious place in the docs... but this is probably helpful:
>> https://spark.apache.org/docs/latest/sql-programming-guide.html#scheduling
>>
>> You likely want to put each user in their own pool.
>>
>> On Tue, Oct 20, 2015 at 11:55 AM, Sadhan Sood 
>> wrote:
>>
>>> Hi All,
>>>
>>> Does anyone have fair scheduling working for them in a hive server? I
>>> have one hive thriftserver running and multiple users trying to run queries
>>> at the same time on that server using a beeline client. I see that a big
>>> query is stopping all other queries from making any progress. Is this
>>> supposed to be this way? Is there anything else that I need to be doing for
>>> fair scheduling to be working for the thriftserver?
>>>
>>
>>
>


Re: Can't create UDF's in spark 1.5 while running using the hive thrift service

2015-12-08 Thread Deenar Toraskar
Hi Trystan

I am facing the same issue. It only appears with the Thrift server; the same
call works fine via the spark-sql shell. Do you have any workarounds, and
have you filed a JIRA/bug for the same?

Regards
Deenar
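
One possible workaround, sketched here on the assumption that the thrift server
can be started programmatically rather than via start-thriftserver.sh: register
the functions on a HiveContext first and then expose that same context over
JDBC with HiveThriftServer2.startWithContext, so every beeline session sees
them. The UDF below is just a placeholder.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

object ThriftServerWithUdfs {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("thrift-with-udfs"))
    val hiveContext = new HiveContext(sc)

    // Register UDFs before exposing the context, so all JDBC sessions see them.
    hiveContext.udf.register("testUDF", (s: String) => s.toUpperCase)

    // Serve this exact context over the thrift/JDBC interface.
    HiveThriftServer2.startWithContext(hiveContext)
  }
}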

On 12 October 2015 at 18:01, Trystan Leftwich  wrote:

> Hi everyone,
>
> Since upgrading to spark 1.5 I've been unable to create and use UDF's when
> we run in thrift server mode.
>
> Our setup:
> We start the thrift-server running against yarn in client mode, (we've
> also built our own spark from github branch-1.5 with the following args,
> -Pyarn -Phive -Phive-thriftserver)
>
> if i run the following after connecting via JDBC (in this case via
> beeline):
>
> add jar "hdfs://path/to/jar"
> (this command succeeds with no errors)
>
> CREATE TEMPORARY FUNCTION testUDF AS 'com.foo.class.UDF';
> (this command succeeds with no errors)
>
> select testUDF(col1) from table1;
>
> I get the following error in the logs:
>
> org.apache.spark.sql.AnalysisException: undefined function testUDF; line 1
> pos 8
> at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:58)
> at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:58)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2.apply(hiveUDFs.scala:57)
> at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2.apply(hiveUDFs.scala:53)
> at scala.util.Try.getOrElse(Try.scala:77)
> at
> org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUDFs.scala:53)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:506)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:506)
> at
> org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:505)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:502)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:226)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:232)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:232)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
> .
> . (cutting the bulk for ease of email, more than happy to send the full
> output)
> .
> 15/10/12 14:34:37 ERROR SparkExecuteStatementOperation: Error running hive
> query:
> org.apache.hive.service.cli.HiveSQLException:
> org.apache.spark.sql.AnalysisException: undefined function testUDF; line 1
> pos 100
> at
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.runInternal(SparkExecuteStatementOperation.scala:259)
> at
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:171)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
> at
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:182)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> When I ran the same against 1.4 it worked.
>
> I've also changed the spark.sql.hive.metastore.version version to be 0.13
> (similar to what it was in 1.4) and 0.14 but I still get the same errors.
>
>
> Any suggestions?
>
> Thanks,
> Trystan
>
>


Re: Dataset and lambdas

2015-12-07 Thread Deenar Toraskar
Michael

Having VectorUnionSumUDAF implemented would be great. This is quite
generic; it does an element-wise sum of arrays and maps
https://github.com/klout/brickhouse/blob/master/src/main/java/brickhouse/udf/timeseries/VectorUnionSumUDAF.java
and would be a massive benefit for a lot of risk analytics.

In general most of the brickhouse UDFs are quite useful
https://github.com/klout/brickhouse. Happy to help out.

On another note, what would be involved in having arrays backed by a sparse
array (I am assuming the current implementation is dense), i.e. some sort of
native support for http://spark.apache.org/docs/latest/mllib-data-types.html

Regards
Deenar




On 7 December 2015 at 20:21, Michael Armbrust <mich...@databricks.com>
wrote:

> On Sat, Dec 5, 2015 at 3:27 PM, Deenar Toraskar <deenar.toras...@gmail.com
> > wrote:
>>
>> On a similar note, what is involved in getting native support for some
>> user defined functions, so that they are as efficient as native Spark SQL
>> expressions? I had one particular one - an arraySum (element wise sum) that
>> is heavily used in a lot of risk analytics.
>>
>
> To get the best performance you have to implement a catalyst expression
> with codegen.  This however is necessarily an internal (unstable) interface
> since we are constantly making breaking changes to improve performance.  So
> if its a common enough operation we should bake it into the engine.
>
> That said, the code generated encoders that we created for datasets should
> lower the cost of calling into external functions as we start using them in
> more and more places (i.e.
> https://issues.apache.org/jira/browse/SPARK-11593)
>
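
For reference, the non-codegen version of an element-wise arraySum can at least
be registered as an ordinary Spark SQL UDF; a sketch with illustrative column
names (no null or length-mismatch handling):

// Element-wise sum of two double arrays.
sqlContext.udf.register("arraySum",
  (xs: Seq[Double], ys: Seq[Double]) => xs.zip(ys).map { case (a, b) => a + b })

// e.g. SELECT arraySum(pnl_vector_a, pnl_vector_b) FROM exposures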


Re: SparkSQL AVRO

2015-12-07 Thread Deenar Toraskar
By default Spark will create one file per partition. Spark SQL defaults to
using 200 partitions. If you want to reduce the number of files written
out, repartition your dataframe using repartition and give it the desired
number of partitions.

originalDF.repartition(10).write.avro("masterNew.avro")

Deenar
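
Two related knobs, for what it's worth (this assumes the spark-avro package is
on the classpath, as in the original write.avro call): coalesce avoids the full
shuffle that repartition triggers when you are only reducing the partition
count, and spark.sql.shuffle.partitions controls the default of 200 used after
shuffles.

// Reduce the number of output files without a full shuffle.
originalDF.coalesce(10).write.format("com.databricks.spark.avro").save("masterNew.avro")

// Or lower the default number of post-shuffle partitions for the session.
sqlContext.setConf("spark.sql.shuffle.partitions", "10")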



On 7 December 2015 at 21:21, Ruslan Dautkhanov  wrote:

> How many reducers you had that created those avro files?
> Each reducer very likely creates its own avro part- file.
>
> We normally use Parquet, but it should be the same for Avro, so this might
> be
> relevant
>
> http://stackoverflow.com/questions/34026764/how-to-limit-parquet-file-dimension-for-a-parquet-table-in-hive/34059289#34059289
>
>
>
>
> --
> Ruslan Dautkhanov
>
> On Mon, Dec 7, 2015 at 11:27 AM, Test One  wrote:
>
>> I'm using spark-avro with SparkSQL to process and output avro files. My
>> data has the following schema:
>>
>> root
>>  |-- memberUuid: string (nullable = true)
>>  |-- communityUuid: string (nullable = true)
>>  |-- email: string (nullable = true)
>>  |-- firstName: string (nullable = true)
>>  |-- lastName: string (nullable = true)
>>  |-- username: string (nullable = true)
>>  |-- profiles: map (nullable = true)
>>  ||-- key: string
>>  ||-- value: string (valueContainsNull = true)
>>
>>
>> When I write the file output as such with:
>> originalDF.write.avro("masterNew.avro")
>>
>> The output location is a folder with masterNew.avro and with many many
>> files like these:
>> -rw-r--r--   1 kcsham  access_bpf 8 Dec  2 11:37 ._SUCCESS.crc
>> -rw-r--r--   1 kcsham  access_bpf44 Dec  2 11:37
>> .part-r-0-0c834f3e-9c15-4470-ad35-02f061826263.avro.crc
>> -rw-r--r--   1 kcsham  access_bpf44 Dec  2 11:37
>> .part-r-1-0c834f3e-9c15-4470-ad35-02f061826263.avro.crc
>> -rw-r--r--   1 kcsham  access_bpf44 Dec  2 11:37
>> .part-r-2-0c834f3e-9c15-4470-ad35-02f061826263.avro.crc
>> -rw-r--r--   1 kcsham  access_bpf 0 Dec  2 11:37 _SUCCESS
>> -rw-r--r--   1 kcsham  access_bpf  4261 Dec  2 11:37
>> part-r-0-0c834f3e-9c15-4470-ad35-02f061826263.avro
>> -rw-r--r--   1 kcsham  access_bpf  4261 Dec  2 11:37
>> part-r-1-0c834f3e-9c15-4470-ad35-02f061826263.avro
>> -rw-r--r--   1 kcsham  access_bpf  4261 Dec  2 11:37
>> part-r-2-0c834f3e-9c15-4470-ad35-02f061826263.avro
>>
>>
>> Where there are ~10 record, it has ~28000 files in that folder. When
>> I simply want to copy the same dataset to a new location as an exercise
>> from a local master, it takes long long time and having errors like such as
>> well.
>>
>> 22:01:44.247 [Executor task launch worker-21] WARN
>>  org.apache.spark.storage.MemoryStore - Not enough space to cache
>> rdd_112058_10705 in memory! (computed 496.0 B so far)
>> 22:01:44.247 [Executor task launch worker-21] WARN
>>  org.apache.spark.CacheManager - Persisting partition rdd_112058_10705 to
>> disk instead.
>> [Stage 0:===>   (10706 + 1) /
>> 28014]22:01:44.574 [Executor task launch worker-21] WARN
>>  org.apache.spark.storage.MemoryStore - Failed to reserve initial memory
>> threshold of 1024.0 KB for computing block rdd_112058_10706 in memory.
>>
>>
>> I'm attributing that there are way too many files to manipulate. The
>> questions:
>>
>> 1. Is this the expected format of the avro file written by spark-avro?
>> and each 'part-' is not more than 4k?
>> 2. My use case is to append new records to the existing dataset using:
>> originalDF.unionAll(stageDF).write.avro(masterNew)
>> Any sqlconf, sparkconf that I should set to allow this to work?
>>
>>
>> Thanks,
>> kc
>>
>>
>>
>


Spark SQL - saving to multiple partitions in parallel - FileNotFoundException on _temporary directory possible bug?

2015-12-07 Thread Deenar Toraskar
Hi

I have a process that writes to multiple partitions of the same table in
parallel using multiple threads sharing the same SQL context via
df.write.partitionBy("partCol").insertInto("tableName"). I am
getting a FileNotFoundException on the _temporary directory. Each write only goes
to a single partition. Is there some way of avoiding dynamic partitioning
with df.write, without having to resort to .save, as I don't want to hardcode
a physical DFS location in my code?

This is similar to this issue listed here
https://issues.apache.org/jira/browse/SPARK-2984

Regards
Deenar

*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
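
One idea, untested for the concurrent case described above and written with the
hypothetical names from the question: when each thread already knows its target
partition, register the batch as a temp table and issue a static-partition
insert, so dynamic partitioning is not involved and no physical path is
hardcoded.

// partCol's value is known per thread, so a static partition spec can be used.
// Note the SELECT list excludes the partition column for a static insert.
df.registerTempTable("batch_input")
sqlContext.sql(
  """INSERT INTO TABLE tableName PARTITION (partCol = 'somePartitionValue')
    |SELECT col1, col2 FROM batch_input""".stripMargin)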


Re: Dataset and lambdas

2015-12-05 Thread Deenar Toraskar
Hi Michael

On a similar note, what is involved in getting native support for some user
defined functions, so that they are as efficient as native Spark SQL
expressions? I had one particular one - an arraySum (element wise sum) that
is heavily used in a lot of risk analytics.


Deenar

On 5 December 2015 at 21:09, Michael Armbrust 
wrote:

> On Sat, Dec 5, 2015 at 9:42 AM, Koert Kuipers  wrote:
>
>> hello all,
>> DataFrame internally uses a different encoding for values then what the
>> user sees. i assume the same is true for Dataset?
>>
>
> This is true.  We encode objects in the tungsten binary format using code
> generated serializers.
>
>
>> if so, does this means that a function like Dataset.map needs to convert
>> all the values twice (once to user format and then back to internal
>> format)? or is it perhaps possible to write scala functions that operate on
>> internal formats and avoid this?
>>
>
> Currently this is true, but there are plans to avoid unnecessary
> conversions (back to back maps / filters, etc) and only convert when we
> need to (shuffles, sorting, hashing, SQL operations).
>
> There are also plans to allow you to directly access some of the more
> efficient internal types by using them as fields in your classes (mutable
> UTF8 String instead of the immutable java.lang.String).
>
>


Spark Streaming BackPressure and Custom Receivers

2015-12-03 Thread Deenar Toraskar
Hi

I was going through the Spark Streaming BackPressure feature documentation and
wanted to understand how I can ensure my custom receiver is able to handle
rate limiting. I have a custom receiver similar to the TwitterInputDStream,
but there is no obvious way to throttle what is being read from the source,
in response to rate limit events.

class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {

  def onStart() {
    // Setup stuff (start threads, open sockets, etc.) to start receiving data.
    // Must start new thread to receive data, as onStart() must be non-blocking.

    // Call store(...) in those threads to store received data into Spark's memory.

    // Call stop(...), restart(...) or reportError(...) on any thread based on how
    // different errors need to be handled.

    // See corresponding method documentation for more details
  }

  def onStop() {
    // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
  }
}


http://spark.apache.org/docs/latest/streaming-custom-receivers.html

The back pressure design documentation states the following. I am unable to
figure out how this works for the TwitterInputDStream either. My receiver
is similar to the TwitterInputDStream one.

   - TwitterInputDStream: no changes required. The receiver-based mechanism
     will handle rate limiting


References https://issues.apache.org/jira/browse/SPARK-7398 and
https://docs.google.com/document/d/1ls_g5fFmfbbSTIfQQpUxH56d0f3OksF567zwA00zK9E/edit#

Regards
Deenar
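
My current understanding, which may not hold for every Spark version: the
receiver-side throttling lives in the supervisor's BlockGenerator, so it only
applies to data handed over one record at a time; store(single record) blocks
when the current rate limit is exceeded, while the bulk store(ArrayBuffer) and
store(iterator) variants bypass the generator and are not throttled. A sketch of
a receive loop written with that in mind; the message source is hypothetical.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// nextMessage is assumed to block until a message is available.
class MyThrottledReceiver(nextMessage: () => String, storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) {

  def onStart() {
    new Thread("MyThrottledReceiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          // Single-record store() goes through the BlockGenerator, which is
          // where spark.streaming.receiver.maxRate and backpressure throttle.
          store(nextMessage())
        }
      }
    }.start()
  }

  def onStop() {
    // The loop above exits once isStopped() returns true.
  }
}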


Re: Spark-SQL idiomatic way of adding a new partition or writing to Partitioned Persistent Table

2015-11-22 Thread Deenar Toraskar
Thanks Michael

Thanks for the response. Here is my understanding, correct me if I am wrong

1) Partitioned tables written by Spark SQL do not write partition metadata to the Hive
metastore. Spark SQL discovers partitions from the table location on the
underlying DFS, not from the metastore. It does this the first time a table
is accessed, so if the underlying partitions change a refresh table
 is required. Is there a way to see the partitions discovered by
Spark SQL? show partitions  does not work on Spark SQL
partitioned tables. Also, Hive allows different partitions to be in different
physical locations; I guess this won't be possible in Spark SQL.

2) If you want to retain compatibility with other SQL on Hadoop engines,
register your dataframe as a temp table and then use the  Hive's dynamic
partitioned insert syntax. SparkSQL uses this for Hive style tables.

3) Automatic schema discovery. I presume this is Parquet only, and only
if spark.sql.parquet.mergeSchema
/ mergeSchema is set to true. What happens when mergeSchema is set to false?
(I guess I can check this out.)

My two cents

a) it would help if there was something like the Hive non-strict mode
equivalent, which would enforce schema compatibility for all partitions
written to a table.
b) refresh table is annoying for tables where partitions are being written
frequently, for other reasons; not sure if there is a way around this.
c) it would be great if DataFrameWriter had an option to maintain
compatibility with the Hive metastore. registerTempTable and "insert
overwrite table select from" is quite ugly and cumbersome.
d) It would be helpful to resurrect
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/;
happy to help out with the Spark SQL portions.

Regards
Deenar
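
A sketch of the dynamic partitioned insert mentioned in point 2, reusing the
table names from the original question; the two Hive settings and the fact that
the partition column has to come last in the SELECT are the usual gotchas, and
this route keeps the Hive metastore in sync.

sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

eventsDataFrame.registerTempTable("events_staging")
// Assumes windows_event_time_bin is the last column of eventsDataFrame.
sqlContext.sql(
  """INSERT INTO TABLE windows_event PARTITION (windows_event_time_bin)
    |SELECT * FROM events_staging""".stripMargin)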


On 22 November 2015 at 18:54, Michael Armbrust 
wrote:

> Is it possible to add a new partition to a persistent table using Spark
>> SQL ? The following call works and data gets written in the correct
>> directories, but no partition metadata is added to the Hive metastore.
>>
> I believe if you use Hive's dynamic partitioned insert syntax then we will
> fall back on metastore and do the update.
>
>> In addition I see nothing preventing any arbitrary schema being appended
>> to the existing table.
>>
> This is perhaps kind of a feature, we do automatic schema discovery and
> merging when loading a new parquet table.
>
>> Does SparkSQL not need partition metadata when reading data back?
>>
> No, we dynamically discover it in a distributed job when the table is
> loaded.
>


Spark-SQL idiomatic way of adding a new partition or writing to Partitioned Persistent Table

2015-11-21 Thread Deenar Toraskar
Hi guys

Is it possible to add a new partition to a persistent table using Spark
SQL? The following call works and data gets written in the correct
directories, but no partition metadata is added to the Hive metastore.
In addition, I see nothing preventing an arbitrary schema from being appended
to the existing table.

eventsDataFrame.write.mode(SaveMode.Append)
  .partitionBy("windows_event_time_bin").saveAsTable("windows_event")

sqlContext.sql("show partitions windows_event")

Does SparkSQL not need partition metadata when reading data back?

Regards
Deenar


Unable to load native-hadoop library for your platform - already loaded in another classloader

2015-11-18 Thread Deenar Toraskar
Hi

I want to make sure we use short-circuit local reads for performance. I
have set the LD_LIBRARY_PATH correctly, confirmed that the native libraries
match our platform (i.e. are 64 bit and are loaded successfully). When I
start Spark, I get the following message after increasing the logging level
for the relevant classes.

*15/11/18 17:47:23 DEBUG NativeCodeLoader: Failed to load native-hadoop
with error: java.lang.UnsatisfiedLinkError: Native Library
/usr/hdp/2.3.2.0-2950/hadoop/lib/native/libhadoop.so.1.0.0 already loaded
in another classloader*

Any idea what might be causing it and how to resolve this.

Regards
Deenar

[spark@edgenode1 spark-1.5.2-bin-hadoop2.6]$ bin/spark-shell
15/11/18 17:46:56 DEBUG NativeCodeLoader: Trying to load the custom-built
native-hadoop library...
15/11/18 17:46:56 DEBUG NativeCodeLoader: Loaded the native-hadoop library
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
  /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
15/11/18 17:47:00 WARN Utils: Service 'SparkUI' could not bind on port
4040. Attempting port 4041.
15/11/18 17:47:00 WARN Utils: Service 'SparkUI' could not bind on port
4041. Attempting port 4042.
15/11/18 17:47:00 WARN MetricsSystem: Using default name DAGScheduler for
source because spark.app.id is not set.
spark.yarn.driver.memoryOverhead is set but does not apply in client mode.
Spark context available as sc.
15/11/18 17:47:23 DEBUG NativeCodeLoader: Trying to load the custom-built
native-hadoop library...
*15/11/18 17:47:23 DEBUG NativeCodeLoader: Failed to load native-hadoop
with error: java.lang.UnsatisfiedLinkError: Native Library
/usr/hdp/2.3.2.0-2950/hadoop/lib/native/libhadoop.so.1.0.0 already loaded
in another classloader*
15/11/18 17:47:23 DEBUG NativeCodeLoader:
java.library.path=/usr/hdp/2.3.2.0-2950/hadoop/lib/native/:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
15/11/18 17:47:23 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/11/18 17:47:24 WARN DomainSocketFactory: The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.
SQL context available as sqlContext.


*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812


Avro RDD to DataFrame

2015-11-16 Thread Deenar Toraskar
Hi

The spark-avro module supports creating a DataFrame from Avro files. How
can I convert an RDD of Avro objects that I get via Spark Streaming into a
DataFrame?

  val avroStream = KafkaUtils.createDirectStream[AvroKey[GenericRecord],
NullWritable, AvroKeyInputFormat[GenericRecord]](..)

https://github.com/databricks/spark-avro

// Creates a DataFrame from a specified file
DataFrame df = sqlContext.read().format("com.databricks.spark.avro")
    .load("src/test/resources/episodes.avro");



Regards
Deenar
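
A sketch of one way to do it, with a deliberately simplified record shape (a
long "id" and a string "name"); in practice the StructType would be derived
from the Avro schema rather than written by hand, and the exact stream type
depends on how the Avro records are decoded.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))

// Convert each Avro GenericRecord into a Row.
val rowStream = avroStream.map { case (avroKey, _) =>
  val record = avroKey.datum()
  Row(record.get("id").asInstanceOf[Long], record.get("name").toString)
}

// Turn each micro-batch into a DataFrame.
rowStream.foreachRDD { rdd =>
  val df = sqlContext.createDataFrame(rdd, schema)
  df.registerTempTable("avro_events") // or df.write..., depending on the use case
}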


spark sql "create temporary function" scala functions

2015-11-15 Thread Deenar Toraskar
Hi

I wanted to know how to go about registering Scala functions as UDFs using
the Spark SQL create temporary function statement.

Currently I do the following

/* convert prices to holding period returns */
object VaR extends Serializable {

  def returns(prices: Seq[Double], horizon: Integer): Seq[Double] = {
    (prices zip prices.drop(horizon)).map(x => (x._2 - x._1) / x._2)
  }
}

sqlContext.udf.register("returns", VaR.returns _)

in my scala code

Regards
Deenar



*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812


Re: Re: Spark RDD cache persistence

2015-11-05 Thread Deenar Toraskar
You can have a long-running Spark context in several ways. This will
ensure your data stays cached in memory. Clients can access the RDDs
through a REST API that you expose. See the Spark Job Server; it does
something similar. It has something called Named RDDs:

Using Named RDDs

Named RDDs are a way to easily share RDDs among jobs. Using this facility,
computed RDDs can be cached with a given name and later on retrieved. To
use this feature, the SparkJob needs to mix in NamedRddSupport.

Alternatively if you use the Spark Thrift Server, any cached
dataframes/RDDs will be available to all clients of Spark via the Thrift
Server until it is shutdown.

If you want to support key value lookups you might want to use IndexedRDD


Finally, though not the same as sharing RDDs, Tachyon can cache the underlying
HDFS blocks.

Deenar

*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812
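
A sketch of the thrift server route with illustrative names: cache the data as
a table inside the long-running context (for example one started with
HiveThriftServer2.startWithContext), and every JDBC client connected to that
server reads it from memory; the equivalent from beeline is a CACHE TABLE ...
AS SELECT statement.

val positions = sqlContext.read.parquet("/data/reference/positions") // illustrative path
positions.registerTempTable("positions_cached")
sqlContext.cacheTable("positions_cached") // materialised on first use, shared by all sessions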





On 6 November 2015 at 05:56, r7raul1...@163.com  wrote:

> You can try
> http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html#Archival_Storage_SSD__Memory
>  .
>   Hive tmp table use this function to speed job.
> https://issues.apache.org/jira/browse/HIVE-7313
>
> --
> r7raul1...@163.com
>
>
> *From:* Christian 
> *Date:* 2015-11-06 13:50
> *To:* Deepak Sharma 
> *CC:* user 
> *Subject:* Re: Spark RDD cache persistence
> I've never had this need and I've never done it. There are options that
> allow this. For example, I know there are web apps out there that work like
> the spark REPL. One of these I think is called Zepplin. . I've never used
> them, but I've seen them demoed. There is also Tachyon that Spark
> supports.. Hopefully, that gives you a place to start.
> On Thu, Nov 5, 2015 at 9:21 PM Deepak Sharma 
> wrote:
>
>> Thanks Christian.
>> So is there any inbuilt mechanism in spark or api integration  to other
>> inmemory cache products such as redis to load the RDD to these system upon
>> program exit ?
>> What's the best approach to have long lived RDD cache ?
>> Thanks
>>
>>
>> Deepak
>> On 6 Nov 2015 8:34 am, "Christian"  wrote:
>>
>>> The cache gets cleared out when the job finishes. I am not aware of a
>>> way to keep the cache around between jobs. You could save it as an object
>>> file to disk and load it as an object file on your next job for speed.
>>> On Thu, Nov 5, 2015 at 6:17 PM Deepak Sharma 
>>> wrote:
>>>
 Hi All
 I am confused on RDD persistence in cache .
 If I cache RDD , is it going to stay there in memory even if my spark
 program completes execution , which created it.
 If not , how can I guarantee that RDD is persisted in cache even after
 the program finishes execution.

 Thanks


 Deepak

>>>


Re: How to lookup by a key in an RDD

2015-11-02 Thread Deenar Toraskar
Swetha

Currently IndexedRDD is an external library and not part of Spark Core. You
can use it by adding a dependency and pulling it in. There are plans to move
it to Spark core tracked in https://issues.apache.org/jira/browse/SPARK-2365.
See
https://spark-summit.org/2015/events/indexedrdd-efficient-fine-grained-updates-for-rdds/
and https://github.com/amplab/spark-indexedrdd

*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812
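
A usage sketch based on the project's README (the API may differ between
versions):

import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._

// Build an IndexedRDD from an ordinary pair RDD keyed by Long.
val pairs = sc.parallelize((1 to 1000000).map(x => (x.toLong, 0)))
val indexed = IndexedRDD(pairs).cache()

indexed.get(1234L)                  // point lookup => Some(0)
indexed.multiget(Array(1234L, 42L)) // batched lookup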



On 2 November 2015 at 23:29, Ted Yu  wrote:

> Please take a look at SPARK-2365
>
> On Mon, Nov 2, 2015 at 3:25 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> Is Indexed RDDs released yet?
>>
>> Thanks,
>> Swetha
>>
>> On Sun, Nov 1, 2015 at 1:21 AM, Gylfi  wrote:
>>
>>> Hi.
>>>
>>> You may want to look into Indexed RDDs
>>> https://github.com/amplab/spark-indexedrdd
>>>
>>> Regards,
>>> Gylfi.
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-lookup-by-a-key-in-an-RDD-tp25243p25247.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: execute native system commands in Spark

2015-11-02 Thread Deenar Toraskar
You can do the following; make sure the number of executors requested equals
the number of executors on your cluster.

import scala.sys.process._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil

sc.parallelize(0 to 10).map { _ =>
  (("hostname".!!).trim, UserGroupInformation.getCurrentUser.toString)
}.collect.distinct

Regards
Deenar
*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812
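
And a sketch of the .pipe() approach Adrian mentions below, which streams each
partition's elements through an external command's stdin/stdout:

val words = sc.parallelize(Seq("alpha", "beta", "gamma"), numSlices = 2)
// Each element is written to the command's stdin, one per line; the command's
// stdout lines come back as an RDD[String].
val shouted = words.pipe(Seq("tr", "[:lower:]", "[:upper:]")).collect()
// => Array("ALPHA", "BETA", "GAMMA")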




On 2 November 2015 at 15:38, Adrian Tanase  wrote:

> Have you seen .pipe()?
>
>
>
>
> On 11/2/15, 5:36 PM, "patcharee"  wrote:
>
> >Hi,
> >
> >Is it possible to execute native system commands (in parallel) Spark,
> >like scala.sys.process ?
> >
> >Best,
> >Patcharee
> >
> >-
> >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark 1.5 on CDH 5.4.0

2015-11-01 Thread Deenar Toraskar
HI guys

I have documented the steps involved in getting Spark 1.5.1 to run on CDH
5.4.0 here; let me know if it works for you as well:
https://www.linkedin.com/pulse/running-spark-151-cdh-deenar-toraskar-cfa?trk=hp-feed-article-title-publish

Looking forward to CDH 5.5 which supports Spark 1.5.x out of the box.

Regards
Deenar




*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812


On 23 October 2015 at 17:31, Deenar Toraskar <deenar.toras...@gmail.com>
wrote:

> I got this working. For others trying this It turns out in Spark 1.3/CDH5.4
>
> spark.yarn.jar=local:/opt/cloudera/parcels/
>
> I had changed this to reflect the 1.5.1 version of spark assembly jar
>
> spark.yarn.jar=local:/opt/spark-1.5.1-bin/...
>
> and this didn't work, I had to drop the "local:" prefix
>
> spark.yarn.jar=/opt/spark-1.5.1-bin/...
>
> Regards
> Deenar
>
> On 23 October 2015 at 17:30, Deenar Toraskar <
> deenar.toras...@thinkreactive.co.uk> wrote:
>
>> I got this working. For others trying this It turns out in Spark
>> 1.3/CDH5.4
>>
>> spark.yarn.jar=local:/opt/cloudera/parcels/
>>
>> I had changed this to reflect the 1.5.1 version of spark assembly jar
>>
>> spark.yarn.jar=local:/opt/spark-1.5.1-bin/...
>>
>> and this didn't work, I had to drop the "local:" prefix
>>
>> spark.yarn.jar=/opt/spark-1.5.1-bin/...
>>
>> Regards
>> Deenar
>>
>>
>>
>>
>> *Think Reactive Ltd*
>> deenar.toras...@thinkreactive.co.uk
>> 07714140812
>>
>>
>>
>> On 23 October 2015 at 13:34, Deenar Toraskar <deenar.toras...@gmail.com>
>> wrote:
>>
>>> Sandy
>>>
>>> The assembly jar does contain org.apache.spark.deploy.yarn.ExecutorLauncher.
>>> I am trying to find out how i can increase the logging level, so I know the
>>> exact classpath used by Yarn ContainerLaunch.
>>>
>>> Deenar
>>>
>>> On 23 October 2015 at 03:30, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>>>
>>>> Hi Deenar,
>>>>
>>>> The version of Spark you have may not be compiled with YARN support.
>>>> If you inspect the contents of the assembly jar, does
>>>> org.apache.spark.deploy.yarn.ExecutorLauncher exist?  If not, you'll
>>>> need to find a version that does have the YARN classes.  You can also build
>>>> your own using the -Pyarn flag.
>>>>
>>>> -Sandy
>>>>
>>>> On Thu, Oct 22, 2015 at 9:04 AM, Deenar Toraskar <
>>>> deenar.toras...@gmail.com> wrote:
>>>>
>>>>> Hi I have got the prebuilt version of Spark 1.5 for Hadoop 2.6 (
>>>>> http://www.apache.org/dyn/closer.lua/spark/spark-1.5.1/spark-1.5.1-bin-hadoop2.6.tgz)
>>>>> working with CDH 5.4.0 in local mode on a cluster with Kerberos. It works
>>>>> well including connecting to the Hive metastore. I am facing an issue
>>>>> running spark jobs in yarn-client/yarn-cluster mode. The executors fail to
>>>>> start as java cannot find ExecutorLauncher. Error: Could not find or
>>>>> load main class org.apache.spark.deploy.yarn.ExecutorLauncher client
>>>>> token: N/Adiagnostics: Application application_1443531450011_13437
>>>>> failed 2 times due to AM Container for
>>>>> appattempt_1443531450011_13437_02 exited with exitCode: 1Stack
>>>>> trace: ExitCodeException exitCode=1:at
>>>>> org.apache.hadoop.util.Shell.runCommand(Shell.java:538)at
>>>>> org.apache.hadoop.util.Shell.run(Shell.java:455)at
>>>>> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)at
>>>>> org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)at
>>>>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)at
>>>>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)at
>>>>> java.util.concurrent.FutureTask.run(FutureTask.java:262)at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)at
>>>>> java.lang.Thread.run(Thread.java:745) Any ideas as to what might be
>>>>> going wrong. Also how can I turn on more detailed logging to see what
>>>>> command line is being run by Yarn to launch containers? RegardsDeenar
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Pulling data from a secured SQL database

2015-10-31 Thread Deenar Toraskar
Thomas

I have the same problem, though in my case getting Kerberos authentication
to MSSQLServer from the cluster nodes does not seem to be supported. There
are a couple of options that come to mind.

1) You can pull the data by running sqoop in local mode on the smaller
development machines and write to HDFS or to a persistent store connected
to your Spark cluster.
2) You can run Spark in local mode on the smaller development machines and
use the JDBC data source to do something similar.

Regards
Deenar

*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812
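
A sketch of option 2 with hypothetical host, table and credentials: run Spark
locally on the machine that can reach the database, read through the JDBC data
source, and hand the result to the cluster as parquet (as Michael also suggests
below).

val jdbcDF = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:sqlserver://dbhost:1433;databaseName=mydb",
  "dbtable"  -> "dbo.trades",
  "user"     -> "svc_account",
  "password" -> "********",
  "driver"   -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
)).load()

jdbcDF.write.parquet("hdfs:///landing/trades")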




On 31 October 2015 at 11:35, Michael Armbrust 
wrote:

> I would try using the JDBC Data Source
> 
> and save the data to parquet
> .
> You can then put that data on your Spark cluster (probably install HDFS).
>
> On Fri, Oct 30, 2015 at 6:49 PM, Thomas Ginter 
> wrote:
>
>> I am working in an environment where data is stored in MS SQL Server.  It
>> has been secured so that only a specific set of machines can access the
>> database through an integrated security Microsoft JDBC connection.  We also
>> have a couple of beefy linux machines we can use to host a Spark cluster
>> but those machines do not have access to the databases directly.  How can I
>> pull the data from the SQL database on the smaller development machine and
>> then have it distribute to the Spark cluster for processing?  Can the
>> driver pull data and then distribute execution?
>>
>> Thanks,
>>
>> Thomas Ginter
>> 801-448-7676
>> thomas.gin...@utah.edu
>>
>>
>>
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Deenar Toraskar
Are you using Spark built with hive ?

# Apache Hadoop 2.6.X with Hive 13 support
mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive
-Phive-thriftserver -DskipTests clean package


On 29 October 2015 at 13:08, Zoltan Fedor <zoltan.0.fe...@gmail.com> wrote:

> Hi Deenar,
> As suggested, I have moved the hive-site.xml from HADOOP_CONF_DIR
> ($SPARK_HOME/hadoop-conf) to YARN_CONF_DIR ($SPARK_HOME/conf/yarn-conf) and
> use the below to start pyspark, but the error is the exact same as before.
>
> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
> YARN_CONF_DIR=$SPARK_HOME/conf/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
> $SPARK_HOME/bin/pyspark --deploy-mode client
>
> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
> Type "help", "copyright", "credits" or "license" for more information.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 15/10/29 09:06:36 WARN MetricsSystem: Using default name DAGScheduler for
> source because spark.app.id is not set.
> 15/10/29 09:06:38 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/10/29 09:07:03 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>   /_/
>
> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
> SparkContext available as sc, HiveContext available as sqlContext.
> >>> sqlContext2 = HiveContext(sc)
> >>> sqlContext2 = HiveContext(sc)
> >>> sqlContext2.sql("show databases").first()
> 15/10/29 09:07:34 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> 15/10/29 09:07:35 WARN ShellBasedUnixGroupsMapping: got exception trying
> to get groups for user biapp: id: biapp: No such user
>
> 15/10/29 09:07:35 WARN UserGroupInformation: No groups available for user
> biapp
> Traceback (most recent call last):
>   File "", line 1, in 
>   File
> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
> line 552, in sql
> return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
>   File
> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
> line 660, in _ssql_ctx
> "build/sbt assembly", e)
> Exception: ("You must build Spark with Hive. Export 'SPARK_HIVE=true' and
> run build/sbt assembly", Py4JJavaError(u'An error occurred while calling
> None.org.apache.spark.sql.hive.HiveContext.\n', JavaObject id=o20))
> >>>
>
>
> On Thu, Oct 29, 2015 at 7:20 AM, Deenar Toraskar <
> deenar.toras...@gmail.com> wrote:
>
>> *Hi Zoltan*
>>
>> Add hive-site.xml to your YARN_CONF_DIR. i.e. $SPARK_HOME/conf/yarn-conf
>>
>> Deenar
>>
>> *Think Reactive Ltd*
>> deenar.toras...@thinkreactive.co.uk
>> 07714140812
>>
>> On 28 October 2015 at 14:28, Zoltan Fedor <zoltan.0.fe...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> We have a shared CDH 5.3.3 cluster and trying to use Spark 1.5.1 on it
>>> in yarn client mode with Hive.
>>>
>>> I have compiled Spark 1.5.1 with SPARK_HIVE=true, but it seems I am not
>>> able to make SparkSQL to pick up the hive-site.xml when runnig pyspark.
>>>
>>> hive-site.xml is located in $SPARK_HOME/hadoop-conf/hive-site.xml and
>>> also in $SPARK_HOME/conf/hive-site.xml
>>>
>>> When I start pyspark with the below command and then run some simple
>>> SparkSQL it fails, it seems it didn't pic up the settings in hive-site.xml
>>>
>>> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>>> $SPARK_HOME/bin/pyspark --deploy-mode client
>>>
>>> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
>>> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
>>> Type "help", "copyright", "credits" or "license" for more information.
>>> SLF4J: Class path contains mult

Re: nested select is not working in spark sql

2015-10-29 Thread Deenar Toraskar
You can try the following syntax

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SubQueries

SELECT *
FROM A
WHERE A.a IN (SELECT foo FROM B);

Regards
Deenar
*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812
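
Until SPARK-4226 lands (see Rick's reply below), the usual workaround is to
rewrite the subquery as a join; a sketch for the simplified query above,
untested here against the TPCH schema:

val asiaNations = sqlContext.sql(
  """SELECT n.n_name
    |FROM NATION n
    |JOIN REGION r ON n.n_regionkey = r.r_regionkey
    |WHERE r.r_name = 'ASIA'""".stripMargin)
asiaNations.show()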



On 28 October 2015 at 14:37, Richard Hillegas  wrote:


Hi Kishor,
>
> Spark doesn't currently support subqueries in the WHERE clause. However,
> it looks as though someone is working on this right now:
> https://issues.apache.org/jira/browse/SPARK-4226
>
> Hope this helps,
> Rick Hillegas
>
>
>
> Kishor Bachhav  wrote on 10/28/2015 05:52:50 AM:
>
> > From: Kishor Bachhav 
> > To: user@spark.apache.org
> > Date: 10/28/2015 05:53 AM
> > Subject: nested select is not working in spark sql
>
> >
> > Hi,
>
> > I am trying to execute below query in spark sql but throws exception
> >
> > select n_name from NATION where n_regionkey = (select r_regionkey
> > from REGION where r_name='ASIA')
>
> > Exception:
> > Exception in thread "main" java.lang.RuntimeException: [1.55]
> > failure: ``)'' expected but identifier r_regionkey found
> >
> > select n_name from NATION where n_regionkey = (select r_regionkey
> > from REGION where r_name='ASIA')
> >   ^
> > at scala.sys.package$.error(package.scala:27)
> > at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse
> > (AbstractSparkSQLParser.scala:36)
> > at
> org.apache.spark.sql.SnappyParserDialect.parse(snappyParsers.scala:65)
> > at
> org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:169)
> > at
> org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:169)
> > at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark
> > $sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:115)
> > at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark
> > $sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)
> > at
> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
> > at
> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map
> > $1.apply(Parsers.scala:242)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map
> > $1.apply(Parsers.scala:242)
> > at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append
> > $1$$anonfun$apply$2.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append
> > $1$$anonfun$apply$2.apply(Parsers.scala:254)
> > at
> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append
> > $1.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append
> > $1.apply(Parsers.scala:254)
> > at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply
> > $14.apply(Parsers.scala:891)
> > at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply
> > $14.apply(Parsers.scala:891)
> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> > at
> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
> > at scala.util.parsing.combinator.PackratParsers$$anon$1.apply
> > (PackratParsers.scala:110)
> > at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse
> > (AbstractSparkSQLParser.scala:34)
> >
>
> > Same is working in mysql as well as memsql.
>
> > Expected Result is
> >
> > memsql> select n_name from NATION where n_regionkey = (select
> > r_regionkey from REGION where r_name='ASIA');
> > +---+
> > | n_name|
> > +---+
> > | INDIA |
> > | INDONESIA |
> > | JAPAN |
> > | CHINA |
> > | VIETNAM   |
> > +---+
> > 5 rows in set (0.71 sec)
>
> > How can I make this work in spark sql?
>
> > Actually above query is one simplified version of Minimum cost
> > supplier query (Q2) of TPCH which has this nested select nature. I
> > am working on these TPCH queries. If anybody has the modified set of
> > TPCH queries for spark sql, kindly let me know. It will be very useful
> for me.
> >
> > select
> > s_acctbal,
> > s_name,
> > n_name,
> > p_partkey,
> > p_mfgr,
> > s_address,
> > s_phone,
> > s_comment
> > from
> > part,
> > supplier,
> > partsupp,
> > nation,
> > region
> > where
> > p_partkey = ps_partkey
> > and s_suppkey = ps_suppkey
> > and p_size = [SIZE]
> > and p_type like '%[TYPE]'
> > and s_nationkey = n_nationkey
> > and n_regionkey = r_regionkey
> > and r_name = '[REGION]'
> > and ps_supplycost = (
> >   select
> > min(ps_supplycost)
> > from
> > partsupp, supplier,
> > nation, region
> > 

Re: Spark -- Writing to Partitioned Persistent Table

2015-10-29 Thread Deenar Toraskar
Hi Bryan

For your use case you don't need to have multiple metastores. The default
metastore uses embedded Derby, which cannot be shared amongst multiple
processes. Just switch to a metastore that supports multiple connections,
viz. networked Derby or MySQL.
See https://cwiki.apache.org/confluence/display/Hive/HiveDerbyServerMode

Deenar


*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812
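
A sketch of the hive-site.xml change for the MySQL case; host, database and
credentials are placeholders, and the same file needs to be visible to both
drivers.

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://metastore-host:3306/hive_metastore?createDatabaseIfNotExist=true</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>********</value>
</property>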


On 29 October 2015 at 00:56, Bryan  wrote:

> Yana,
>
> My basic use-case is that I want to process streaming data, and publish it
> to a persistent spark table. After that I want to make the published data
> (results) available via JDBC and spark SQL to drive a web API. That would
> seem to require two drivers starting separate HiveContexts (one for
> sparksql/jdbc, one for streaming)
>
> Is there a way to share a hive context between the driver for the thrift
> spark SQL instance and the streaming spark driver? A better method to do
> this?
>
> An alternate option might be to create the table in two separate
> metastores and simply use the same storage location for the data. That
> seems very hacky though, and likely to result in maintenance issues.
>
> Regards,
>
> Bryan Jeffrey
> --
> From: Yana Kadiyska 
> Sent: ‎10/‎28/‎2015 8:32 PM
> To: Bryan Jeffrey 
> Cc: Susan Zhang ; user 
> Subject: Re: Spark -- Writing to Partitioned Persistent Table
>
> For this issue in particular ( ERROR XSDB6: Another instance of Derby may
> have already booted the database /spark/spark-1.4.1/metastore_db) -- I
> think it depends on where you start your application and HiveThriftserver
> from. I've run into a similar issue running a driver app first, which would
> create a directory called metastore_db. If I then try to start SparkShell
> from the same directory, I will see this exception. So it is like
> SPARK-9776. It's not so much that the two are in the same process (as the
> bug resolution states) I think you can't run 2 drivers which start a
> HiveConext from the same directory.
>
>
> On Wed, Oct 28, 2015 at 4:10 PM, Bryan Jeffrey 
> wrote:
>
>> All,
>>
>> One issue I'm seeing is that I start the thrift server (for jdbc access)
>> via the following: /spark/spark-1.4.1/sbin/start-thriftserver.sh --master
>> spark://master:7077 --hiveconf "spark.cores.max=2"
>>
>> After about 40 seconds the Thrift server is started and available on
>> default port 1.
>>
>> I then submit my application - and the application throws the following
>> error:
>>
>> Caused by: java.sql.SQLException: Failed to start database 'metastore_db'
>> with class loader
>> org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@6a552721,
>> see the next exception for details.
>> at
>> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
>> Source)
>> at
>> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>> Source)
>> ... 86 more
>> Caused by: java.sql.SQLException: Another instance of Derby may have
>> already booted the database /spark/spark-1.4.1/metastore_db.
>> at
>> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
>> Source)
>> at
>> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>> Source)
>> at
>> org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown
>> Source)
>> at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown
>> Source)
>> ... 83 more
>> Caused by: ERROR XSDB6: Another instance of Derby may have already booted
>> the database /spark/spark-1.4.1/metastore_db.
>>
>> This also happens if I do the opposite (submit the application first, and
>> then start the thrift server).
>>
>> It looks similar to the following issue -- but not quite the same:
>> https://issues.apache.org/jira/browse/SPARK-9776
>>
>> It seems like this set of steps works fine if the metadata database is
>> not yet created - but once it's created this happens every time.  Is this a
>> known issue? Is there a workaround?
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>> On Wed, Oct 28, 2015 at 3:13 PM, Bryan Jeffrey 
>> wrote:
>>
>>> Susan,
>>>
>>> I did give that a shot -- I'm seeing a number of oddities:
>>>
>>> (1) 'Partition By' appears only accepts alphanumeric lower case fields.
>>> It will work for 'machinename', but not 'machineName' or 'machine_name'.
>>> (2) When partitioning with maps included in the data I get odd string
>>> conversion issues
>>> (3) When partitioning without maps I see frequent out of memory issues
>>>
>>> I'll update this email when I've got a more concrete example of problems.
>>>
>>> 

Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Deenar Toraskar
I don't know a lot about how pyspark works. Can you possibly try running
spark-shell and doing the same?

sqlContext.sql("show databases").collect

Deenar

On 29 October 2015 at 14:18, Zoltan Fedor <zoltan.0.fe...@gmail.com> wrote:

> Yes, I am. It was compiled with the following:
>
> export SPARK_HADOOP_VERSION=2.5.0-cdh5.3.3
> export SPARK_YARN=true
> export SPARK_HIVE=true
> export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
> -XX:ReservedCodeCacheSize=512m"
> mvn -Pyarn -Phadoop-2.5 -Dhadoop.version=2.5.0-cdh5.3.3 -Phive
> -Phive-thriftserver -DskipTests clean package
>
> On Thu, Oct 29, 2015 at 10:16 AM, Deenar Toraskar <
> deenar.toras...@gmail.com> wrote:
>
>> Are you using Spark built with hive ?
>>
>> # Apache Hadoop 2.6.X with Hive 13 support
>> mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver 
>> -DskipTests clean package
>>
>>
>> On 29 October 2015 at 13:08, Zoltan Fedor <zoltan.0.fe...@gmail.com>
>> wrote:
>>
>>> Hi Deenar,
>>> As suggested, I have moved the hive-site.xml from HADOOP_CONF_DIR
>>> ($SPARK_HOME/hadoop-conf) to YARN_CONF_DIR ($SPARK_HOME/conf/yarn-conf) and
>>> use the below to start pyspark, but the error is the exact same as before.
>>>
>>> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>>> YARN_CONF_DIR=$SPARK_HOME/conf/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>>> $SPARK_HOME/bin/pyspark --deploy-mode client
>>>
>>> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
>>> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
>>> Type "help", "copyright", "credits" or "license" for more information.
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> 15/10/29 09:06:36 WARN MetricsSystem: Using default name DAGScheduler
>>> for source because spark.app.id is not set.
>>> 15/10/29 09:06:38 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>> 15/10/29 09:07:03 WARN HiveConf: HiveConf of name hive.metastore.local
>>> does not exist
>>> Welcome to
>>>     __
>>>  / __/__  ___ _/ /__
>>> _\ \/ _ \/ _ `/ __/  '_/
>>>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>>>   /_/
>>>
>>> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
>>> SparkContext available as sc, HiveContext available as sqlContext.
>>> >>> sqlContext2 = HiveContext(sc)
>>> >>> sqlContext2 = HiveContext(sc)
>>> >>> sqlContext2.sql("show databases").first()
>>> 15/10/29 09:07:34 WARN HiveConf: HiveConf of name hive.metastore.local
>>> does not exist
>>> 15/10/29 09:07:35 WARN ShellBasedUnixGroupsMapping: got exception trying
>>> to get groups for user biapp: id: biapp: No such user
>>>
>>> 15/10/29 09:07:35 WARN UserGroupInformation: No groups available for
>>> user biapp
>>> Traceback (most recent call last):
>>>   File "", line 1, in 
>>>   File
>>> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
>>> line 552, in sql
>>> return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
>>>   File
>>> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
>>> line 660, in _ssql_ctx
>>> "build/sbt assembly", e)
>>> Exception: ("You must build Spark with Hive. Export 'SPARK_HIVE=true'
>>> and run build/sbt assembly", Py4JJavaError(u'An error occurred while
>>> calling None.org.apache.spark.sql.hive.HiveContext.\n', JavaObject id=o20))
>>> >>>
>>>
>>>
>>> On Thu, Oct 29, 2015 at 7:20 AM, Deenar Toraskar <
>>> deenar.toras...@gmail.com> wrote:
>>>
>>>> *Hi Zoltan*
>>>>
>>>> Add hive-site.xml to your YARN_CONF_DIR. i.e.
>>>> $SPARK_HOME/conf/yarn-conf
>>>>
>>>> Deenar
>>&

Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Deenar Toraskar
g.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:827)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:820)
> at
> org.apache.hadoop.hive.ql.exec.Utilities.createDirsWithPermission(Utilities.java:3679)
> at
> org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:597)
> at
> org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554)
> at
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508)
> ... 56 more
> Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):
> Permission denied: user=biapp, access=WRITE,
> inode="/user":hdfs:supergroup:drwxr-xr-x
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
> at
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6221)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4088)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4058)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4031)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:788)
> at
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:297)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:594)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1411)
> at org.apache.hadoop.ipc.Client.call(Client.java:1364)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> at com.sun.proxy.$Proxy14.mkdirs(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:531)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy15.mkdirs(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2687)
> ... 66 more
>
> :10: error: not found: value sqlContext
>import sqlContext.implicits._
>   ^
> :10: error: not found: value sqlContext
>import sqlContext.sql
>   ^
>
> scala> sqlContext.sql("show databases").collect
> :14: error: not found: value sqlContext
>   sqlContext.sql("show databases").collect
>   ^
>
> scala>
>
> On Thu, Oct 29, 2015 at 10:26 AM, Deenar Toraskar <
> deenar.toras...@gmail.com> wrote:
>
>> I dont know a lot about how pyspark works. Can you possibly try running
>> spark-shell and do 

Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Deenar Toraskar
Zoltan

You should have these in your existing CDH 5.3; that's the best place to
get them. Find where Spark is running from and you should find them there.

My versions are here

https://gist.github.com/deenar/08fc4ac0da3bdaff10fb

Deenar

On 29 October 2015 at 15:29, Zoltan Fedor <zoltan.0.fe...@gmail.com> wrote:

> i don't have spark-defaults.conf and spark-env.sh, so if you have a
> working Spark 1.5.1 with Hive metastore access on CDH 5.3 then could you
> please send over the settings you are having in your spark-defaults.conf
> and spark-env.sh?
> Thanks
>
> On Thu, Oct 29, 2015 at 11:14 AM, Deenar Toraskar <
> deenar.toras...@gmail.com> wrote:
>
>> Here is what I did, maybe that will help you.
>>
>> 1) Downloaded spark-1.5.1 (With HAdoop 2.6.0) spark-1.5.1-bin-hadoop2.6
>> and extracted it on the edge node, set SPARK_HOME to this location
>> 2) Copied the existing configuration (spark-defaults.conf and
>> spark-env.sh) from your spark install
>> (/opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf on our environment) to
>> $SPARK_HOME/conf
>> 3) updated spark.yarn.jar in spark-defaults.conf
>> 4) copied over all the configuration files from
>> /opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf to
>> $SPARK_HOME/conf/yarn-conf
>>
>> and it worked. You may be better off with a custom build for CDH 5.3.3
>> hadoop, which you already have done.
>>
>> Deenar
>>
>> On 29 October 2015 at 14:35, Zoltan Fedor <zoltan.0.fe...@gmail.com>
>> wrote:
>>
>>> Sure, I did it with spark-shell, which seems to be showing the same
>>> error - not using the hive-site.xml
>>>
>>>
>>> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>>> $SPARK_HOME/bin/pyspark --deploy-mode client --driver-class-path
>>> $HIVE_CLASSPATH
>>> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
>>> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
>>> Type "help", "copyright", "credits" or "license" for more information.
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> 15/10/29 10:33:20 WARN MetricsSystem: Using default name DAGScheduler
>>> for source because spark.app.id is not set.
>>> 15/10/29 10:33:22 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>> 15/10/29 10:33:50 WARN HiveConf: HiveConf of name hive.metastore.local
>>> does not exist
>>> Welcome to
>>>     __
>>>  / __/__  ___ _/ /__
>>> _\ \/ _ \/ _ `/ __/  '_/
>>>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>>>   /_/
>>>
>>> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
>>> SparkContext available as sc, HiveContext available as sqlContext.
>>> >>>
>>> biapps@biapps-qa01:~> HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>>> $SPARK_HOME/bin/spark-shell --deploy-mode client
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> Welcome to
>>>     __
>>>  / __/__  ___ _/ /__
>>> _\ \/ _ \/ _ `/ __/  '_/
>>>/___/ .__/\_,_/_/ /_/\_\   version 1.5.1
>>>   /_/
>>>
>>> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_91)
>>> Type in expressions to have them evaluated.
>>> Type :help for more information.
>>> 15/10/29 

Re: Packaging a jar for a jdbc connection using sbt assembly and scala.

2015-10-29 Thread Deenar Toraskar
Hi Dean

I guess you are using Spark 1.3.


   - The JDBC driver class must be visible to the primordial class loader
   on the client session and on all executors. This is because Java’s
   DriverManager class does a security check that results in it ignoring all
   drivers not visible to the primordial class loader when one goes to open a
   connection. One convenient way to do this is to modify compute_classpath.sh
   on all worker nodes to include your driver JARs.

Take a look at this https://issues.apache.org/jira/browse/SPARK-6913 and
see
http://stackoverflow.com/questions/30221677/spark-sql-postgresql-jdbc-classpath-issues
.
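
For example (a sketch; the jar location below is an assumption, adjust it to
wherever the connector actually lives, and it must be readable on every node),
put the driver on both the driver and executor classpaths. Also note that
spark-submit options must come before the application jar; anything after the
jar is passed to your main class as arguments:

./bin/spark-submit \
  --driver-class-path /opt/jars/mysql-connector-java-5.1.37-bin.jar \
  --conf spark.executor.extraClassPath=/opt/jars/mysql-connector-java-5.1.37-bin.jar \
  ~/path/to/scala/project/target/scala-2.10/complete.jar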


Regards
Deenar

*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812



On 29 October 2015 at 10:34, dean.wood  wrote:

> I'm having a problem building a spark jar with scala. It's a really simple
> thing, I want to programatically access a mysql server via JDBC and load it
> in to a spark data frame. I can get this to work in the spark shell but I
> cannot package a jar that works with spark submit. It will package but when
> running, fails with
>
> Exception in thread "main" java.sql.SQLException: No suitable driver found
> for jdbc:mysql://localhost:3310/100million
> My spark-submit command is
>
> ./bin/spark-submit ~/path/to/scala/project/target/scala-2.10/complete.jar
> --driver-class-path ~/path/to/mysql-connector-java-5.1.37-bin.jar
>
> My build.sbt looks like
>
> name := "sql_querier"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
>
> sbtVersion := "0.13.7"
>
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
> "provided"
>
> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1" %
> "provided"
>
> libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.37"
>
> assemblyJarName in assembly := "complete.jar"
>
> mainClass in assembly := Some("sql_querier")
>
> offline := true
> and my very simple code is
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SQLContext
>
> object sql_querier{
>
> def main(args: Array[String]) {
>
> val sc = new org.apache.spark.SparkContext()
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> val url="jdbc:mysql://databaseurl:portno/database"
>
> val prop = new java.util.Properties
> prop.setProperty("user","myuser")
> prop.setProperty("password","mydatabase")
> val cats=sqlContext.read.jdbc(url, "categories", prop)
> cats.show
>  }
>  }
> Where I've hidden the real values for user password and database url. I've
> also got a file in projects that adds the sbt assembly plugin and there is
> nothing wrong with this. I've successfully used sbt assembly before with
> this configuration. When starting a spark shell with the
> --driver-class-path
> option pointing to the mysql jar, I can run the commands and extract data
> from the mysql database.
>
> I've tried version 5.1.34 and 5.0.8 and neither have worked. I've also
> tried
> changing --driver-class-path for --jar in the spark submit command and
> adding the lines
>
>
>
> sc.addJar("/Users/dean.wood/data_science/scala/sqlconn/mysql-connector-java-5.0.8-bin.jar")
> Class.forName("com.mysql.jdbc.Driver")
>
> to the scala code.
>
> Any clue what I am doing wrong with the build would be greatly appreciated.
>
> Dean
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Packaging-a-jar-for-a-jdbc-connection-using-sbt-assembly-and-scala-tp25225.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Deenar Toraskar
Hi Zoltan

Add hive-site.xml to your YARN_CONF_DIR. i.e. $SPARK_HOME/conf/yarn-conf
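
For example (assuming the usual CDH client config location; adjust to wherever
your Hive client configs actually live):

cp /etc/hive/conf/hive-site.xml $SPARK_HOME/conf/yarn-conf/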

Deenar

*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812

On 28 October 2015 at 14:28, Zoltan Fedor  wrote:

> Hi,
> We have a shared CDH 5.3.3 cluster and trying to use Spark 1.5.1 on it in
> yarn client mode with Hive.
>
> I have compiled Spark 1.5.1 with SPARK_HIVE=true, but it seems I am not
> able to make SparkSQL to pick up the hive-site.xml when runnig pyspark.
>
> hive-site.xml is located in $SPARK_HOME/hadoop-conf/hive-site.xml and also
> in $SPARK_HOME/conf/hive-site.xml
>
> When I start pyspark with the below command and then run some simple
> SparkSQL it fails, it seems it didn't pic up the settings in hive-site.xml
>
> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
> $SPARK_HOME/bin/pyspark --deploy-mode client
>
> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
> Type "help", "copyright", "credits" or "license" for more information.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 15/10/28 10:22:33 WARN MetricsSystem: Using default name DAGScheduler for
> source because spark.app.id is not set.
> 15/10/28 10:22:35 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/10/28 10:22:59 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>   /_/
>
> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
> SparkContext available as sc, HiveContext available as sqlContext.
> >>> sqlContext2 = HiveContext(sc)
> >>> sqlContext2.sql("show databases").first()
> 15/10/28 10:23:12 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> 15/10/28 10:23:13 WARN ShellBasedUnixGroupsMapping: got exception trying
> to get groups for user biapp: id: biapp: No such user
>
> 15/10/28 10:23:13 WARN UserGroupInformation: No groups available for user
> biapp
> Traceback (most recent call last):
>   File "", line 1, in 
>   File
> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
> line 552, in sql
> return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
>   File
> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
> line 660, in _ssql_ctx
> "build/sbt assembly", e)
> Exception: ("You must build Spark with Hive. Export 'SPARK_HIVE=true' and
> run build/sbt assembly", Py4JJavaError(u'An error occurred while calling
> None.org.apache.spark.sql.hive.HiveContext.\n', JavaObject id=o20))
> >>>
>
>
> See in the above the warning about "WARN HiveConf: HiveConf of name
> hive.metastore.local does not exist" while actually there is a
> hive.metastore.local attribute in the hive-site.xml
>
> Any idea how to submit hive-site.xml in yarn client mode?
>
> Thanks
>


Re: Dynamic Resource Allocation with Spark Streaming (Standalone Cluster, Spark 1.5.1)

2015-10-27 Thread Deenar Toraskar
Until Spark Streaming supports dynamic allocation, you could use a
StreamingListener to monitor batch execution times and, based on them, call
sparkContext.requestExecutors() and sparkContext.killExecutors() to add and
remove executors explicitly.
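
A rough sketch of that idea (the threshold and listener name are made up;
requestExecutors/killExecutors are developer APIs, and killExecutors needs the
ids of the executors you want to drop):

import org.apache.spark.SparkContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class ScalingListener(sc: SparkContext, targetBatchMs: Long) extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val processingMs = batch.batchInfo.processingDelay.getOrElse(0L)
    if (processingMs > targetBatchMs) {
      sc.requestExecutors(1)  // ask the cluster manager for one more executor
    }
    // if batches are consistently idle, pick executor ids and call
    // sc.killExecutors(Seq("<executorId>")) to scale back in
  }
}

// ssc is your StreamingContext
// ssc.addStreamingListener(new ScalingListener(ssc.sparkContext, targetBatchMs = 2000L))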



On 26 October 2015 at 21:37, Ted Yu  wrote:

> This is related:
> SPARK-10955 Warn if dynamic allocation is enabled for Streaming jobs
>
> which went into 1.6.0 as well.
>
> FYI
>
> On Mon, Oct 26, 2015 at 2:26 PM, Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
>> Hi Matthias,
>>
>> Unless there was a change in 1.5, I'm afraid dynamic resource allocation
>> is not yet supported in streaming apps.
>>
>> Thanks,
>> Silvio
>>
>> Sent from my Lumia 930
>> --
>> From: Matthias Niehoff 
>> Sent: ‎10/‎26/‎2015 4:00 PM
>> To: user@spark.apache.org
>> Subject: Dynamic Resource Allocation with Spark Streaming (Standalone
>> Cluster, Spark 1.5.1)
>>
>> Hello everybody,
>>
>> I have a few (~15) Spark Streaming jobs which have load peaks as well as
>> long times with a low load. So I thought the new Dynamic Resource
>> Allocation for Standalone Clusters might be helpful (SPARK-4751).
>>
>> I have a test "cluster" with 1 worker consisting of 4 executors with 2
>> cores each, so 8 cores in total.
>>
>> I started a simple streaming application without limiting the max cores
>> for this app. As expected the app occupied every core of the cluster. Then
>> I started a second app, also without limiting the maximum cores. As the
>> first app did not get any input through the stream, my naive expectation
>> was that the second app would get at least 2 cores (1 receiver, 1
>> processing), but that's not what happened. The cores are still assigned to
>> the first app.
>> When I look at the application UI of the first app every executor is
>> still running. That explains why no executor is used for the second app.
>>
>> I end up with two questions:
>> - When does an executor getting idle in a Spark Streaming application?
>> (and so could be reassigned to another app)
>> - Is there another way to compete with uncertain load when using Spark
>> Streaming Applications? I already combined multiple jobs to a Spark
>> Application using different threads, but this approach comes to a limit for
>> me, because Spark Applications get to big to manage.
>>
>> Thank You!
>>
>>
>>
>


Re: Broadcast table

2015-10-27 Thread Deenar Toraskar
1) If you are using the Thrift server, any cached tables are cached for all
sessions (I am not sure if this was your question).
2) If you want to ensure that the smaller table in the join is replicated to
all nodes, you can do the following:

left.join(broadcast(right), "joinKey")

Also have a look at https://issues.apache.org/jira/browse/SPARK-8300
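
The broadcast function lives in org.apache.spark.sql.functions; a fuller
sketch (left and right are whatever DataFrames you are joining):

import org.apache.spark.sql.functions.broadcast

val joined = left.join(broadcast(right), "joinKey")
// alternatively, raise spark.sql.autoBroadcastJoinThreshold (in bytes) and
// let the optimizer broadcast small tables automatically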

Deenar

On 26 October 2015 at 20:43, Jags Ramnarayanan 
wrote:

> If you are using Spark SQL and joining two dataFrames the optimizer would
> automatically broadcast the smaller table (You can configure the size if
> the default is too small).
>
> Else, in code, you can collect any RDD to the driver and broadcast using
> the context.broadcast method.
>
> http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf
>
> -- Jags
> (www.snappydata.io)
>
>
> On Mon, Oct 26, 2015 at 11:17 AM, Younes Naguib <
> younes.nag...@tritondigital.com> wrote:
>
>> Hi all,
>>
>>
>>
>> I use the thrift server, and I cache a table using “cache table mytab”.
>>
>> Is there any sql to broadcast it too?
>>
>>
>>
>> *Thanks*
>>
>> *Younes Naguib*
>>
>> Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G
>> 1R8
>>
>> Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | younes.naguib
>> @tritondigital.com 
>>
>>
>>
>
>


Re: get directory names that are affected by sc.textFile("path/to/dir/*/*/*.js")

2015-10-27 Thread Deenar Toraskar
This won't work, as you can never guarantee which files were read by Spark if
some other process is writing files to the same location. It would be far less
work to move files matching your pattern to a staging location and then load
them from there using sc.textFile. You should find HDFS FileSystem API calls
equivalent to the normal file system operations if command-line tools like
distcp or mv don't meet your needs.
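
A sketch of the staging approach using the Hadoop FileSystem API (the paths
and glob below are placeholders):

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val staging = new Path("/data/staging/batch-001")
fs.mkdirs(staging)

// freeze the current set of files by moving them into the staging directory
// (assumes file names are unique across the matched subdirectories)
val matched = Option(fs.globStatus(new Path("/data/incoming/*/*/*.js")))
  .getOrElse(Array.empty[FileStatus])
matched.foreach { status =>
  fs.rename(status.getPath, new Path(staging, status.getPath.getName))
}

// only the staged, frozen set of files is read
val events = sc.textFile(staging.toString + "/*")
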
On 27 Oct 2015 1:49 p.m., "Նարեկ Գալստեան"  wrote:

> Dear Spark users,
>
> I am reading a set of json files to compile them to Parquet data format.
> I am willing to mark the folders in some way after having read their
> contents so that I do not read it again(e.g. I can changed the name of the
> folder).
>
> I use the .textFile("path/to/dir/*/*/*.js") technique to automatically
> detect the files.
> I cannot however, use the same notation to rename them.
>
> Could you suggest how I can get the names of these folders so that I can
> rename them using native hadoop libraries.
>
> I am using Apache Spark 1.4.1
>
> I look forward to hearing suggestions!!
>
> yours,
>
> Narek
>
> Նարեկ Գալստյան
>


Re: get host from rdd map

2015-10-26 Thread Deenar Toraskar
You can call any API that returns the hostname inside your map function.
Here's a simplified example; you would generally use mapPartitions instead,
as it saves the overhead of retrieving the hostname multiple times.

import scala.sys.process._

val distinctHosts = sc.parallelize(0 to 100).map { _ =>
  val hostname = ("hostname".!!).trim
  // your code
  (hostname)
}.collect.distinct
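
And the mapPartitions variant mentioned above, as a sketch:

import scala.sys.process._

val hostsViaMapPartitions = sc.parallelize(0 to 100).mapPartitions { iter =>
  val hostname = ("hostname".!!).trim   // resolved once per partition
  iter.map(x => (hostname, x))          // your per-element code here
}.map(_._1).collect.distinct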


On 24 October 2015 at 01:41, weoccc  wrote:

> yea,
>
> my use cases is that i want to have some external communications where rdd
> is being run in map. The external communication might be handled separately
> transparent to spark.  What will be the hacky way and nonhacky way to do
> that ? :)
>
> Weide
>
>
>
> On Fri, Oct 23, 2015 at 5:32 PM, Ted Yu  wrote:
>
>> Can you outline your use case a bit more ?
>>
>> Do you want to know all the hosts which would run the map ?
>>
>> Cheers
>>
>> On Fri, Oct 23, 2015 at 5:16 PM, weoccc  wrote:
>>
>>> in rdd map function, is there a way i can know the list of host names
>>> where the map runs ? any code sample would be appreciated ?
>>>
>>> thx,
>>>
>>> Weide
>>>
>>>
>>>
>>
>


Re: Spark scala REPL - Unable to create sqlContext

2015-10-26 Thread Deenar Toraskar
Embedded Derby, which Hive/Spark SQL uses as the default metastore, only
supports a single user at a time. Until this issue is fixed, you could get
around it by using another metastore that supports multiple concurrent users
(e.g. networked Derby or MySQL).
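
For example, pointing the metastore at MySQL rather than embedded Derby means
hive-site.xml entries along these lines (host, database name and credentials
below are placeholders):

<property><name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://metastore-host:3306/hive_metastore</value></property>
<property><name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value></property>
<property><name>javax.jdo.option.ConnectionUserName</name><value>hive</value></property>
<property><name>javax.jdo.option.ConnectionPassword</name><value>secret</value></property>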

On 25 October 2015 at 16:15, Ge, Yao (Y.)  wrote:

> Thanks. I wonder why this is not widely reported in the user forum. The
> RELP shell is basically broken in 1.5 .0 and 1.5.1
>
> -Yao
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
> *Sent:* Sunday, October 25, 2015 12:01 PM
> *To:* Ge, Yao (Y.)
> *Cc:* user
> *Subject:* Re: Spark scala REPL - Unable to create sqlContext
>
>
>
> Have you taken a look at the fix for SPARK-11000 which is in the upcoming
> 1.6.0 release ?
>
>
>
> Cheers
>
>
>
> On Sun, Oct 25, 2015 at 8:42 AM, Yao  wrote:
>
> I have not been able to start Spark scala shell since 1.5 as it was not
> able
> to create the sqlContext during the startup. It complains the metastore_db
> is already locked: "Another instance of Derby may have already booted the
> database". The Derby log is attached.
>
> I only have this problem with starting the shell in yarn-client mode. I am
> working with HDP2.2.6 which runs Hadoop 2.6.
>
> -Yao derby.log
>  >
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-scala-REPL-Unable-to-create-sqlContext-tp25195.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: Spark 1.5 on CDH 5.4.0

2015-10-23 Thread Deenar Toraskar
Sandy

The assembly jar does contain org.apache.spark.deploy.yarn.ExecutorLauncher.
I am trying to find out how I can increase the logging level, so I know the
exact classpath used by Yarn ContainerLaunch.

Deenar

On 23 October 2015 at 03:30, Sandy Ryza <sandy.r...@cloudera.com> wrote:

> Hi Deenar,
>
> The version of Spark you have may not be compiled with YARN support.  If
> you inspect the contents of the assembly jar, does
> org.apache.spark.deploy.yarn.ExecutorLauncher exist?  If not, you'll need
> to find a version that does have the YARN classes.  You can also build your
> own using the -Pyarn flag.
>
> -Sandy
>
> On Thu, Oct 22, 2015 at 9:04 AM, Deenar Toraskar <
> deenar.toras...@gmail.com> wrote:
>
>> Hi I have got the prebuilt version of Spark 1.5 for Hadoop 2.6 (
>> http://www.apache.org/dyn/closer.lua/spark/spark-1.5.1/spark-1.5.1-bin-hadoop2.6.tgz)
>> working with CDH 5.4.0 in local mode on a cluster with Kerberos. It works
>> well including connecting to the Hive metastore. I am facing an issue
>> running spark jobs in yarn-client/yarn-cluster mode. The executors fail to
>> start as java cannot find ExecutorLauncher. Error: Could not find or
>> load main class org.apache.spark.deploy.yarn.ExecutorLauncher client
>> token: N/Adiagnostics: Application application_1443531450011_13437
>> failed 2 times due to AM Container for
>> appattempt_1443531450011_13437_02 exited with exitCode: 1Stack
>> trace: ExitCodeException exitCode=1:at
>> org.apache.hadoop.util.Shell.runCommand(Shell.java:538)at
>> org.apache.hadoop.util.Shell.run(Shell.java:455)at
>> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)at
>> org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)at
>> java.util.concurrent.FutureTask.run(FutureTask.java:262)at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)at
>> java.lang.Thread.run(Thread.java:745) Any ideas as to what might be
>> going wrong. Also how can I turn on more detailed logging to see what
>> command line is being run by Yarn to launch containers? RegardsDeenar
>>
>
>


Re: Best way to use Spark UDFs via Hive (Spark Thrift Server)

2015-10-23 Thread Deenar Toraskar
You can do the following. Start the spark-shell. Register the UDFs in the
shell using sqlContext, then start the Thrift Server using startWithContext
from the spark shell:
https://github.com/apache/spark/blob/master/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala#L56
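
A minimal sketch from the spark-shell (the UDF here is only an example; the
cast assumes the shell's sqlContext is in fact a HiveContext, which it is in a
Hive-enabled build):

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

// register whatever UDFs you need on the shell's context
sqlContext.udf.register("to_upper", (s: String) => s.toUpperCase)

// expose the same context (and its UDFs/temp tables) over JDBC/ODBC
HiveThriftServer2.startWithContext(sqlContext.asInstanceOf[HiveContext])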


Regards
Deenar

On 23 October 2015 at 01:15, Dave Moyers  wrote:

> Hi,
>
> We have several udf's written in Scala that we use within jobs submitted
> into Spark. They work perfectly with the sqlContext after being registered.
> We also allow access to saved tables via the Hive Thrift server bundled
> with Spark. However, we would like to allow Hive connections to use the
> udf's in their queries against the saved tables. Is there a way to register
> udf's such that they can be used within both a Spark job and in a Hive
> connection?
>
> Thanks!
> Dave
>
> Sent from my iPad
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark 1.5 on CDH 5.4.0

2015-10-23 Thread Deenar Toraskar
I got this working. For others trying this: it turns out that in Spark
1.3/CDH 5.4 the setting was

spark.yarn.jar=local:/opt/cloudera/parcels/

I had changed this to point at the 1.5.1 version of the Spark assembly jar

spark.yarn.jar=local:/opt/spark-1.5.1-bin/...

and this didn't work; I had to drop the "local:" prefix

spark.yarn.jar=/opt/spark-1.5.1-bin/...

Regards
Deenar

On 23 October 2015 at 17:30, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> I got this working. For others trying this It turns out in Spark 1.3/CDH5.4
>
> spark.yarn.jar=local:/opt/cloudera/parcels/
>
> I had changed this to reflect the 1.5.1 version of spark assembly jar
>
> spark.yarn.jar=/opt/spark-1.5.1-bin/...
>
> and this didn't work, I had to drop the "local:" prefix
>
> spark.yarn.jar=/opt/spark-1.5.1-bin/...
>
> Regards
> Deenar
>
>
>
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 23 October 2015 at 13:34, Deenar Toraskar <deenar.toras...@gmail.com>
> wrote:
>
>> Sandy
>>
>> The assembly jar does contain org.apache.spark.deploy.yarn.ExecutorLauncher.
>> I am trying to find out how i can increase the logging level, so I know the
>> exact classpath used by Yarn ContainerLaunch.
>>
>> Deenar
>>
>> On 23 October 2015 at 03:30, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>>
>>> Hi Deenar,
>>>
>>> The version of Spark you have may not be compiled with YARN support.  If
>>> you inspect the contents of the assembly jar, does
>>> org.apache.spark.deploy.yarn.ExecutorLauncher exist?  If not, you'll
>>> need to find a version that does have the YARN classes.  You can also build
>>> your own using the -Pyarn flag.
>>>
>>> -Sandy
>>>
>>> On Thu, Oct 22, 2015 at 9:04 AM, Deenar Toraskar <
>>> deenar.toras...@gmail.com> wrote:
>>>
>>>> Hi I have got the prebuilt version of Spark 1.5 for Hadoop 2.6 (
>>>> http://www.apache.org/dyn/closer.lua/spark/spark-1.5.1/spark-1.5.1-bin-hadoop2.6.tgz)
>>>> working with CDH 5.4.0 in local mode on a cluster with Kerberos. It works
>>>> well including connecting to the Hive metastore. I am facing an issue
>>>> running spark jobs in yarn-client/yarn-cluster mode. The executors fail to
>>>> start as java cannot find ExecutorLauncher. Error: Could not find or
>>>> load main class org.apache.spark.deploy.yarn.ExecutorLauncher client
>>>> token: N/Adiagnostics: Application application_1443531450011_13437
>>>> failed 2 times due to AM Container for
>>>> appattempt_1443531450011_13437_02 exited with exitCode: 1Stack
>>>> trace: ExitCodeException exitCode=1:at
>>>> org.apache.hadoop.util.Shell.runCommand(Shell.java:538)at
>>>> org.apache.hadoop.util.Shell.run(Shell.java:455)at
>>>> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)at
>>>> org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)at
>>>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)at
>>>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)at
>>>> java.util.concurrent.FutureTask.run(FutureTask.java:262)at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)at
>>>> java.lang.Thread.run(Thread.java:745) Any ideas as to what might be
>>>> going wrong. Also how can I turn on more detailed logging to see what
>>>> command line is being run by Yarn to launch containers? RegardsDeenar
>>>>
>>>
>>>
>>
>


Re: Maven Repository Hosting for Spark SQL 1.5.1

2015-10-22 Thread Deenar Toraskar
I can see this artifact in public repos
http://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10/1.5.1

http://central.maven.org/maven2/org/apache/spark/spark-sql_2.10/1.5.1/spark-sql_2.10-1.5.1.jar

Check your proxy settings or the list of repositories you are using.
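
If it helps, these are the standard coordinates; a Maven dependency entry of
this shape should resolve from Central:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.5.1</version>
</dependency>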

Deenar

On 22 October 2015 at 17:48, William Li  wrote:

>
> Hi – I tried to download the Spark SQL 2.10 and version 1.5.1 from
> Intellij using the maven library:
>
> -Project Structure
> -Global Library, click on the + to select Maven Repository
> -Type in org.apache.spark to see the list.
> -The list result only shows version up to spark-sql_2.10-1.1.1
> -I tried to manually typed in the version 1.5.1 but it doesn’t download
> the correct list of files needed.
>
> I can’t see the 1.5.1 version. So there seems to be some problem because
> Intellij reported that org.apache.spark:spark-sql_2.10-1.5.1 is not
> available or not complete.
>
> Does anyone know who to contact to verify that?
>
>
> Thanks,
>
> William.
>


Accessing external Kerberised resources from Spark executors in Yarn client/cluster mode

2015-10-22 Thread Deenar Toraskar
Hi All

I am trying to access a SQLServer that uses Kerberos for authentication
from Spark. I can successfully connect to the SQLServer from the driver
node, but any connection to SQLServer from the executors fails with "Failed
to find any Kerberos tgt".

org.apache.hadoop.security.UserGroupInformation.getCurrentUser on the driver
returns myPrincipal (auth:KERBEROS) as expected. Running the same check on
the executors,

sc.parallelize(0 to 10).map { _ =>(("hostname".!!).trim,
UserGroupInformation.getCurrentUser.toString)}.collect.distinct

returns

Array((hostname1, myprincipal (auth:SIMPLE)), (hostname2, myprincipal
(auth:SIMPLE)))


I tried passing the keytab and logging in explicitly from the executors,
but that didn't help either.

sc.parallelize(0 to 10).map { _
=>(SparkHadoopUtil.get.loginUserFromKeytab("myprincipal",SparkFiles.get("myprincipal.keytab")),
("hostname".!!).trim,
UserGroupInformation.getCurrentUser.toString)}.collect.distinct

Digging deeper I found SPARK-6207 and came across code for each Kerberised
service that is accessed from the executors in Yarn Client, such as

obtainTokensForNamenodes(nns, hadoopConf, credentials)
obtainTokenForHiveMetastore(hadoopConf, credentials)

I was wondering if anyone has been successful in accessing external
resources (running external to the Hadoop cluster) secured by Kerberos in
Spark executors running in Yarn.



Regards
Deenar


On 20 April 2015 at 21:58, Andrew Lee  wrote:

> Hi All,
>
> Affected version: spark 1.2.1 / 1.2.2 / 1.3-rc1
>
> Posting this problem to user group first to see if someone is encountering
> the same problem.
>
> When submitting spark jobs that invokes HiveContext APIs on a Kerberos
> Hadoop + YARN (2.4.1) cluster,
> I'm getting this error.
>
> javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to
> find any Kerberos tgt)]
>
> Apparently, the Kerberos ticket is not on the remote data node nor
> computing node since we don't
> deploy Kerberos tickets, and that is not a good practice either. On the
> other hand, we can't just SSH to every machine and run kinit for that
> users. This is not practical and it is insecure.
>
> The point here is that shouldn't there be a delegation token during the
> doAs to use the token instead of the ticket ?
> I'm trying to understand what is missing in Spark's HiveContext API while
> a normal MapReduce job that invokes Hive APIs will work, but not in Spark
> SQL. Any insights or feedback are appreciated.
>
> Anyone got this running without pre-deploying (pre-initializing) all
> tickets node by node? Is this worth filing a JIRA?
>
>
>
> 15/03/25 18:59:08 INFO hive.metastore: Trying to connect to metastore with
> URI thrift://alee-cluster.test.testserver.com:9083
> 15/03/25 18:59:08 ERROR transport.TSaslTransport: SASL negotiation failure
> javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to
> find any Kerberos tgt)]
> at
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
> at
> org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
> at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
> at
> org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
> at
> org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
> at
> org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
> at
> org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:336)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:214)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at
> org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:62)
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
> at
> org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
> at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
> at
> 

Spark 1.5 on CDH 5.4.0

2015-10-22 Thread Deenar Toraskar
Hi I have got the prebuilt version of Spark 1.5 for Hadoop 2.6 (
http://www.apache.org/dyn/closer.lua/spark/spark-1.5.1/spark-1.5.1-bin-hadoop2.6.tgz)
working with CDH 5.4.0 in local mode on a cluster with Kerberos. It works
well including connecting to the Hive metastore. I am facing an issue
running spark jobs in yarn-client/yarn-cluster mode. The executors fail to
start as java cannot find ExecutorLauncher.

Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher
client token: N/A
diagnostics: Application application_1443531450011_13437 failed 2 times due to AM Container for appattempt_1443531450011_13437_02 exited with exitCode: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Any ideas as to what might be going wrong? Also, how can I turn on more
detailed logging to see what command line is being run by Yarn to launch
containers?

Regards
Deenar


Re: Spark SQL Exception: Conf non-local session path expected to be non-null

2015-10-20 Thread Deenar Toraskar
This seems to be set using hive.exec.scratchdir, is that set?

hdfsSessionPath = new Path(hdfsScratchDirURIString, sessionId);
createPath(conf, hdfsSessionPath, scratchDirPermission, false, true);
conf.set(HDFS_SESSION_PATH_KEY, hdfsSessionPath.toUri().toString());


On 20 October 2015 at 00:20, Ted Yu  wrote:

> A brief search led me
> to ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java :
>
>   private static final String HDFS_SESSION_PATH_KEY =
> "_hive.hdfs.session.path";
> ...
>   public static Path getHDFSSessionPath(Configuration conf) {
> SessionState ss = SessionState.get();
> if (ss == null) {
>   String sessionPathString = conf.get(HDFS_SESSION_PATH_KEY);
>   Preconditions.checkNotNull(sessionPathString,
>   "Conf non-local session path expected to be non-null");
>   return new Path(sessionPathString);
> }
> Preconditions.checkNotNull(ss.hdfsSessionPath,
> "Non-local session path expected to be non-null");
> return ss.hdfsSessionPath;
>
> FYI
>
> On Mon, Oct 19, 2015 at 1:08 PM, YaoPau  wrote:
>
>> I've connected Spark SQL to the Hive Metastore and currently I'm running
>> SQL
>> code via pyspark.  Typically everything works fine, but sometimes after a
>> long-running Spark SQL job I get the error below, and from then on I can
>> no
>> longer run Spark SQL commands.  I still do have both my sc and my sqlCtx.
>>
>> Any idea what this could mean?
>>
>> An error occurred while calling o36.sql.
>> : org.apache.spark.sql.AnalysisException: Conf non-local session path
>> expected to be non-null;
>> at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:260)
>> at
>>
>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
>> at
>>
>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>> at
>>
>> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>> at
>>
>> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
>> at
>>
>> scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
>> at
>>
>> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
>> at
>> org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
>> at
>> org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
>> at
>>
>> org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
>> at
>>
>> org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
>> at
>>
>> 

Re: can I use Spark as alternative for gem fire cache ?

2015-10-20 Thread Deenar Toraskar
Kali

>> can I cache a RDD in memory for a whole day ? as of I know RDD will get
empty once the spark code finish executing (correct me if I am wrong).

Spark can definitely be used as a replacement for in-memory databases for
certain use cases. Spark RDDs are not shared amongst contexts. You need a
long-running Spark context and a REST API (see JobServer) or some other RPC
mechanism to allow clients to access information from the cached RDD in the
long-running context.

Things to note: RDDs are immutable and do not support granular updates or
operations like key-value lookups out of the box (though IndexedRDD
addresses some of these use cases). Spark will not be suitable for all IMDB
use cases. If you are using IMDBs for aggregation and reporting, Spark is a
much better fit. If you are using IMDBs for maintaining shared mutable
state, Spark is not designed for those use cases.
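
For the aggregation/reporting style of use, the long-running context pattern
is roughly this (the table name is a placeholder):

// in the long-running context (e.g. behind spark-jobserver or the Thrift server)
sqlContext.cacheTable("dim_product")   // pin the dimension data in memory
val hit = sqlContext.sql("SELECT * FROM dim_product WHERE id = 42").collect()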

Hope that helps.

Deenar



Deenar

On 17 October 2015 at 19:05, Ndjido Ardo Bar  wrote:

> Hi Kali,
>
> If I do understand you well, Tachyon ( http://tachyon-project.org) can be
> good alternative. You can use Spark Api to load and persist data into
> Tachyon.
> Hope that will help.
>
> Ardo
>
> > On 17 Oct 2015, at 15:28, "kali.tumm...@gmail.com" <
> kali.tumm...@gmail.com> wrote:
> >
> > Hi All,
> >
> > Can spark be used as an alternative to gem fire cache ? we use gem fire
> > cache to save (cache) dimension data in memory which is later used by our
> > Java custom made ETL tool can I do something like below ?
> >
> > can I cache a RDD in memory for a whole day ? as of I know RDD will get
> > empty once the spark code finish executing (correct me if I am wrong).
> >
> > Spark:-
> > create a RDD
> > rdd.persistance
> >
> > Thanks
> >
> >
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/can-I-use-Spark-as-alternative-for-gem-fire-cache-tp25106.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Ahhhh... Spark creates >30000 partitions... What can I do?

2015-10-20 Thread Deenar Toraskar
also check out wholeTextFiles

https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html#wholeTextFiles(java.lang.String,%20int)
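
e.g. a sketch (the path and partition count are placeholders):

// one (fileName, fileContents) pair per file, packed into far fewer partitions
val files = sc.wholeTextFiles("hdfs:///path_to_directory", minPartitions = 64)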

On 20 October 2015 at 15:04, Lan Jiang  wrote:

> As Francois pointed out, you are encountering a classic small file
> anti-pattern. One solution I used in the past is to wrap all these small
> binary files into a sequence file or avro file. For example, the avro
> schema can have two fields: filename: string and binaryname:byte[]. Thus
> your file is splittable and will not create so many partitions.
>
> Lan
>
>
> On Oct 20, 2015, at 8:03 AM, François Pelletier <
> newslett...@francoispelletier.org> wrote:
>
> You should aggregate your files in larger chunks before doing anything
> else. HDFS is not fit for small files. It will bloat it and cause you a lot
> of performance issues. Target a few hundred MB chunks partition size and
> then save those files back to hdfs and then delete the original ones. You
> can read, use coalesce and the saveAsXXX on the result.
>
> I had the same kind of problem once and solved it in bunching 100's of
> files together in larger ones. I used text files with bzip2 compression.
>
>
>
> Le 2015-10-20 08:42, Sean Owen a écrit :
>
> coalesce without a shuffle? it shouldn't be an action. It just treats many
> partitions as one.
>
> On Tue, Oct 20, 2015 at 1:00 PM, t3l  wrote:
>
>>
>> I have dataset consisting of 5 binary files (each between 500kb and
>> 2MB). They are stored in HDFS on a Hadoop cluster. The datanodes of the
>> cluster are also the workers for Spark. I open the files as a RDD using
>> sc.binaryFiles("hdfs:///path_to_directory").When I run the first action
>> that
>> involves this RDD, Spark spawns a RDD with more than 3 partitions. And
>> this takes ages to process these partitions even if you simply run
>> "count".
>> Performing a "repartition" directly after loading does not help, because
>> Spark seems to insist on materializing the RDD created by binaryFiles
>> first.
>>
>> How I can get around this?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/A-Spark-creates-3-partitions-What-can-I-do-tp25140.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
>


Re: JdbcRDD Constructor

2015-10-20 Thread Deenar Toraskar
You have 2 options

a) Don't use partitioning; if the table is small, Spark will only use one
task to load it:

val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()

b) Create a view that includes a hash-code column, similar to
ora_hash(r.rowid, 100), and use that column for partitioning, as sketched
below.
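
A sketch of option (b) with the DataFrame API (the view name, BUCKET column
and connection URL are placeholders):

val jdbcDF = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:oracle:thin:@//dbserver:1521/service",
  "dbtable" -> "schema.tablename_view",   // view exposing a numeric BUCKET column
  "partitionColumn" -> "BUCKET",
  "lowerBound" -> "0",
  "upperBound" -> "99",
  "numPartitions" -> "10")).load()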

Deenar


On 20 October 2015 at 15:19, satish chandra j <jsatishchan...@gmail.com>
wrote:

> Hi Deenar,
> Thanks for your valuable inputs
>
> Here is a situation, if a Source Table does not have any such
> column(unique values,numeric and sequential) which is suitable as Partition
> Column to be specified for JDBCRDD Constructor or DataSource API.How to
> proceed further on this scenario and also let me know if any default
> approach Spark is going to implement if do not give any such inputs as
> "lowerbound" and "upperbound" to JDBCRDD Constructor or DataSourceAPI
>
> Thanks in advance for your inputs
>
> Regards,
> Satish Chandra J
>
>
> On Thu, Sep 24, 2015 at 10:18 PM, Deenar Toraskar <
> deenar.toras...@thinkreactive.co.uk> wrote:
>
>> you are interpreting the JDBCRDD API incorrectly. If you want to use
>> partitions, then the column used to partition and present in the where
>> clause must be numeric and the lower bound and upper bound must be the min
>> and max values of the column. Spark will equally distribute the range over
>> the number of partitions selected. So in your case OFFSET is the first
>> placeholder and LIMIT the second
>>
>> numPartitions 1 - Your query will be called once with first placeholder 0
>> and second placeholder 100, this explains how you get 100 rows
>> select *  from schema.Table OFFSET 0 LIMIT 100
>>
>> numPartitions 2 - Your query will be called twice with first placeholder
>> 0 and second placeholder 50, and second time with 51,100. Again this
>> explains why you get 150 records
>>
>> select *  from schema.Table OFFSET 0 LIMIT 50
>> select *  from schema.Table OFFSET 51 LIMIT 100
>>
>> numPartitions 3 - Your query will be called thrice
>>
>> select *  from schema.Table OFFSET 0 LIMIT 34
>> select *  from schema.Table OFFSET 35 LIMIT 67
>> select *  from schema.Table OFFSET 68 LIMIT 100
>>
>> That explains why you get 201 records. You need to amend the query and
>> provide correct lower and upper bounds aligned to the column used in the
>> where clause.
>>
>> See
>> http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/
>>
>> Deenar
>>
>>
>>
>>
>> *Think Reactive Ltd*
>> deenar.toras...@thinkreactive.co.uk
>> 07714140812
>>
>>
>>
>> On 24 September 2015 at 11:55, satish chandra j <jsatishchan...@gmail.com
>> > wrote:
>>
>>> HI Deenar,
>>>
>>> Please find the SQL query below:
>>>
>>> var SQL_RDD= new JdbcRDD( sc, ()=>
>>> DriverManager.getConnection(url,user,pass),"select col1, col2,
>>> col3..col 37 from schema.Table LIMIT ? OFFSET ?",100,0,*1*,(r:
>>> ResultSet) => (r.getInt("col1"),r.getInt("col2")...r.getInt("col37")))
>>>
>>>
>>> When I have the above 100,0,*1 * I am getting SQL_RDD.count as 100
>>> When set to 100,0,2 I am getting SQL_RDD.count as 151
>>> When set to 100,0,3 I am getting SQL RDD.count as 201
>>>
>>> But where as I expect every execution count should be 100, let me know
>>> if I am missing anything here
>>>
>>> Regards,
>>> Satish Chandra
>>>
>>>
>>> On Thu, Sep 24, 2015 at 12:48 AM, Deenar Toraskar <
>>> deenar.toras...@thinkreactive.co.uk> wrote:
>>>
>>>> Satish
>>>>
>>>> Can you post the SQL query you are using?
>>>>
>>>> The SQL query must have 2 placeholders and both of them should be an
>>>> inclusive range (<= and >=)..
>>>>
>>>> e.g. select title, author from books where ? <= id and id <= ?
>>>>
>>>> Are you doing this?
>>>>
>>>> Deenar
>>>>
>>>>
>>>>
>>>>
>>>> *Think Reactive Ltd*
>>>> deenar.toras...@thinkreactive.co.uk
>>>> 07714140812
>>>>
>>>>
>>>>
>>>> On 23 September 2015 at 13:47, satish chandra j <
>>>> jsatishchan...@gmail.com> wrote:
&

Re: Spark SQL Thriftserver and Hive UDF in Production

2015-10-19 Thread Deenar Toraskar
Reece

You can do the following. Start the spark-shell. Register the UDFs in the
shell using sqlContext, then start the Thrift Server using startWithContext
from the spark shell:
https://github.com/apache/spark/blob/master/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala#L56



Regards
Deenar

On 19 October 2015 at 04:42, Mohammed Guller  wrote:

> Have you tried registering the function using the Beeline client?
>
> Another alternative would be to create a Spark SQL UDF and launch the
> Spark SQL Thrift server programmatically.
>
> Mohammed
>
> -Original Message-
> From: ReeceRobinson [mailto:re...@therobinsons.gen.nz]
> Sent: Sunday, October 18, 2015 8:05 PM
> To: user@spark.apache.org
> Subject: Spark SQL Thriftserver and Hive UDF in Production
>
> Does anyone have some advice on the best way to deploy a Hive UDF for use
> with a Spark SQL Thriftserver where the client is Tableau using Simba ODBC
> Spark SQL driver.
>
> I have seen the hive documentation that provides an example of creating
> the function using a hive client ie: CREATE FUNCTION myfunc AS 'myclass'
> USING JAR 'hdfs:///path/to/jar';
>
> However using Tableau I can't run this create function statement to
> register my UDF. Ideally there is a configuration setting that will load my
> UDF jar and register it at start-up of the thriftserver.
>
> Can anyone tell me what the best option if it is possible?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Thriftserver-and-Hive-UDF-in-Production-tp25114.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to have Single refernce of a class in Spark Streaming?

2015-10-17 Thread Deenar Toraskar
Swetha

Look at
http://spark.apache.org/docs/latest/programming-guide.html#shared-variables



Normally, when a function passed to a Spark operation (such as map or reduce)
is executed on a remote cluster node, it works on separate copies of all
the variables used in the function. These variables are copied to each
machine, and no updates to the variables on the remote machine are
propagated back to the driver program. Supporting general, read-write
shared variables across tasks would be inefficient. However, Spark does
provide two limited types of *shared variables* for two common usage
patterns: broadcast variables and accumulators.
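
In code, the two shared-variable types the docs describe look roughly like
this (a sketch):

// read-only lookup data, shipped once per executor
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
// write-only counter, aggregated back on the driver
val misses = sc.accumulator(0L, "misses")

sc.parallelize(Seq("a", "b", "c")).foreach { k =>
  if (!lookup.value.contains(k)) misses += 1L
}
println(misses.value)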


Deenar

On 17 October 2015 at 02:05, swetha  wrote:

> Hi,
>
> How to have a single reference of a class across all the executors in Spark
> Streaming? The contents of the class will be updated at all the executors.
> Would using it as a variable inside updateStateByKey guarantee that
> reference is updated across all the  executors and no
> concurrentModificationException? Following is how I am trying to use a
> Tracker Class across all the JVMs.
>
> val trackerClass = new TrackerClass();
>
>
> val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
>
> def updateFunction(newValues: Seq[Int], runningCount: Option[Int]):
> Option[Int] = {
> getMergedSession(this.trackerClass)
> Some(newCount)
> }
>
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-have-Single-refernce-of-a-class-in-Spark-Streaming-tp25103.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark SQL running totals

2015-10-15 Thread Deenar Toraskar
You can do a self-join of the table, with the join clause being
a.col1 >= b.col1:

select a.col1, a.col2, sum(b.col2)
from tablea as a left outer join tablea as b on (a.col1 >= b.col1)
group by a.col1, a.col2

I haven't tried it, but I can't see why it wouldn't work. Doing it with RDDs
might be more efficient; see
https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/
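
If you are on a HiveContext (Spark 1.4+), a window function expresses the
same running total more directly; a sketch, with mytable as a placeholder
table name:

SELECT col_1, col_2,
       SUM(col_2) OVER (ORDER BY col_1
                        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col_3
FROM mytable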

On 15 October 2015 at 18:48, Stefan Panayotov  wrote:

> Hi,
>
> I need help with Spark SQL. I need to achieve something like the following.
> If I have data like:
>
> col_1  col_2
> 1 10
> 2 30
> 3 15
> 4 20
> 5 25
>
> I need to get col_3 to be the running total of the sum of the previous
> rows of col_2, e.g.
>
> col_1  col_2  col_3
> 1 1010
> 2 3040
> 3 1555
> 4 2075
> 5 25100
>
> Is there a way to achieve this in Spark SQL or maybe with Data frame
> transformations?
>
> Thanks in advance,
>
>
> *Stefan Panayotov, PhD **Home*: 610-355-0919
> *Cell*: 610-517-5586
> *email*: spanayo...@msn.com
> spanayo...@outlook.com
> spanayo...@comcast.net
>
>


Re: Spark DataFrame GroupBy into List

2015-10-14 Thread Deenar Toraskar
collect_set and collect_list are built-in Hive aggregate functions (UDAFs); see
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

On 14 October 2015 at 03:45, SLiZn Liu  wrote:

> Hi Michael,
>
> Can you be more specific on `collect_set`? Is it a built-in function or,
> if it is an UDF, how it is defined?
>
> BR,
> Todd Leo
>
> On Wed, Oct 14, 2015 at 2:12 AM Michael Armbrust 
> wrote:
>
>> import org.apache.spark.sql.functions._
>>
>> df.groupBy("category")
>>   .agg(callUDF("collect_set", df("id")).as("id_list"))
>>
>> On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu 
>> wrote:
>>
>>> Hey Spark users,
>>>
>>> I'm trying to group by a dataframe, by appending occurrences into a list
>>> instead of count.
>>>
>>> Let's say we have a dataframe as shown below:
>>>
>>> | category | id |
>>> |  |:--:|
>>> | A| 1  |
>>> | A| 2  |
>>> | B| 3  |
>>> | B| 4  |
>>> | C| 5  |
>>>
>>> ideally, after some magic group by (reverse explode?):
>>>
>>> | category | id_list  |
>>> |  |  |
>>> | A| 1,2  |
>>> | B| 3,4  |
>>> | C| 5|
>>>
>>> any tricks to achieve that? Scala Spark API is preferred. =D
>>>
>>> BR,
>>> Todd Leo
>>>
>>>
>>>
>>>
>>


Re: OutOfMemoryError When Reading Many json Files

2015-10-14 Thread Deenar Toraskar
Hi

Why don't you check whether you can process the large file standalone first,
and then do the outer loop next?

sqlContext.read.json(jsonFile)
  .select($"some", $"fields")
  .withColumn("new_col", some_transformations($"col"))
  .rdd.map( x: Row => (k, v) )
  .combineByKey()

Deenar

On 14 October 2015 at 05:18, SLiZn Liu  wrote:

> Hey Spark Users,
>
> I kept getting java.lang.OutOfMemoryError: Java heap space as I read a
> massive amount of json files, iteratively via read.json(). Even the
> result RDD is rather small, I still get the OOM Error. The brief structure
> of my program reads as following, in psuedo-code:
>
> file_path_list.map{ jsonFile: String =>
>   sqlContext.read.json(jsonFile)
> .select($"some", $"fields")
> .withColumn("new_col", some_transformations($"col"))
> .rdd.map( x: Row => (k, v) )
> .combineByKey() // which groups a column into item lists by another 
> column as keys
> }.reduce( (i, j) => i.union(j) )
> .combineByKey() // which combines results from all json files
>
> I confess some of the json files are Gigabytes huge, yet the combined RDD
> is in a few Megabytes. I’m not familiar with the under-the-hood mechanism,
> but my intuitive understanding of how the code executes is, read the file
> once a time (where I can easily modify map to foreach when fetching from
> file_path_list, if that’s the case), do the inner transformation on DF
> and combine, then reduce and do the outer combine immediately, which
> doesn’t require to hold all RDDs generated from all files in the memory.
> Obviously, as my code raises OOM Error, I must have missed something
> important.
>
> From the debug log, I can tell the OOM Error happens when reading the same
> file, which is in a modest size of 2GB, while driver.memory is set to 13GB,
> and the available memory size before the code execution is around 8GB, on
> my standalone machine running as “local[8]”.
>
> To overcome this, I also tried to initialize an empty universal RDD
> variable, iteratively read one file at a time using foreach, then instead
> of reduce, simply combine each RDD generated by the json files, except the
> OOM Error remains.
>
> Other configurations:
>
>- set(“spark.storage.memoryFraction”, “0.1”) // no cache of RDD is used
>- set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
>
> Any suggestions other than scale up/out the spark cluster?
>
> BR,
> Todd Leo
> ​
>


Re: Running in cluster mode causes native library linking to fail

2015-10-13 Thread Deenar Toraskar
Hi Bernardo

Is the native library installed on all machines of your cluster and are you
setting both the spark.driver.extraLibraryPath and
spark.executor.extraLibraryPath ?
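
For example (the library directory is a placeholder and must exist at the
same path on every node):

spark-submit --deploy-mode cluster --class your.main.Class \
  --conf spark.driver.extraLibraryPath=/opt/native/lib \
  --conf spark.executor.extraLibraryPath=/opt/native/lib \
  your-app.jar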

Deenar



On 14 October 2015 at 05:44, Bernardo Vecchia Stein <
bernardovst...@gmail.com> wrote:

> Hello,
>
> I am trying to run some scala code in cluster mode using spark-submit.
> This code uses addLibrary to link with a .so that exists in the machine,
> and this library has a function to be called natively (there's a native
> definition as needed in the code).
>
> The problem I'm facing is: whenever I try to run this code in cluster
> mode, spark fails with the following message when trying to execute the
> native function:
> java.lang.UnsatisfiedLinkError:
> org.name.othername.ClassName.nativeMethod([B[B)[B
>
> Apparently, the library is being found by spark, but the required function
> isn't found.
>
> When trying to run in client mode, however, this doesn't fail and
> everything works as expected.
>
> Does anybody have any idea of what might be the problem here? Is there any
> bug that could be related to this when running in cluster mode?
>
> I appreciate any help.
> Thanks,
> Bernardo
>


Re: Datastore or DB for spark

2015-10-10 Thread Deenar Toraskar
The choice of datastore is driven by your use case. In fact Spark can work
with multiple datastores too. Each datastore is optimised for certain kinds
of data.

e.g. HDFS is great for analytics and large data sets at rest: it is scalable
and very performant, but immutable. NoSQL databases support key-value and
indexed lookups, granular updates and eventual consistency. Relational
databases provide stronger transactional guarantees.

So you can mix and match storage layers appropriate to the data at hand,
e.g. logs might go to HDFS, product catalogues to Cassandra, and
transactions to a relational database. Spark works transparently over
all these data sources.

Hope that helps.
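
As a small illustration (paths, tables and connection details below are placeholders), Spark can read from several stores into DataFrames in the same job and join them:

val logs = sqlContext.read.parquet("hdfs:///data/logs/2015-10-09")   // data at rest on HDFS

val transactions = sqlContext.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:postgresql://dbhost:5432/shop",                   // placeholder RDBMS
    "dbtable" -> "transactions"))
  .load()

logs.join(transactions, logs("txn_id") === transactions("id")).count()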

On 9 October 2015 at 23:37, Xiao Li  wrote:

> FYI, in my local environment, Spark is connected to DB2 on z/OS but that
> requires a special JDBC driver.
>
> Xiao Li
>
>
> 2015-10-09 8:38 GMT-07:00 Rahul Jeevanandam :
>
>> Hi Jörn Franke
>>
>> I was sure that relational database wouldn't be a good option for Spark.
>> But what about distributed databases like Hbase, Cassandra, etc?
>>
>> On Fri, Oct 9, 2015 at 7:21 PM, Jörn Franke  wrote:
>>
>>> I am not aware of any empirical evidence, but I think Hadoop (HDFS) as a
>>> datastore for Spark is quite common. With relational databases you usually
>>> do not have as much data and you do not benefit from data locality.
>>>
>>> Le ven. 9 oct. 2015 à 15:16, Rahul Jeevanandam  a
>>> écrit :
>>>
 I wanna know what everyone are using. Which datastore is popular among
 Spark community.

 On Fri, Oct 9, 2015 at 6:16 PM, Ted Yu  wrote:

> There are connectors for hbase, Cassandra, etc.
>
> Which data store do you use now ?
>
> Cheers
>
> On Oct 9, 2015, at 3:10 AM, Rahul Jeevanandam 
> wrote:
>
> Hi Guys,
>
>  I wanted to know what is the databases that you associate with spark?
>
> --
> Regards,
>
> *Rahul J*
>
>


 --
 Regards,

 *Rahul J*

>>>
>>
>>
>> --
>> Regards,
>> *Rahul J*
>> Associate Architect – Technology
>> Incture 
>>
>
>


Re: HDFS small file generation problem

2015-09-27 Thread Deenar Toraskar
You could try a couple of things

a) Use Kafka for stream processing: store incoming events and the Spark
Streaming job output in Kafka rather than on HDFS, and dual-write to HDFS in
a micro-batched mode, say every x minutes. Kafka is better suited to
processing lots of small events.
b) Coalesce small files on HDFS into a big hourly or daily file. Use HDFS
partitioning to ensure that your Pig job reads the smallest number of
partitions (a rough sketch of the nightly compaction follows below).

Deenar
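
A rough sketch of option (b), with placeholder paths and partition counts:

// read one day's worth of small files and rewrite them as a few large files
val day = "2015-09-27"
val events = sc.textFile(s"hdfs:///events/raw/$day/*")
events
  .coalesce(8)                                    // collapse into a handful of partitions
  .saveAsTextFile(s"hdfs:///events/daily/$day")   // one big daily directory for the Pig jobs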

On 27 September 2015 at 14:47, ayan guha  wrote:

> I would suggest not writing small files to HDFS. Rather, you can hold them
> in memory, maybe off heap, and then flush them to HDFS using another
> job, similar to https://github.com/ptgoetz/storm-hdfs (not sure if Spark
> already has something like it).
>
> On Sun, Sep 27, 2015 at 11:36 PM,  wrote:
>
>> Hello,
>> I'm still investigating the small-file generation problem caused by my
>> Spark Streaming jobs.
>> Indeed, my Spark Streaming jobs receive a lot of small events (avg
>> 10kb), and I have to store them in HDFS in order to process them with Pig
>> jobs on demand.
>> The problem is that I generate a lot of small files in HDFS
>> (several million) and it can be problematic.
>> I looked into using HBase or Hadoop archive files but decided against it.
>> So, what about this solution:
>> - Spark Streaming generates several million small files in HDFS on the
>> fly
>> - Each night I merge them into a big daily file
>> - I launch my Pig jobs on this big file?
>>
>> Another question I have:
>> - Is it possible to append to a big (daily) file by adding my events on
>> the fly?
>>
>> Tks a lot
>> Nicolas
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: JdbcRDD Constructor

2015-09-24 Thread Deenar Toraskar
On 24 September 2015 at 17:48, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> you are interpreting the JDBCRDD API incorrectly. If you want to use
> partitions, then the column used to partition and present in the where
> clause must be numeric and the lower bound and upper bound must be the min
> and max values of the column. Spark will equally distribute the range over
> the number of partitions selected. So in your case OFFSET is the first
> placeholder and LIMIT the second
>
> numPartitions 1 - Your query will be called once with first placeholder 0
> and second placeholder 100, this explains how you get 100 rows
> select *  from schema.Table OFFSET 0 LIMIT 100
>
> numPartitions 2 - Your query will be called twice with first placeholder
> 0 and second placeholder 50, and second time with 51,100. Again this
> explains why you get 150 records
>
> select *  from schema.Table OFFSET 0 LIMIT 50
> select *  from schema.Table OFFSET 51 LIMIT 100
>
> numPartitions 3 - Your query will be called thrice
>
> select *  from schema.Table OFFSET 0 LIMIT 34
> select *  from schema.Table OFFSET 35 LIMIT 67
> select *  from schema.Table OFFSET 68 LIMIT 100
>
> That explains why you get 201 records. You need to amend the query and
> provide correct lower and upper bounds aligned to the column used in the
> where clause.
>
> See
> http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/
>
> Deenar
>
>
>
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 24 September 2015 at 11:55, satish chandra j <jsatishchan...@gmail.com>
> wrote:
>
>> HI Deenar,
>>
>> Please find the SQL query below:
>>
>> var SQL_RDD= new JdbcRDD( sc, ()=>
>> DriverManager.getConnection(url,user,pass),"select col1, col2,
>> col3..col 37 from schema.Table LIMIT ? OFFSET ?",100,0,*1*,(r:
>> ResultSet) => (r.getInt("col1"),r.getInt("col2")...r.getInt("col37")))
>>
>>
>> When I have the above 100,0,*1 * I am getting SQL_RDD.count as 100
>> When set to 100,0,2 I am getting SQL_RDD.count as 151
>> When set to 100,0,3 I am getting SQL RDD.count as 201
>>
>> But I expect the count to be 100 on every execution; let me know if
>> I am missing anything here.
>>
>> Regards,
>> Satish Chandra
>>
>>
>> On Thu, Sep 24, 2015 at 12:48 AM, Deenar Toraskar <
>> deenar.toras...@thinkreactive.co.uk> wrote:
>>
>>> Satish
>>>
>>> Can you post the SQL query you are using?
>>>
>>> The SQL query must have 2 placeholders and both of them should be an
>>> inclusive range (<= and >=)..
>>>
>>> e.g. select title, author from books where ? <= id and id <= ?
>>>
>>> Are you doing this?
>>>
>>> Deenar
>>>
>>>
>>>
>>>
>>> *Think Reactive Ltd*
>>> deenar.toras...@thinkreactive.co.uk
>>> 07714140812
>>>
>>>
>>>
>>> On 23 September 2015 at 13:47, satish chandra j <
>>> jsatishchan...@gmail.com> wrote:
>>>
>>>> HI,
>>>> Could anybody provide input if they have come across a similar issue?
>>>>
>>>> @Rishitesh
>>>> Could you provide any sample code for using JdbcRDDSuite?
>>>>
>>>>
>>>> Regards,
>>>> Satish Chandra
>>>>
>>>> On Wed, Sep 23, 2015 at 5:14 PM, Rishitesh Mishra <
>>>> rishi80.mis...@gmail.com> wrote:
>>>>
>>>>> I am using Spark 1.5. I always get count = 100, irrespective of num
>>>>> partitions.
>>>>>
>>>>> On Wed, Sep 23, 2015 at 5:00 PM, satish chandra j <
>>>>> jsatishchan...@gmail.com> wrote:
>>>>>
>>>>>> HI,
>>>>>> Currently using Spark 1.2.2; could you please let me know the correct
>>>>>> output count you got using JdbcRDDSuite?
>>>>>>
>>>>>> Regards,
>>>>>> Satish Chandra
>>>>>>
>>>>>> On Wed, Sep 23, 2015 at 4:02 PM, Rishitesh Mishra <
>>>>>> rishi80.mis...@gmail.com> wrote:
>>>>>>
>>>>>>> Which version of Spark are you using? I can get correct results
>>>>>>> using JdbcRDD. In fact there is a test suite precisely for this
>>>>>>> (JdbcRDDSuite).
>>>>

Re: How to turn on basic authentication for the Spark Web

2015-09-23 Thread Deenar Toraskar
Rafal
Check this out https://spark.apache.org/docs/latest/security.html

Regards
Deenar

On 23 September 2015 at 19:13, Rafal Grzymkowski  wrote:

> Hi,
>
> I want to enable basic HTTP authentication for the Spark web UI (without
> needing to recompile Spark).
> I see there is a 'spark.ui.filters' option but don't know how to use it.
> I found the possibility of using the kerberos param, but it's not an
> option for me.
> What should I set there to use secret-token-based auth or user/passwd?
>
> Any hints?
>
> /MyCo
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: JdbcRDD Constructor

2015-09-23 Thread Deenar Toraskar
Satish

Can you post the SQL query you are using?

The SQL query must have 2 placeholders, and together they should define an
inclusive range (<= and >=).

e.g. select title, author from books where ? <= id and id <= ?

Are you doing this?

Deenar
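
For illustration, a rough sketch of a JdbcRDD call that follows this pattern (table, bounds and connection details are placeholders; the bounds should be the actual min and max of the numeric id column):

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val booksRdd = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, user, pass),
  "select title, author from books where ? <= id and id <= ?",
  1,                 // lowerBound: min(id) in the table
  500,               // upperBound: max(id) in the table
  3,                 // numPartitions: each partition queries a disjoint sub-range of id
  (r: ResultSet) => (r.getString("title"), r.getString("author")))

booksRdd.count()     // now independent of the number of partitions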

On 23 September 2015 at 20:18, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> Satish
>
> Can you post the SQL query you are using?
>
> The SQL query must have 2 placeholders and both of them should be an
> inclusive range (<= and >=)..
>
> e.g. select title, author from books where ? <= id and id <= ?
>
> Are you doing this?
>
> Deenar
>
>
>
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 23 September 2015 at 13:47, satish chandra j <jsatishchan...@gmail.com>
> wrote:
>
>> HI,
>> Could anybody provide input if they have come across a similar issue?
>>
>> @Rishitesh
>> Could you provide any sample code for using JdbcRDDSuite?
>>
>>
>> Regards,
>> Satish Chandra
>>
>> On Wed, Sep 23, 2015 at 5:14 PM, Rishitesh Mishra <
>> rishi80.mis...@gmail.com> wrote:
>>
>>> I am using Spark 1.5. I always get count = 100, irrespective of num
>>> partitions.
>>>
>>> On Wed, Sep 23, 2015 at 5:00 PM, satish chandra j <
>>> jsatishchan...@gmail.com> wrote:
>>>
>>>> HI,
>>>> Currently using Spark 1.2.2; could you please let me know the correct
>>>> output count you got using JdbcRDDSuite?
>>>>
>>>> Regards,
>>>> Satish Chandra
>>>>
>>>> On Wed, Sep 23, 2015 at 4:02 PM, Rishitesh Mishra <
>>>> rishi80.mis...@gmail.com> wrote:
>>>>
>>>>> Which version of Spark are you using? I can get correct results
>>>>> using JdbcRDD. In fact there is a test suite precisely for this
>>>>> (JdbcRDDSuite).
>>>>> I changed it according to your input and got correct results from this
>>>>> test suite.
>>>>>
>>>>> On Wed, Sep 23, 2015 at 11:00 AM, satish chandra j <
>>>>> jsatishchan...@gmail.com> wrote:
>>>>>
>>>>>> HI All,
>>>>>>
>>>>>> JdbcRDD constructor has following parameters,
>>>>>>
>>>>>> JdbcRDD(SparkContext sc, scala.Function0<java.sql.Connection> getConnection,
>>>>>> String sql, *long lowerBound, long upperBound, int numPartitions*,
>>>>>> scala.Function1<java.sql.ResultSet,T> mapRow,
>>>>>> scala.reflect.ClassTag<T> evidence$1)
>>>>>> (see
>>>>>> https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/rdd/JdbcRDD.html)
>>>>>>
>>>>>> where *lowerBound* refers to the lower boundary of the entire data,
>>>>>> *upperBound* to the upper boundary of the entire data,
>>>>>> and *numPartitions* to the number of partitions.
>>>>>>
>>>>>> The source table from which JdbcRDD fetches data (an Oracle DB) has
>>>>>> more than 500 records, but the results are confusing when I try
>>>>>> several executions changing the "numPartitions" parameter:
>>>>>>
>>>>>> LowerBound,UpperBound,numPartitions: Output Count
>>>>>>
>>>>>> 0 ,100  ,1   : 100
>>>>>>
>>>>>> 0 ,100  ,2   : 151
>>>>>>
>>>>>> 0 ,100  ,3   : 201
>>>>>>
>>>>>>
>>>>>> Please help me understand why the output count is 151 if
>>>>>> numPartitions is 2 and 201 if numPartitions is 3.
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Satish Chandra
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to turn on basic authentication for the Spark Web

2015-09-23 Thread Deenar Toraskar
Check this out
http://lambda.fortytools.com/post/26977061125/servlet-filter-for-http-basic-auth
or https://gist.github.com/neolitec/8953607 for examples of filters
implementing basic authentication. Implement one of these and set them in
the spark.ui.filters property.

Deenar
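
For illustration, a minimal basic-auth filter along those lines (the credentials are hard-coded placeholders; a real filter would read them from FilterConfig init parameters or an external store):

import java.util.Base64
import javax.servlet._
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

class BasicAuthFilter extends Filter {
  // expected Authorization header value for user "admin" / password "secret" (placeholders)
  private val expected =
    "Basic " + Base64.getEncoder.encodeToString("admin:secret".getBytes("UTF-8"))

  override def init(config: FilterConfig): Unit = {}
  override def destroy(): Unit = {}

  override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain): Unit = {
    val request  = req.asInstanceOf[HttpServletRequest]
    val response = res.asInstanceOf[HttpServletResponse]
    if (Option(request.getHeader("Authorization")).exists(_ == expected)) {
      chain.doFilter(req, res)                              // credentials match, pass through
    } else {
      response.setHeader("WWW-Authenticate", "Basic realm=\"Spark UI\"")
      response.sendError(HttpServletResponse.SC_UNAUTHORIZED)
    }
  }
}

The class then needs to be on the driver classpath and named in the spark.ui.filters property (fully qualified if it lives in a package).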

On 23 September 2015 at 20:44, Rafal Grzymkowski  wrote:

> I know that Spark security page, but the information there is not
> sufficient.
> Has anyone made it work? Those basic servlet filters for ui.filters


Spark 1.5 UDAF ArrayType

2015-09-22 Thread Deenar Toraskar
Hi

I am trying to write a UDAF, ArraySum, that does an element-wise sum of
arrays of Doubles, returning an array of Doubles, following the sample in
https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html.
I am getting the following error. Any guidance on handling complex types in
Spark SQL would be appreciated.

Regards
Deenar

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

class ArraySum extends UserDefinedAggregateFunction {
   def inputSchema: org.apache.spark.sql.types.StructType =
StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)

  def bufferSchema: StructType =
StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)

  def dataType: DataType = ArrayType(DoubleType, false)

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Nil
  }

  def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
val currentSum : Seq[Double] = buffer.getSeq(0)
val currentRow : Seq[Double] = input.getSeq(0)
buffer(0) = (currentSum, currentRow) match {
  case (Nil, Nil) => Nil
  case (Nil, row) => row
  case (sum, Nil) => sum
  case (sum, row) => (sum, row).zipped.map { case (a, b) => a + b
}
  // TODO handle different sizes arrays here
}
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val currentSum : Seq[Double] = buffer1.getSeq(0)
val currentRow : Seq[Double] = buffer2.getSeq(0)
buffer1(0) = (currentSum, currentRow) match {
  case (Nil, Nil) => Nil
  case (Nil, row) => row
  case (sum, Nil) => sum
  case (sum, row) => (sum, row).zipped.map { case (a, b) => a + b
}
  // TODO handle different sizes arrays here
}
  }

  def evaluate(buffer: Row): Any = {
buffer.getSeq(0)
  }
}

val arraySum = new ArraySum
sqlContext.udf.register("ArraySum", arraySum)

*%sql select ArraySum(Array(1.0,2.0,3.0)) from pnls where date =
'2015-05-22' limit 10*

gives me the following error


Error in SQL statement: SparkException: Job aborted due to stage failure:
Task 0 in stage 219.0 failed 4 times, most recent failure: Lost task 0.3 in
stage 219.0 (TID 11242, 10.172.255.236): java.lang.ClassCastException:
scala.collection.mutable.WrappedArray$ofRef cannot be cast to
org.apache.spark.sql.types.ArrayData at
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:47)
at
org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:247)
at
org.apache.spark.sql.catalyst.expressions.JoinedRow.getArray(JoinedRow.scala:108)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
Source) at
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:373)
at
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:362)
at
org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:141)
at
org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:30)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at
scala.collection.Iterator$class.foreach(Iterator.scala:727) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at
scala.collection.AbstractIterator.to(Iterator.scala:1157) at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at
org.apache.spark.scheduler.Task.run(Task.scala:88) at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at

Re: How to share memory in a broadcast between tasks in the same executor?

2015-09-22 Thread Deenar Toraskar
Clement

In local mode all worker threads run in the driver VM. Your dictionary
should not be copied 32 times; in fact it won't really be broadcast at all.
Have you tried increasing spark.driver.memory to ensure that the driver uses
all the memory on the machine?

Deenar
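
A minimal sketch of that setup (the dictionary and RDD names are illustrative; note that spark.driver.memory has to be set when the JVM is launched, e.g. via spark-submit --driver-memory, rather than in code):

// in local[32] every task thread shares the single driver/worker JVM,
// so the broadcast value is held once, not once per core
val conf = new SparkConf().setAppName("SharedDictionary").setMaster("local[32]")
val sc = new SparkContext(conf)

val dicoBr = sc.broadcast(bigDictionary)               // bigDictionary: Map[String, String], illustrative
val result = inputRdd.map { case (id, key) =>
  (id, dicoBr.value.getOrElse(key, "unknown"))         // lookups hit the one shared copy
}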

On 22 September 2015 at 19:42, Clément Frison 
wrote:

> Hello,
>
> My team and I have a 32-core machine and we would like to use a huge
> object - for example a large dictionary - in a map transformation and use
> all our cores in parallel by sharing this object among some tasks.
>
> We broadcast our large dictionary.
>
> dico_br = sc.broadcast(dico)
>
> We use it in a map:
>
> rdd.map(lambda x: (x[0], function(x[1], dico_br)))
>
> where function does a lookup : dico_br.value[x]
>
> Our issue is that our dictionary is loaded 32 times in memory, and it
> doesn't fit. So what we are doing is limiting the number of executors. It
> works fine but we only have 8 cpus working in parallel instead of 32.
>
> We would like to take advantage of multicore processing and shared memory,
> as the 32 cores are in the same machine. For example we would like to load
> the dictionary in memory 8 times only and make 4 cores share it. How could
> we achieve that with Spark ?
>
>
> What we have tried - without success :
>
> 1) One driver/worker with 32 cores : local[32]
>
> 2) Standalone with one master and 8 workers - each of them having 4 cores
>
> Thanks a lot for your help, Clement
>


Re: spark-avro takes a lot time to load thousands of files

2015-09-22 Thread Deenar Toraskar
Daniel

Can you elaborate on why you are using a broadcast variable to concatenate
many Avro files into a single ORC file? Look at wholeTextFiles on
SparkContext.

SparkContext.wholeTextFiles lets you read a directory containing multiple
small text files, and returns each of them as (filename, content) pairs.
This is in contrast with textFile, which would return one record per line
in each file.
​
You can then process this RDD in parallel over the cluster, convert to a
dataframe and save as a ORC file.

Regards
Deenar
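
A rough sketch of that route, assuming the inputs are small text/JSON documents (for true binary Avro files sc.binaryFiles would be the analogue) and with a trivial Event case class standing in for the real parsing logic:

case class Event(source: String, payload: String)           // illustrative schema

val docs = sc.wholeTextFiles("hdfs:///input/small-files/*")  // (path, fullContent) pairs
val records = docs.map { case (path, content) => Event(path, content) }

import sqlContext.implicits._
records.toDF()
  .coalesce(8)                                               // a few large output files
  .write.format("orc")                                       // needs a HiveContext in Spark 1.5
  .save("hdfs:///output/events_orc")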


Re: Compute Median in Spark Dataframe

2015-06-22 Thread Deenar Toraskar
Many thanks, will look into this. I don't particularly want to reuse the
custom Hive UDAF I have; I would prefer writing a new one if that is
cleaner. I am just using the JVM.
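
For reference, one JVM-side route that avoids a custom UDAF altogether is to lean on Hive's built-in percentile_approx through a HiveContext; a sketch (it assumes sqlContext is a HiveContext, functions.expr is available from Spark 1.5, and the table/column names are placeholders):

import org.apache.spark.sql.functions.{col, expr}

val df = sqlContext.table("measurements")
df.groupBy(col("key"))
  .agg(expr("percentile_approx(value, 0.5)").as("median"))
  .show()

// or directly in SQL
sqlContext.sql("select percentile_approx(value, 0.5) as median from measurements").show()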

On 5 June 2015 at 00:03, Holden Karau hol...@pigscanfly.ca wrote:

 My current example doesn't use a Hive UDAF, but you would  do something
 pretty similar (it calls a new user defined UDAF, and there are wrappers to
 make Spark SQL UDAFs from Hive UDAFs but they are private). So this is
 doable, but since it pokes at internals it will likely break between
 versions of Spark. If you want to see the WIP PR I have with Sparkling
 Pandas its at
 https://github.com/sparklingpandas/sparklingpandas/pull/90/files . If
 you're doing this in the JVM and just want to know how to wrap the Hive UDAF, you
 can grep/look in sql/hive/ in Spark, but I'd encourage you to see if there
 is another way to accomplish what you want (since poking at the internals
 is kind of dangerous).

 On Thu, Jun 4, 2015 at 6:28 AM, Deenar Toraskar deenar.toras...@gmail.com
  wrote:

 Hi Holden, Olivier


 So for column you need to pass in a Java function, I have some sample
 code which does this but it does terrible things to access Spark internals.
 I also need to call a Hive UDAF in a dataframe agg function. Are there
 any examples of what Column expects?

 Deenar

 On 2 June 2015 at 21:13, Holden Karau hol...@pigscanfly.ca wrote:

 So for column you need to pass in a Java function, I have some sample
 code which does this but it does terrible things to access Spark internals.


 On Tuesday, June 2, 2015, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Nice to hear from you Holden ! I ended up trying exactly that (Column)
 - but I may have done it wrong :

 In [*5*]: g.agg(Column(percentile(value, 0.5)))
 Py4JError: An error occurred while calling o97.agg. Trace:
 py4j.Py4JException: Method agg([class java.lang.String, class
 scala.collection.immutable.Nil$]) does not exist
 at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)

 Any idea ?

 Olivier.
 Le mar. 2 juin 2015 à 18:02, Holden Karau hol...@pigscanfly.ca a
 écrit :

 Not super easily, the GroupedData class uses a strToExpr function
 which has a pretty limited set of functions so we cant pass in the name of
 an arbitrary hive UDAF (unless I'm missing something). We can instead
 construct an column with the expression you want and then pass it in to
 agg() that way (although then you need to call the hive UDAF there). There
 are some private classes in hiveUdfs.scala which expose hiveUdaf's as 
 Spark
 SQL AggregateExpressions, but they are private.

 On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 I've finally come to the same conclusion, but isn't there any way to
 call this Hive UDAFs from the agg(percentile(key,0.5)) ??

 Le mar. 2 juin 2015 à 15:37, Yana Kadiyska yana.kadiy...@gmail.com
 a écrit :

 Like this...sqlContext should be a HiveContext instance

 case class KeyValue(key: Int, value: String)
 val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
 df.registerTempTable("table")
 sqlContext.sql("select percentile(key,0.5) from table").show()

 ​

 On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
 Is there any way to compute a median on a column using Spark's
 Dataframe. I know you can use stats in a RDD but I'd rather stay 
 within a
 dataframe.
 Hive seems to imply that using ntile one can compute percentiles,
 quartiles and therefore a median.
 Does anyone have experience with this ?

 Regards,

 Olivier.





 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau



 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau





 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau



Re: Anybody using Spark SQL JDBC server with DSE Cassandra?

2015-06-04 Thread Deenar Toraskar
Mohammed

Have you tried registering your Cassandra tables in Hive/Spark SQL using
the DataFrames API? These should then be available to query via the Spark
SQL Thrift/JDBC server.

Deenar
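
For illustration, a sketch using the spark-cassandra-connector's DataFrame source (keyspace and table names are placeholders); a temp table registered this way is visible to the Thrift server when the server is started from the same HiveContext:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val hiveContext = new HiveContext(sc)

val users = hiveContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_ks", "table" -> "users"))
  .load()
users.registerTempTable("users")

HiveThriftServer2.startWithContext(hiveContext)   // JDBC/ODBC clients can now query "users"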

On 1 June 2015 at 19:33, Mohammed Guller moham...@glassbeam.com wrote:

  Nobody using Spark SQL JDBC/Thrift server with DSE Cassandra?



 Mohammed



 *From:* Mohammed Guller [mailto:moham...@glassbeam.com]
 *Sent:* Friday, May 29, 2015 11:49 AM
 *To:* user@spark.apache.org
 *Subject:* Anybody using Spark SQL JDBC server with DSE Cassandra?



 Hi –



 We have successfully integrated Spark SQL with Cassandra. We have a
 backend that provides a REST API that allows users to execute SQL queries
 on data in C*. Now we would like to also support JDBC/ODBC connectivity ,
 so that user can use tools like Tableau to query data in C* through the
 Spark SQL JDBC server.



 However, I have been unable to find a driver that would allow the Spark
 SQL Thrift/JDBC server to connect with Cassandra. DataStax provides a
 closed-source driver that comes only with the DSE version of Cassandra.



 I would like to find out how many people are using the Spark SQL JDBC
 server + DSE Cassandra combination. If you do use Spark SQL JDBC server +
 DSE, I would appreciate if you could share your experience. For example,
 what kind of issues you have run into? How is the performance? What
 reporting tools you are using?



 Thank  you.



 Mohammed





Re: Transactional guarantee while saving DataFrame into a DB

2015-06-04 Thread Deenar Toraskar
Hi Tariq

You need to handle the transaction semantics yourself. You could, for
example, save the dataframe to a staging table and then write to the
final table using a single atomic INSERT INTO finalTable SELECT ... FROM
stagingTable call. Remember to clear the staging table first, to recover
from any previous failures.

Deenar
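
A sketch of that pattern (table names, URL and credentials are placeholders; insertIntoJDBC is the Spark 1.3-era DataFrame API the original question mentions):

import java.sql.DriverManager

// 1. clear the staging table so a previous half-finished run cannot leak through
val conn = DriverManager.getConnection(url, user, password)
try { conn.createStatement().execute("DELETE FROM staging_table") }
finally { conn.close() }

// 2. load the staging table from the DataFrame (may be partial if the job dies here,
//    but the final table is untouched)
df.insertIntoJDBC(url, "staging_table", false)

// 3. move everything across in one atomic statement
val conn2 = DriverManager.getConnection(url, user, password)
try { conn2.createStatement().execute("INSERT INTO final_table SELECT * FROM staging_table") }
finally { conn2.close() }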

On 2 June 2015 at 16:01, Mohammad Tariq donta...@gmail.com wrote:

 Hi list,

 With the help of Spark DataFrame API we can save a DataFrame into a
 database table through insertIntoJDBC() call. However, I could not find any
 info about how it handles the transactional guarantee. What if my program
 gets killed during the processing? Would it end up in partial load?

 Is it somehow possible to handle these kind of scenarios? Rollback or
 something of that sort?

 Many thanks.

 P.S : I am using spark-1.3.1-bin-hadoop2.4 with java 1.7

 Tariq, Mohammad
 about.me/mti




Re: Adding an indexed column

2015-06-04 Thread Deenar Toraskar
or you could

1) convert the dataframe to an RDD
2) use mapPartitions and zipWithIndex within each partition
3) convert the RDD back to a dataframe; you will need to make sure you
preserve partitioning (a rough sketch follows below)

Deenar
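
A rough sketch of a simpler variant of the same idea for the product/price example (it assumes each product's rows fit comfortably in memory on one executor; for very large groups the repartition-and-sort route scales better):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val indexedRows = df.rdd
  .groupBy(_.getAs[String]("product"))                    // one group per product
  .flatMap { case (_, rows) =>
    rows.toSeq
      .sortBy(-_.getAs[Double]("price"))                  // order within the product
      .zipWithIndex
      .map { case (row, i) => Row.fromSeq(row.toSeq :+ i.toLong) }
  }

val schema = StructType(df.schema.fields :+ StructField("index", LongType, nullable = false))
val indexedDf = sqlContext.createDataFrame(indexedRows, schema)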

On 1 June 2015 at 02:23, ayan guha guha.a...@gmail.com wrote:

 If you are on Spark 1.3, use repartitionAndSort followed by mapPartitions.
 In 1.4, window functions will be supported, it seems.
 On 1 Jun 2015 04:10, Ricardo Almeida ricardo.alme...@actnowib.com
 wrote:

 That's great and how would you create an ordered index by partition (by
 product in this example)?

 Assuming now a dataframe like:

 flag | product | price
 --
 1|   a |47.808764653746
 1|   b |47.808764653746
 1|   a |31.9869279512204
 1|   b |47.7907893713564
 1|   a |16.7599200038239
 1|   b |16.7599200038239
 1|   b |20.3916014172137


 get a new dataframe such as:

 flag | product | price | index
 --
 1|   a |47.808764653746  | 0
 1|   a |31.9869279512204 | 1
 1|   a |16.7599200038239 | 2
 1|   b |47.808764653746  | 0
 1|   b |47.7907893713564 | 1
 1|   b |20.3916014172137 | 2
 1|   b |16.7599200038239 | 3








 On 29 May 2015 at 12:25, Wesley Miao wesley.mi...@gmail.com wrote:

 One way I can see is to -

 1. get rdd from your df
 2. call rdd.zipWithIndex to get a new rdd
 3. turn your new rdd to a new df

 On Fri, May 29, 2015 at 5:43 AM, Cesar Flores ces...@gmail.com wrote:


 Assuming that I have the next data frame:

 flag | price
 --
 1|47.808764653746
 1|47.808764653746
 1|31.9869279512204
 1|47.7907893713564
 1|16.7599200038239
 1|16.7599200038239
 1|20.3916014172137

 How can I create a data frame with an extra indexed column as the next
 one:

 flag | price  | index
 --|---
 1|47.808764653746 | 0
 1|47.808764653746 | 1
 1|31.9869279512204| 2
 1|47.7907893713564| 3
 1|16.7599200038239| 4
 1|16.7599200038239| 5
 1|20.3916014172137| 6

 --
 Cesar Flores






Re: Compute Median in Spark Dataframe

2015-06-04 Thread Deenar Toraskar
Hi Holden, Olivier


So for column you need to pass in a Java function, I have some sample
code which does this but it does terrible things to access Spark internals.
I also need to call a Hive UDAF in a dataframe agg function. Are there any
examples of what Column expects?

Deenar

On 2 June 2015 at 21:13, Holden Karau hol...@pigscanfly.ca wrote:

 So for column you need to pass in a Java function, I have some sample code
 which does this but it does terrible things to access Spark internals.


 On Tuesday, June 2, 2015, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Nice to hear from you Holden ! I ended up trying exactly that (Column) -
 but I may have done it wrong :

 In [*5*]: g.agg(Column(percentile(value, 0.5)))
 Py4JError: An error occurred while calling o97.agg. Trace:
 py4j.Py4JException: Method agg([class java.lang.String, class
 scala.collection.immutable.Nil$]) does not exist
 at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)

 Any idea ?

 Olivier.
 Le mar. 2 juin 2015 à 18:02, Holden Karau hol...@pigscanfly.ca a
 écrit :

 Not super easily, the GroupedData class uses a strToExpr function which
 has a pretty limited set of functions so we cant pass in the name of an
 arbitrary hive UDAF (unless I'm missing something). We can instead
 construct an column with the expression you want and then pass it in to
 agg() that way (although then you need to call the hive UDAF there). There
 are some private classes in hiveUdfs.scala which expose hiveUdaf's as Spark
 SQL AggregateExpressions, but they are private.

 On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 I've finally come to the same conclusion, but isn't there any way to
 call this Hive UDAFs from the agg(percentile(key,0.5)) ??

 Le mar. 2 juin 2015 à 15:37, Yana Kadiyska yana.kadiy...@gmail.com a
 écrit :

 Like this...sqlContext should be a HiveContext instance

 case class KeyValue(key: Int, value: String)
 val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
 df.registerTempTable("table")
 sqlContext.sql("select percentile(key,0.5) from table").show()

 ​

 On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
 Is there any way to compute a median on a column using Spark's
 Dataframe. I know you can use stats in a RDD but I'd rather stay within a
 dataframe.
 Hive seems to imply that using ntile one can compute percentiles,
 quartiles and therefore a median.
 Does anyone have experience with this ?

 Regards,

 Olivier.





 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau



 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau




Re: converting DStream[String] into RDD[String] in spark streaming [I]

2015-03-29 Thread Deenar Toraskar
Sean

Thank you very much for your response. I have a requirement to run a function
only over the new inputs in a Spark Streaming sliding window, i.e. the
latest batch of events only. Do I just get a new DStream by setting the slide
duration equal to the window duration? Such as:


val sparkConf = new SparkConf().setAppName("TwitterRawJSON")
val ssc = new StreamingContext(sparkConf, Seconds(30))
// write all new tweets in the last 10 mins
stream.window(Seconds(600), Seconds(600))
  .saveAsTextFiles("hdfs://localhost:9000/twitterRawJSON")


Alternatively, if I could find the time of the new batch, I could do
something like this:

  def saveAsTextFiles(prefix: String, suffix: String = "") {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      if (time == currentBatchTime) {
        val file = rddToFileName(prefix, suffix, time)
        rdd.saveAsTextFile(file)
      }
    }
    this.foreachRDD(saveFunc)
  }

Regards
Deenar

P.S. The mail archive on nabble does not seem to show all responses.
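
For what it's worth, a small sketch of the second idea: foreachRDD already hands over only the current batch's RDD together with its time, so no explicit comparison against the batch time (or any window) is needed to touch just the new data:

stream.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"hdfs://localhost:9000/twitterRawJSON-${time.milliseconds}")
  }
}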
-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: 22 March 2015 11:49
To: Deenar Toraskar
Cc: user@spark.apache.org
Subject: Re: converting DStream[String] into RDD[String] in spark streaming

On Sun, Mar 22, 2015 at 8:43 AM, deenar.toraskar deenar.toras...@db.com
wrote:
 1) if there are no sliding window calls in this streaming context,
 will there be just one file written per interval?

As many files as there are partitions will be written in each interval.

 2) if there is a sliding window call in the same context, such as

 val hashTags = stream.flatMap(json =>
 DataObjectFactory.createStatus(json).getText.split(" ").filter(_.startsWith("#")))

 val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _,
 Seconds(600))
  .map{case (topic, count) => (count, topic)}
  .transform(_.sortByKey(false))

 will some files get written multiple times (as long as the
 interval is in the batch)?

I don't think it's right to say files will be written many times, but yes
it is my understanding that data will be written many times since a datum
lies in many windows.


---
This e-mail may contain confidential and/or privileged information. If you
are not the intended recipient (or have received this e-mail in error)
please notify the sender immediately and delete this e-mail. Any
unauthorized copying, disclosure or distribution of the material in this
e-mail is strictly forbidden.

Please refer to http://www.db.com/en/content/eu_disclosures.htm for
additional EU corporate and regulatory disclosures and to
http://www.db.com/unitedkingdom/content/privacy.htm for information about
privacy.