Unable to run Spark in YARN mode

2016-04-08 Thread maheshmath
I have set SPARK_LOCAL_IP=127.0.0.1 but am still getting the error below:

16/04/09 10:36:50 INFO spark.SecurityManager: Changing view acls to: mahesh
16/04/09 10:36:50 INFO spark.SecurityManager: Changing modify acls to:
mahesh
16/04/09 10:36:50 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(mahesh); users with modify permissions: Set(mahesh)
16/04/09 10:36:51 INFO util.Utils: Successfully started service
'sparkDriver' on port 43948.
16/04/09 10:36:51 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/04/09 10:36:51 INFO Remoting: Starting remoting
16/04/09 10:36:52 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriverActorSystem@127.0.0.1:32792]
16/04/09 10:36:52 INFO util.Utils: Successfully started service
'sparkDriverActorSystem' on port 32792.
16/04/09 10:36:52 INFO spark.SparkEnv: Registering MapOutputTracker
16/04/09 10:36:52 INFO spark.SparkEnv: Registering BlockManagerMaster
16/04/09 10:36:52 INFO storage.DiskBlockManager: Created local directory at
/tmp/blockmgr-a2079037-6bbe-49ce-ba78-d475e38ad362
16/04/09 10:36:52 INFO storage.MemoryStore: MemoryStore started with
capacity 517.4 MB
16/04/09 10:36:52 INFO spark.SparkEnv: Registering OutputCommitCoordinator
16/04/09 10:36:53 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/04/09 10:36:53 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
16/04/09 10:36:53 INFO util.Utils: Successfully started service 'SparkUI' on
port 4040.
16/04/09 10:36:53 INFO ui.SparkUI: Started SparkUI at http://127.0.0.1:4040
16/04/09 10:36:53 INFO client.RMProxy: Connecting to ResourceManager at
/0.0.0.0:8032
16/04/09 10:36:54 INFO yarn.Client: Requesting a new application from
cluster with 1 NodeManagers
16/04/09 10:36:54 INFO yarn.Client: Verifying our application has not
requested more than the maximum memory capability of the cluster (8192 MB
per container)
16/04/09 10:36:54 INFO yarn.Client: Will allocate AM container, with 896 MB
memory including 384 MB overhead
16/04/09 10:36:54 INFO yarn.Client: Setting up container launch context for
our AM
16/04/09 10:36:54 INFO yarn.Client: Setting up the launch environment for
our AM container
16/04/09 10:36:54 INFO yarn.Client: Preparing resources for our AM container
16/04/09 10:36:56 INFO yarn.Client: Uploading resource
file:/home/mahesh/Programs/spark-1.6.1-bin-hadoop2.6/lib/spark-assembly-1.6.1-hadoop2.6.0.jar
->
hdfs://localhost:54310/user/mahesh/.sparkStaging/application_1460137661144_0003/spark-assembly-1.6.1-hadoop2.6.0.jar
16/04/09 10:36:59 INFO yarn.Client: Uploading resource
file:/tmp/spark-f28e3fd5-4dcd-4199-b298-c7fc607dedb4/__spark_conf__5551799952710555772.zip
->
hdfs://localhost:54310/user/mahesh/.sparkStaging/application_1460137661144_0003/__spark_conf__5551799952710555772.zip
16/04/09 10:36:59 INFO spark.SecurityManager: Changing view acls to: mahesh
16/04/09 10:36:59 INFO spark.SecurityManager: Changing modify acls to:
mahesh
16/04/09 10:36:59 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(mahesh); users with modify permissions: Set(mahesh)
16/04/09 10:36:59 INFO yarn.Client: Submitting application 3 to
ResourceManager
16/04/09 10:36:59 INFO impl.YarnClientImpl: Submitted application
application_1460137661144_0003
16/04/09 10:37:00 INFO yarn.Client: Application report for
application_1460137661144_0003 (state: ACCEPTED)
16/04/09 10:37:00 INFO yarn.Client: 
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1460178419692
 final status: UNDEFINED
 tracking URL: http://gubbi:8088/proxy/application_1460137661144_0003/
 user: mahesh
16/04/09 10:37:01 INFO yarn.Client: Application report for
application_1460137661144_0003 (state: ACCEPTED)
16/04/09 10:37:02 INFO yarn.Client: Application report for
application_1460137661144_0003 (state: ACCEPTED)
16/04/09 10:37:03 INFO yarn.Client: Application report for
application_1460137661144_0003 (state: ACCEPTED)
16/04/09 10:37:04 INFO yarn.Client: Application report for
application_1460137661144_0003 (state: ACCEPTED)
16/04/09 10:37:05 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
ApplicationMaster registered as NettyRpcEndpointRef(null)
16/04/09 10:37:05 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter.
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS
-> gubbi, PROXY_URI_BASES ->
http://gubbi:8088/proxy/application_1460137661144_0003),
/proxy/application_1460137661144_0003
16/04/09 10:37:05 INFO ui.JettyUtils: Adding filter:
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
16/04/09 10:37:05 INFO yarn.Client: Application report for
application_1460137661144_0003 (state: ACCEPTED)
16/04/09 10:37:06 INFO yarn.Client: Application report for
application_1460137661144_0003 (state: RUNNING)
16/04/09 

How Spark handles dead machines during a job.

2016-04-08 Thread Sung Hwan Chung
Hello,

Say that I'm doing a simple rdd.map followed by collect. Say also that
one of the executors finishes all of its tasks, but there are still other
executors running.

If the machine that hosted the finished executor gets terminated, does the
master still have the results from the finished tasks (and thus doesn't
restart those finished tasks)?

Or does the master require that all the executors be alive during the
entire map-collect cycle?

Thanks!


Re: Monitoring S3 Bucket with Spark Streaming

2016-04-08 Thread Natu Lauchande
Hi Benjamin,

I have done it. The critical configuration items are the ones below:

  ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl",
"org.apache.hadoop.fs.s3native.NativeS3FileSystem")
  ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId",
AccessKeyId)
  ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey",
AWSSecretAccessKey)

  val inputS3Stream =  ssc.textFileStream("s3://example_bucket/folder")

This code will probe for new S3 files created in your bucket every batch interval.
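
For reference, a minimal self-contained sketch of the same approach is below. The batch interval, the bucket path and the idea of reading credentials from environment variables are illustrative assumptions, not part of the snippet above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object S3DirectoryMonitor {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("S3DirectoryMonitor")
    val ssc = new StreamingContext(conf, Seconds(60)) // batch interval is a placeholder

    // native S3 filesystem and credentials, as in the configuration items above
    val hadoopConf = ssc.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    hadoopConf.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))          // placeholder source
    hadoopConf.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))  // placeholder source

    // textFileStream only picks up files that appear after the stream starts;
    // depending on your Hadoop build you may need the s3n:// scheme to match the fs.s3n settings
    val inputS3Stream = ssc.textFileStream("s3://example_bucket/folder")

    inputS3Stream.foreachRDD { rdd =>
      println(s"New lines in this batch: ${rdd.count()}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}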

Thanks,
Natu

On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim  wrote:

> Has anyone monitored an S3 bucket or directory using Spark Streaming and
> pulled any new files to process? If so, can you provide basic Scala coding
> help on this?
>
> Thanks,
> Ben
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Unsubscribe

2016-04-08 Thread Db-Blog

Unsubscribe


> On 06-Apr-2016, at 5:40 PM, Brian London  wrote:
> 
> unsubscribe

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Work on Spark engine for Hive

2016-04-08 Thread Szehon Ho
So I only know that the latest released CDH version does have Hive (based on
1.2) on Spark 1.6, though admittedly I have not tested the Hive 2.0 branch on
that. So I would recommend you try the latest 1.6-based Spark assembly
from CDH (the version that we test) to rule out the possibility of building it
differently.

If there are still issues, it seems the dependency conflict would be
because of the Hive 2.x branch. That's my hunch, as it's plausible that a
lot more libraries have been added by the community to the Hive 2.x branch as
opposed to Hive 1.x. Let us know your findings after trying that one, and
we could look further.

Thanks,
Szehon

On Fri, Apr 8, 2016 at 1:03 PM, Mich Talebzadeh 
wrote:

> The fundamental problem seems to be the spark-assembly-<n.n.n>-hadoop<n.n.n>.jar
> libraries that are incompatible and cause issues. For example, Hive does
> not work with the existing Spark 1.6.1 binaries. In other words, if you set
> hive.execution.engine in $HIVE_HOME/conf/hive-site.xml as follows:
>
> <property>
>   <name>hive.execution.engine</name>
>   <value>spark</value>
>   <description>
>     Expects one of [mr, tez, spark].
>     Chooses execution engine. Options are: mr (Map reduce, default), tez, spark. While MR
>     remains the default engine for historical reasons, it is itself a historical engine
>     and is deprecated in Hive 2 line. It may be removed without further warning.
>   </description>
> </property>
>
> It will crash.
>
> In short, it currently only works for me with Spark 1.3.1: install the Spark
> 1.3.1 binaries and put the spark assembly jar spark-assembly-1.3.1-hadoop2.4.0.jar
> (extracted via a Spark 1.3.1 source build) into $HIVE_HOME/lib.
>
> Afterwards whenever you invoke Hive you will need to initialise it using
> the following:
>
> set spark.home=/usr/lib/spark-1.3.1-bin-hadoop2.6;
> set hive.execution.engine=spark;
> set spark.master=yarn-client;
> This is just a work-around which is not what you want.
>
> HTH
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 8 April 2016 at 19:16, Szehon Ho  wrote:
>
>> Yes, that is a good goal we will have to do eventually.  I was not aware
>> that it is not working to be honest.
>>
>> Can you let us know what is broken on Hive 2 on Spark 1.6.1?  Preferably
>> via filing a JIRA on HIVE side?
>>
>> On Fri, Apr 8, 2016 at 7:47 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> This is a different thing. the question is when will Hive 2 be able to
>>> run on Spark 1.6.1 installed binaries as execution engine.
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 8 April 2016 at 11:30, 469564481 <469564...@qq.com> wrote:
>>>
   I have not installed Spark engines.
   I can use JDBC to connect to Hive and execute
 SQL (create, drop, ...), but the ODBC test case (HiveclientTest) can connect to Hive
 but cannot execute SQL.


 -- Original Message --
 *From:* "Mich Talebzadeh";;
 *Sent:* Friday, 8 April 2016, 5:02 PM
 *To:* "user"; "user @spark"<
 user@spark.apache.org>;
 *Subject:* Work on Spark engine for Hive

 Hi,

 Is there any scheduled work to enable Hive to use recent version of
 Spark engines?

 This is becoming an issue as some applications have to rely on MapR
 engine to do operations on Hive 2 which is serial and slow.

 Thanks

 Dr Mich Talebzadeh



 LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



 http://talebzadehmich.wordpress.com



>>>
>>>
>>
>


Re: Copying all Hive tables from Prod to UAT

2016-04-08 Thread Xiao Li
You also need to ensure no workload is running on both sides.

2016-04-08 15:54 GMT-07:00 Ali Gouta :

> For Hive, you may use Sqoop to achieve this. In my opinion, you may also
> run a Spark job to do it.
> On 9 Apr 2016 at 00:25, "Ashok Kumar"  wrote:
>
> Hi,
>
> Anyone has suggestions how to create and copy Hive and Spark tables from
> Production to UAT.
>
> One way would be to copy table data to external files and then move the
> external files to a local target directory and populate the tables in
> target Hive with data.
>
> Is there an easier way of doing so?
>
> thanks
>
>
>


Re: Copying all Hive tables from Prod to UAT

2016-04-08 Thread Ali Gouta
For Hive, you may use Sqoop to achieve this. In my opinion, you may also
run a Spark job to do it.
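
A rough sketch of the Spark-job variant is below. The table names and staging paths are made up for illustration, it assumes a HiveContext is available on each side, and you should verify that the target table definition matches before relying on it:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object CopyHiveTable {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CopyHiveTable"))
    val hiveContext = new HiveContext(sc)

    // On production: dump the table to files on transferable storage,
    // then move the directory to the UAT cluster (distcp/scp).
    hiveContext.sql("SELECT * FROM prod_db.my_table")
      .write
      .format("parquet")
      .save("hdfs:///staging/my_table")

    // On UAT: read the transferred files and repopulate the table.
    hiveContext.read.parquet("hdfs:///staging/my_table")
      .write
      .mode("overwrite")
      .saveAsTable("uat_db.my_table")
  }
}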
On 9 Apr 2016 at 00:25, "Ashok Kumar"  wrote:

Hi,

Anyone has suggestions how to create and copy Hive and Spark tables from
Production to UAT.

One way would be to copy table data to external files and then move the
external files to a local target directory and populate the tables in
target Hive with data.

Is there an easier way of doing so?

thanks


Copying all Hive tables from Prod to UAT

2016-04-08 Thread Ashok Kumar
Hi,
Anyone has suggestions how to create and copy Hive and Spark tables from 
Production to UAT.
One way would be to copy table data to external files and then move the 
external files to a local target directory and populate the tables in target 
Hive with data.
Is there an easier way of doing so?
thanks



Re: Need clarification regd deploy-mode client

2016-04-08 Thread bdev
Thanks Mandar for the clarification.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Need-clarification-regd-deploy-mode-client-tp26719p26725.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame job fails on parsing error, help?

2016-04-08 Thread Ted Yu
Not much.

So no chance of different snappy version ?

On Fri, Apr 8, 2016 at 1:26 PM, Nicolas Tilmans  wrote:

> Hi Ted,
>
> The Spark version is 1.6.1, a nearly identical set of operations does fine
> on smaller datasets. It's just a few joins then a groupBy and a count in
> pyspark.sql on a Spark DataFrame.
>
> Any ideas?
>
> Nicolas
>
> On Fri, Apr 8, 2016 at 1:13 PM, Ted Yu  wrote:
>
>> Did you encounter similar error on a smaller dataset ?
>>
>> Which release of Spark are you using ?
>>
>> Is it possible you have an incompatible snappy version somewhere in your
>> classpath ?
>>
>> Thanks
>>
>> On Fri, Apr 8, 2016 at 12:36 PM, entee  wrote:
>>
>>> I'm trying to do a relatively large join (0.5TB shuffle read/write) and
>>> just
>>> calling a count (or show) on the dataframe fails to complete, getting to
>>> the
>>> last task before failing:
>>>
>>> Py4JJavaError: An error occurred while calling o133.count.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Task 5
>>> in stage 11.0 failed 4 times, most recent failure: Lost task 5.3 in stage
>>> 11.0 (TID 7836, .com): java.io.IOException: failed to uncompress the
>>> chunk: PARSING_ERROR(2)
>>>
>>> (full stack trace below)
>>>
>>> I'm not sure why this would happen, any ideas about whether our
>>> configuration is off or how to fix this?
>>>
>>> Nicolas
>>>
>>>
>>>
>>> Py4JJavaError: An error occurred while calling o133.count.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Task 5
>>> in stage 11.0 failed 4 times, most recent failure: Lost task 5.3 in stage
>>> 11.0 (TID 7836, .com): java.io.IOException: failed to uncompress the
>>> chunk: PARSING_ERROR(2)
>>> at
>>>
>>> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:361)
>>> at
>>> org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:158)
>>> at
>>> org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
>>> at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>>> at
>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
>>> at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>>> at java.io.DataInputStream.read(DataInputStream.java:149)
>>> at
>>> org.spark-project.guava.io.ByteStreams.read(ByteStreams.java:899)
>>> at
>>> org.spark-project.guava.io.ByteStreams.readFully(ByteStreams.java:733)
>>> at
>>>
>>> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:119)
>>> at
>>>
>>> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:102)
>>> at scala.collection.Iterator$$anon$12.next(Iterator.scala:397)
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
>>> at
>>>
>>> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>>> at
>>>
>>> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
>>> at
>>>
>>> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:168)
>>> at
>>> org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90)
>>> at
>>> org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64)
>>> at
>>>
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
>>> at
>>>
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>> at
>>>
>>> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>> at
>>>
>>> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
>>> at
>>>
>>> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
>>> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
>>> at
>>>
>>> 

Re: DataFrame job fails on parsing error, help?

2016-04-08 Thread Ted Yu
Did you encounter similar error on a smaller dataset ?

Which release of Spark are you using ?

Is it possible you have an incompatible snappy version somewhere in your
classpath ?

Thanks

On Fri, Apr 8, 2016 at 12:36 PM, entee  wrote:

> I'm trying to do a relatively large join (0.5TB shuffle read/write) and
> just
> calling a count (or show) on the dataframe fails to complete, getting to
> the
> last task before failing:
>
> Py4JJavaError: An error occurred while calling o133.count.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
> in stage 11.0 failed 4 times, most recent failure: Lost task 5.3 in stage
> 11.0 (TID 7836, .com): java.io.IOException: failed to uncompress the
> chunk: PARSING_ERROR(2)
>
> (full stack trace below)
>
> I'm not sure why this would happen, any ideas about whether our
> configuration is off or how to fix this?
>
> Nicolas
>
>
>
> Py4JJavaError: An error occurred while calling o133.count.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
> in stage 11.0 failed 4 times, most recent failure: Lost task 5.3 in stage
> 11.0 (TID 7836, .com): java.io.IOException: failed to uncompress the
> chunk: PARSING_ERROR(2)
> at
>
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:361)
> at
> org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:158)
> at
> org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
> at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at
> org.spark-project.guava.io.ByteStreams.read(ByteStreams.java:899)
> at
> org.spark-project.guava.io.ByteStreams.readFully(ByteStreams.java:733)
> at
>
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:119)
> at
>
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:102)
> at scala.collection.Iterator$$anon$12.next(Iterator.scala:397)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
> at
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
> at
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
> at
>
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:168)
> at
> org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90)
> at
> org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64)
> at
>
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
> at
>
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at
>
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at
>
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
> at
>
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
> at
>
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:163)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at
> 

Re: Work on Spark engine for Hive

2016-04-08 Thread Mich Talebzadeh
The fundamental problem seems to be the spark-assembly-<n.n.n>-hadoop<n.n.n>.jar
libraries that are incompatible and cause issues. For example, Hive does not
work with the existing Spark 1.6.1 binaries. In other words, if you set
hive.execution.engine in $HIVE_HOME/conf/hive-site.xml as follows:

<property>
  <name>hive.execution.engine</name>
  <value>spark</value>
  <description>
    Expects one of [mr, tez, spark].
    Chooses execution engine. Options are: mr (Map reduce, default), tez, spark. While MR
    remains the default engine for historical reasons, it is itself a historical engine
    and is deprecated in Hive 2 line. It may be removed without further warning.
  </description>
</property>

It will crash.

In short, it currently only works for me with Spark 1.3.1: install the Spark 1.3.1
binaries and put the spark assembly jar spark-assembly-1.3.1-hadoop2.4.0.jar
(extracted via a Spark 1.3.1 source build) into $HIVE_HOME/lib.

Afterwards whenever you invoke Hive you will need to initialise it using
the following:

set spark.home=/usr/lib/spark-1.3.1-bin-hadoop2.6;
set hive.execution.engine=spark;
set spark.master=yarn-client;
This is just a work-around which is not what you want.

HTH






Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 8 April 2016 at 19:16, Szehon Ho  wrote:

> Yes, that is a good goal we will have to do eventually.  I was not aware
> that it is not working to be honest.
>
> Can you let us know what is broken on Hive 2 on Spark 1.6.1?  Preferably
> via filing a JIRA on HIVE side?
>
> On Fri, Apr 8, 2016 at 7:47 AM, Mich Talebzadeh  > wrote:
>
>> This is a different thing. the question is when will Hive 2 be able to
>> run on Spark 1.6.1 installed binaries as execution engine.
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 8 April 2016 at 11:30, 469564481 <469564...@qq.com> wrote:
>>
>>>   I have not installed Spark engines.
>>>   I can use JDBC to connect to Hive and execute
>>> SQL (create, drop, ...), but the ODBC test case (HiveclientTest) can connect to Hive
>>> but cannot execute SQL.
>>>
>>>
>>> -- Original Message --
>>> *From:* "Mich Talebzadeh";;
>>> *Sent:* Friday, 8 April 2016, 5:02 PM
>>> *To:* "user"; "user @spark";
>>>
>>> *Subject:* Work on Spark engine for Hive
>>>
>>> Hi,
>>>
>>> Is there any scheduled work to enable Hive to use recent version of
>>> Spark engines?
>>>
>>> This is becoming an issue as some applications have to rely on MapR
>>> engine to do operations on Hive 2 which is serial and slow.
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>
>>
>


DataFrame job fails on parsing error, help?

2016-04-08 Thread entee
I'm trying to do a relatively large join (0.5TB shuffle read/write) and just
calling a count (or show) on the dataframe fails to complete, getting to the
last task before failing:

Py4JJavaError: An error occurred while calling o133.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
in stage 11.0 failed 4 times, most recent failure: Lost task 5.3 in stage
11.0 (TID 7836, .com): java.io.IOException: failed to uncompress the
chunk: PARSING_ERROR(2)

(full stack trace below)

I'm not sure why this would happen, any ideas about whether our
configuration is off or how to fix this?

Nicolas



Py4JJavaError: An error occurred while calling o133.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
in stage 11.0 failed 4 times, most recent failure: Lost task 5.3 in stage
11.0 (TID 7836, .com): java.io.IOException: failed to uncompress the
chunk: PARSING_ERROR(2)
at
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:361)
at 
org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:158)
at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.spark-project.guava.io.ByteStreams.read(ByteStreams.java:899)
at 
org.spark-project.guava.io.ByteStreams.readFully(ByteStreams.java:733)
at
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:119)
at
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:102)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:397)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:168)
at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90)
at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
at
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:163)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at 

Re: ordering over structs

2016-04-08 Thread Michael Armbrust
You need to use the struct function (which creates an actual struct); you are
trying to use the StructType datatype (which just represents the schema of a
struct).
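
For illustration, the pattern looks roughly like this in Scala (column names follow Imran's example and `data` is assumed to be an existing DataFrame; the pyspark equivalent uses pyspark.sql.functions.struct and max):

import org.apache.spark.sql.functions.{col, max, struct}

// data has columns: customer_id, dt, product
val top = data
  .select(col("customer_id"), struct(col("dt"), col("product")).alias("vs")) // an actual struct column
  .groupBy(col("customer_id"))
  .agg(max(col("vs")).alias("vs"))   // structs order field by field, so this keeps the latest dt
  .select(col("customer_id"), col("vs.dt"), col("vs.product"))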

On Thu, Apr 7, 2016 at 3:48 PM, Imran Akbar  wrote:

> thanks Michael,
>
>
> I'm trying to implement the code in pyspark like so (where my dataframe
> has 3 columns - customer_id, dt, and product):
>
> st = StructType().add("dt", DateType(), True).add("product", StringType(),
> True)
>
> top = data.select("customer_id", st.alias('vs'))
>   .groupBy("customer_id")
>   .agg(max("dt").alias("vs"))
>   .select("customer_id", "vs.dt", "vs.product")
>
> But I get an error saying:
>
> AttributeError: 'StructType' object has no attribute 'alias'
>
> Can I do this without aliasing the struct?  Or am I doing something
> incorrectly?
>
>
> regards,
>
> imran
>
> On Wed, Apr 6, 2016 at 4:16 PM, Michael Armbrust 
> wrote:
>
>> Ordering for a struct goes in order of the fields.  So the max struct is
>>> the one with the highest TotalValue (and then the highest category
>>>   if there are multiple entries with the same hour and total value).
>>>
>>> Is this due to "InterpretedOrdering" in StructType?
>>>
>>
>> That is one implementation, but the code generated ordering also follows
>> the same contract.
>>
>>
>>
>>>  4)  Is it faster doing it this way than doing a join or window function
>>> in Spark SQL?
>>>
>>> Way faster.  This is a very efficient way to calculate argmax.
>>>
>>> Can you explain how this is way faster than window function? I can
>>> understand join doesn't make sense in this case. But to calculate the
>>> grouping max, you just have to shuffle the data by grouping keys. You maybe
>>> can do a combiner on the mapper side before shuffling, but that is it. Do
>>> you mean windowing function in Spark SQL won't do any map side combiner,
>>> even it is for max?
>>>
>>
>> Windowing can't do partial aggregation and will have to collect all the
>> data for a group so that it can be sorted before applying the function.  In
>> contrast a max aggregation will do partial aggregation (map side combining)
>> and can be calculated in a streaming fashion.
>>
>> Also, aggregation is more common and thus has seen more optimization
>> beyond the theoretical limits described above.
>>
>>
>


Re: Spark Streaming - NotSerializableException: Methods & Closures:

2016-04-08 Thread jamborta
You could also try to put transform in a companion object.
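
For example, something along these lines (names are illustrative); the point is that the DStream operation then only references a top-level object, so the enclosing, non-serializable class instance is not pulled into the closure:

object TransformFunctions {
  // lives on a standalone object rather than on the streaming job class
  def transform(line: String): String = line.trim.toLowerCase
}

// ... inside the streaming job, instead of calling a method on `this`:
// val transformed = dstream.map(TransformFunctions.transform)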

On Fri, 8 Apr 2016 16:48 mpawashe [via Apache Spark User List], <
ml-node+s1001560n26718...@n3.nabble.com> wrote:

> The class declaration is already marked Serializable ("with Serializable")
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-NotSerializableException-Methods-Closures-tp26672p26718.html
> 
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-NotSerializableException-Methods-Closures-tp26672p26723.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Monitoring S3 Bucket with Spark Streaming

2016-04-08 Thread Benjamin Kim
Has anyone monitored an S3 bucket or directory using Spark Streaming and pulled 
any new files to process? If so, can you provide basic Scala coding help on 
this?

Thanks,
Ben


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Handling of slow elements in dstream's processing

2016-04-08 Thread OmkarP
Problem statement:
I am building a somewhat time-critical application that is supposed to
receive messages on a stream(ZMQ) and then operate on each of the data
points that comes in on the stream. The caveat is that some data points may
need more time for processing than most others since the processing involves
another network i/o (a remote fetch operation like an HTTP GET from various
remote servers / remote sources . And with timeouts of course). In this
scenario, I want the processing of the remaining data points to continue
without blocking for the data from the slow-processing data point. Also, I
want the next batch of data points out of the stream to start being
processed while we are still waiting for the previous batch's
slow-processing data point.

Progress so far:
To test out the concept, I tried the following: Created a test stream that
generates a random number every 500 ms. During processing of the Dstream out
of this, select a string "3" and purposely delay the processing for it and
observe the behavior.

// Imports assumed for Spark 1.6.x (not included in the original post):
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver

// Just a test Stream that generates random number Strings.
class testInputDStream (
@transient ssc_ : StreamingContext,
storageLevel: StorageLevel
  ) extends ReceiverInputDStream[String](ssc_) with
Logging {
  def getReceiver(): Receiver[String] = {
new testReceiver(storageLevel)
  }
}
object testUtils {
  def createStream(
ssc: StreamingContext,
storageLevel: StorageLevel =
StorageLevel.MEMORY_AND_DISK_SER_2
  ): DStream[String] = {
new testInputDStream(ssc, storageLevel)
  }
}
class testReceiver (storageLevel: StorageLevel) extends
Receiver[String](storageLevel) with Logging {
  def onStop() {
  }
  def onStart() {
// Start the thread that receives data over a connection
new Thread("Test Receiver") {
  override def run() { receive() }
}.start()
  }
  /** Create a socket connection and receive data until receiver is stopped
*/
  private def receive(): Unit = {
println("Starting test receiver...")
val num = scala.util.Random
try {
  while(true) {
val newnum = num.nextInt(10)
store(newnum.toString())
Thread.sleep(100)
  }
} catch {
  case e: Exception => println(e)
}
  }
}

object test_streaming {
  private def gofetch(x :String): List[Any] = {
if (x == "3") {
  println("Slow Processing")
  Thread.sleep(2000)
}
return List(x, x, x)
  }

  def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)

//  Prepare our context and stream
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("test_streaming")

// Initialization
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Milliseconds(500))

val rstream = testUtils.createStream(ssc, StorageLevel.MEMORY_ONLY)

val ourdata = rstream.map( x => gofetch(x))

ourdata.count().print()

ssc.start()
ssc.awaitTermination() // Wait for the computation to terminate*/
  }
}

However, I see that the slow-processing data point blocks the other data points.
Is there a way of achieving what I am trying to do?

I would appreciate feedback / input.

Thanks,
Omkar



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Handling-of-slow-elements-in-dstream-s-processing-tp26722.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming share state between two streams

2016-04-08 Thread Rishi Mishra
Hi Shekhar,
As both of your state functions do the same thing, can't you do a union of the
DStreams before applying mapWithState()? It might be difficult if one
state function is dependent on the other's state. That would require a named state
which can be accessed from other state functions. I have not gone through the
details, but the PR (https://github.com/apache/spark/pull/11645) from
Tathagata seems to be in that direction.
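
A minimal sketch of the union approach, assuming both inputs are the DStream[(String, Int)] pairs already built from Kafka in your example and that a single state function is enough:

import org.apache.spark.streaming.{Minutes, State, StateSpec, Time}
import org.apache.spark.streaming.dstream.DStream

def mappingFunc(batchTime: Time, word: String, one: Option[Int],
                state: State[Int]): Option[(String, Int)] = {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  Some((word, sum))
}

// firstStream and secondStream are assumed to be the two DStream[(String, Int)]
val merged: DStream[(String, Int)] = firstStream.union(secondStream)

val counts = merged.mapWithState(
  StateSpec.function(mappingFunc _).timeout(Minutes(10)))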

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Fri, Apr 8, 2016 at 3:53 PM, Shekhar Bansal <
shekhar0...@yahoo.com.invalid> wrote:

> Hi
> Can we share spark streaming state between two DStreams??
> Basically I want to create state using first stream and enrich second
> stream using state.
> Example: I have modified StatefulNetworkWordCount example. I am creating
> state using first stream and enriching second stream with count of first
> stream.
>
> val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 
> 1)))
>
>
> val mappingFuncForFirstStream = (batchTime: Time, word: String, one: 
> Option[Int], state: State[Int]) => {
>   val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
>   val output = (word, sum)
>   state.update(sum)
>
>   Some(output)
> }
>
> val mappingFuncForSecondStream = (batchTime: Time, word: String, one: 
> Option[Int], state: State[Int]) => {
>   val sum = state.getOption.getOrElse(0)
>   val output = (word, sum)
>
>   Some(output)
> }
>
>
>
> // first stream
> KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](ssc, kafkaParams, topicsSet)
>   .flatMap(r=>r._2.split(" "))
>   .map(x => (x, 1))
>   
> .mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10)))
>   .print(1)
>
>
>
> // second stream
> KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](ssc, kafkaParams2, mergeTopicSet)
>   .flatMap(r=>r._2.split(" "))
>   .map(x => (x, 1))
>   
> .mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10)))
>   .print(50)
>
>
> In checkpointing directory, I can see two different state RDDs.
> I am using spark-1.6.1 and kafka-0.8.2.1
>
> Regards
> Shekhar
>


Re: Work on Spark engine for Hive

2016-04-08 Thread Szehon Ho
Yes, that is a good goal we will have to do eventually.  I was not aware
that it is not working to be honest.

Can you let us know what is broken on Hive 2 on Spark 1.6.1?  Preferably
via filing a JIRA on HIVE side?

On Fri, Apr 8, 2016 at 7:47 AM, Mich Talebzadeh 
wrote:

> This is a different thing. the question is when will Hive 2 be able to run
> on Spark 1.6.1 installed binaries as execution engine.
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 8 April 2016 at 11:30, 469564481 <469564...@qq.com> wrote:
>
>>   I have not installed Spark engines.
>>   I can use JDBC to connect to Hive and execute SQL (create, drop, ...), but
>> the ODBC test case (HiveclientTest) can connect to Hive but cannot execute SQL.
>>
>>
>> -- Original Message --
>> *From:* "Mich Talebzadeh";;
>> *Sent:* Friday, 8 April 2016, 5:02 PM
>> *To:* "user"; "user @spark";
>>
>> *Subject:* Work on Spark engine for Hive
>>
>> Hi,
>>
>> Is there any scheduled work to enable Hive to use recent version of Spark
>> engines?
>>
>> This is becoming an issue as some applications have to rely on MapR
>> engine to do operations on Hive 2 which is serial and slow.
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>
>


What is the best way to process streaming data from multiple channels simultaneously using Spark 2.0 API's?

2016-04-08 Thread imax
I’d like to use the Spark 2.0 (streaming) API to consume data from a custom data
source that provides an API for random access to a stream of data, represented
as a “topic” that has a collection of partitions which might be
accessed/consumed simultaneously.

I want to implement a streaming process using the new Spark 2.0 API that
will fetch the data from the aforementioned data source in a distributed manner,
i.e. create a separate task per offset range per partition, like the Kafka
direct stream does.

I'd like to know if there is a better way to achieve this goal using new
Spark 2.0 API.
Is there any reference implementation that I could look at?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-best-way-to-process-streaming-data-from-multiple-channels-simultaneously-using-Spark-2-0-tp26720.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to configure parquet.block.size on Spark 1.6

2016-04-08 Thread nihed mbarek
I can't write on hadoopConfig in Java

On Friday, 8 April 2016, Silvio Fiorito  wrote:

> Have you tried:
>
> sc.hadoopConfiguration.setLong(parquet.hadoop.ParquetOutputFormat.BLOCK_SIZE,
> N * 1024 * 1024)
>
> Not sure if it’d work or not, but since it’s getting it from the Hadoop
> config it should do it.
>
>
> From: nihed mbarek  >
> Date: Friday, April 8, 2016 at 12:01 PM
> To: "User@spark.apache.org
> " <
> User@spark.apache.org
> >
> Subject: How to configure parquet.block.size on Spark 1.6
>
> Hi
> How to configure parquet.block.size on Spark 1.6 ?
>
> Thank you
> Nihed MBAREK
>
>
> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> 
>
>
>
>
>

-- 

M'BAREK Med Nihed,
Fedora Ambassador, TUNISIA, Northern Africa
http://www.nihed.com




Need clarification regd deploy-mode client

2016-04-08 Thread bdev
I'm running pyspark with deploy-mode as client with yarn using dynamic
allocation:
 pyspark --master yarn --deploy-mode client --executor-memory 6g
--executor-cores 4 --driver-memory 4g

The node where I'm running pyspark has 4GB memory but I keep running out of
memory on this node. If using yarn, it isn't clear to me why the memory
consumption is so high on the client node. Can someone please let me know if
this is expected?


Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Need-clarification-regd-deploy-mode-client-tp26719.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to configure parquet.block.size on Spark 1.6

2016-04-08 Thread Ted Yu
I searched 1.6.1 code base but didn't find how this can be configured
(within Spark).

On Fri, Apr 8, 2016 at 9:01 AM, nihed mbarek  wrote:

> Hi
> How to configure parquet.block.size on Spark 1.6 ?
>
> Thank you
> Nihed MBAREK
>
>
> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> 
>
>
>


How to configure parquet.block.size on Spark 1.6

2016-04-08 Thread nihed mbarek
Hi
How to configure parquet.block.size on Spark 1.6 ?

Thank you
Nihed MBAREK


-- 

M'BAREK Med Nihed,
Fedora Ambassador, TUNISIA, Northern Africa
http://www.nihed.com




Re: Spark Streaming - NotSerializableException: Methods & Closures:

2016-04-08 Thread mpawashe
The class declaration is already marked Serializable ("with Serializable")



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-NotSerializableException-Methods-Closures-tp26672p26718.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Good Book on Hadoop Interview

2016-04-08 Thread Chaturvedi Chola
A good book on big data interview FAQ
http://www.amazon.in/Big-Data-Interview-FAQs-Chinnasamy/dp/9386009188/ref=sr_1_1?ie=UTF8=1459943243=8-1=9789386009180
https://notionpress.com/read/big-data-interview-faqs


http://www.bookadda.com/books/big-data-interview-faqs-chinnasamy-9386009188-9789386009180
http://www.amazon.in/Big-Data-Interview-FAQs-Chinnasamy/dp/9386009188/ref=sr_1_1?ie=UTF8=1459943243=8-1=9789386009180
https://www.sapnaonline.com/books/big-data-interview-faqs-chinnasamy-9386009188-9789386009180
https://paytm.com/shop/p/big-data-interview-faqs-9789386009180_22290?src=search-grid=organic%7Cundefined%7C9789386009180%7Cgrid%7CSearch%7C1


Re: Running Spark on Yarn-Client/Cluster mode

2016-04-08 Thread ashesh_28
Hi Dhiraj,

Thanks for the clarification.
Yes, I did check that both YARN daemons (NodeManager & ResourceManager)
are running on their respective nodes, and I can access the HDFS
directory structure from each node.

I am using Hadoop version 2.7.2 and have downloaded the pre-built version of
Spark for Hadoop 2.6 and later (the latest available version).

I have also already confirmed that HADOOP_CONF_DIR is pointing to the
correct Hadoop /etc/hadoop/ location.

Can you suggest whether any settings have to be made in the spark-defaults.conf
file?
Also, I am trying to understand the arguments that have to be passed along
with the yarn-client command, like --executor-memory and --driver-memory. Can you
suggest possible values for those arguments based on my VM specs as
mentioned above?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-on-Yarn-Client-Cluster-mode-tp26691p26717.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Work on Spark engine for Hive

2016-04-08 Thread Mich Talebzadeh
This is a different thing. The question is when Hive 2 will be able to run
on installed Spark 1.6.1 binaries as the execution engine.

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 8 April 2016 at 11:30, 469564481 <469564...@qq.com> wrote:

>   I have not installed Spark engines.
>   I can use JDBC to connect to Hive and execute SQL (create, drop, ...), but
> the ODBC test case (HiveclientTest) can connect to Hive but cannot execute SQL.
>
>
> -- Original Message --
> *From:* "Mich Talebzadeh";;
> *Sent:* Friday, 8 April 2016, 5:02 PM
> *To:* "user"; "user @spark";
>
> *Subject:* Work on Spark engine for Hive
>
> Hi,
>
> Is there any scheduled work to enable Hive to use recent version of Spark
> engines?
>
> This is becoming an issue as some applications have to rely on MapR engine
> to do operations on Hive 2 which is serial and slow.
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Re: Sqoop on Spark

2016-04-08 Thread Mich Talebzadeh
Well unless you have plenty of memory, you are going to have certain issues
with Spark.

I tried to load a billion-row table from Oracle through Spark using JDBC
and ended up with a "Caused by: java.lang.OutOfMemoryError: Java heap space"
error.

Sqoop uses MapReduce and does it in serial mode, which takes time, and you can
also tell it to create the Hive table. However, it will import data into the Hive
table.

In any case the mechanism of data import is through JDBC; Spark uses memory
and a DAG, whereas Sqoop relies on MapReduce.

There is of course another alternative.

Assuming that your Oracle table has a primary key, say "ID" (it would be
easier if it were a monotonically increasing number), or is already partitioned:


   1. You can create views based on the range of ID, or one per partition.
   You can then SELECT columns col1, col2, ..., coln from the view and spool the
   output to a text file on the OS (locally, say to a backup directory, would be fastest).
   2. bzip2 those files and scp them to a local directory on the Hadoop cluster.
   3. You can then use Spark/Hive to load the target table from those files
   in parallel (see the sketch right after this list).
   4. When creating the views, take care of NUMBER and CHAR columns in Oracle:
   convert them with TO_CHAR(NUMBER_COLUMN) and CAST(coln AS
   VARCHAR2(n)) AS coln, etc.
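
A rough sketch of step 3, assuming the bzip2 files from step 2 end up in HDFS, that the fields are pipe-delimited, that sc is an existing SparkContext, and that the target Hive table already exists (all of these are assumptions for illustration):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

// bzip2 is a splittable codec, so the load parallelises across the cluster
val exported = sc.textFile("hdfs:///staging/oracle_export/*.bz2")

exported
  .map(_.split('|'))                 // the delimiter used when spooling is an assumption
  .map(a => (a(0), a(1), a(2)))      // adapt to the real column list
  .toDF("id", "col1", "col2")
  .write
  .mode("append")
  .insertInto("target_db.target_table")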


HTH



Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 8 April 2016 at 10:07, Gourav Sengupta  wrote:

> Hi,
>
> Some metrics thrown around the discussion:
>
> SQOOP: extract 500 million rows (in single thread) 20 mins (data size 21
> GB)
> SPARK: load the data into memory (15 mins)
>
> SPARK: use JDBC (and similar to SQOOP difficult parallelization) to load
> 500 million records - manually killed after 8 hours.
>
> (both the above studies were done in a system of same capacity, with 32 GB
> RAM and dual hexacore Xeon processors and SSD. SPARK was running locally,
> and SQOOP ran on HADOOP2 and extracted data to local file system)
>
> In case any one needs to know what needs to be done to access both the CSV
> and JDBC modules in SPARK Local Server mode, please let me know.
>
>
> Regards,
> Gourav Sengupta
>
> On Thu, Apr 7, 2016 at 12:26 AM, Yong Zhang  wrote:
>
>> Good to know that.
>>
>> That is why Sqoop has this "direct" mode, to utilize the vendor specific
>> feature.
>>
>> But for MPP, I still think it makes sense that vendor provide some kind
>> of InputFormat, or data source in Spark, so Hadoop eco-system can integrate
>> with them more natively.
>>
>> Yong
>>
>> --
>> Date: Wed, 6 Apr 2016 16:12:30 -0700
>> Subject: Re: Sqoop on Spark
>> From: mohaj...@gmail.com
>> To: java8...@hotmail.com
>> CC: mich.talebza...@gmail.com; jornfra...@gmail.com;
>> msegel_had...@hotmail.com; guha.a...@gmail.com; linguin@gmail.com;
>> user@spark.apache.org
>>
>>
>> It is using JDBC driver, i know that's the case for Teradata:
>>
>> http://developer.teradata.com/connectivity/articles/teradata-connector-for-hadoop-now-available
>>
>> Teradata Connector (which is used by Cloudera and Hortonworks) for doing
>> Sqoop is parallelized and works with ORC and probably other formats as
>> well. It is using JDBC for each connection between data-nodes and their AMP
>> (compute) nodes. There is an additional layer that coordinates all of it.
>> I know Oracle has a similar technology I've used it and had to supply the
>> JDBC driver.
>>
>> Teradata Connector is for batch data copy, QueryGrid is for interactive
>> data movement.
>>
>> On Wed, Apr 6, 2016 at 4:05 PM, Yong Zhang  wrote:
>>
>> If they do that, they must provide a customized input format, instead of
>> through JDBC.
>>
>> Yong
>>
>> --
>> Date: Wed, 6 Apr 2016 23:56:54 +0100
>> Subject: Re: Sqoop on Spark
>> From: mich.talebza...@gmail.com
>> To: mohaj...@gmail.com
>> CC: jornfra...@gmail.com; msegel_had...@hotmail.com; guha.a...@gmail.com;
>> linguin@gmail.com; user@spark.apache.org
>>
>>
>> SAP Sybase IQ does that and I believe SAP Hana as well.
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> On 6 April 2016 at 23:49, Peyman Mohajerian  wrote:
>>
>> For some MPP relational stores (not operational) it maybe feasible to run
>> Spark jobs and also have data locality. I know QueryGrid (Teradata) and
>> PolyBase (microsoft) use data locality to move data between their MPP and
>> Hadoop.
>> I would guess (have no idea) someone like IBM already is doing that for
>> Spark, maybe a bit off topic!
>>
>> On Wed, Apr 6, 2016 at 3:29 PM, Jörn Franke  wrote:
>>

How to import D3 library in Spark

2016-04-08 Thread Chadha Pooja
Hi

I am using Amazon EMR to run Spark and would like to reproduce something
similar to the graph at the end of this link:
https://docs.cloud.databricks.com/docs/latest/featured_notebooks/Wikipedia%20Clickstream%20Data.html

Can someone help me with how to import d3 library in Spark ? I am using 
Zeppelin, and looking to use Scala/R or Python with d3.

Thanks
Pooja

__
The Boston Consulting Group, Inc.
 
This e-mail message may contain confidential and/or privileged information.
If you are not an addressee or otherwise authorized to receive this message,
you should not use, copy, disclose or take any action based on this e-mail or
any information contained in the message. If you have received this material
in error, please advise the sender immediately by reply e-mail and delete this
message. Thank you.


Streaming k-means visualization

2016-04-08 Thread Priya Ch
Hi All,

 I am using Streaming k-means to train my model on streaming data. Now I
want to visualize the clusters. What reporting tool would be used for
this? Could Zeppelin be used to visualize the clusters?

Regards,
Padma Ch


Re: can not join dataset with itself

2016-04-08 Thread JH P
I’m using Spark 1.6.1

Class is case class DistinctValues(statType: Int, dataType: Int, _id: Int, 
values: Array[(String, Long)], numOfMembers: Int,category: String)

and

error for newGnsDS.joinWith(newGnsDS, $"dataType”)
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
resolve 'dataType' given input columns: [countUnique, median, recentEdid, max, 
cdid, dataType, firstQuarter, sigma, replicationRateAvg, thirdQuarter, 
accCount, avg, countNotNull, statType, categoryId, category, min, numRows, 
numDistinctRows];

error for newGnsDS.as("a").joinWith(newGnsDS.as("b"), $"a.dataType" === 
$"b.datatype”)
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
resolve 'a.dataType' given input columns: [countUnique, median, recentEdid, 
max, cdid, dataType, firstQuarter, sigma, replicationRateAvg, thirdQuarter, 
accCount, avg, countNotNull, statType, categoryId, category, min, numRows, 
numDistinctRows];

Common error
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:60)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:108)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:119)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:57)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:121)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:120)
at 

Re: can not join dataset with itself

2016-04-08 Thread Ted Yu
Looks like you're using Spark 1.6.x

What error(s) did you get for the first two joins?

Thanks

On Fri, Apr 8, 2016 at 3:53 AM, JH P  wrote:

> Hi. I want to join a Dataset with itself, so I tried the code below.
>
> 1. newGnsDS.joinWith(newGnsDS, $"dataType")
>
> 2. newGnsDS.as("a").joinWith(newGnsDS.as("b"), $"a.dataType" === $
> "b.datatype")
>
> 3. val a = newGnsDS.map(x => x).as("a")
>val b = newGnsDS.map(x => x).as("b")
>
>
>a.joinWith(b, $"a.dataType" === $"b.datatype")
>
> 1 and 2 don’t work, but 3 does. I don’t know why it works, or whether a
> better approach exists. Please help.
>
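
For readers hitting the same thing, here is a minimal spark-shell sketch of the
pattern that option 3 relies on (Spark 1.6.x assumed; DV is a simplified
stand-in for the original DistinctValues class). The extra map(identity)
appears to give each side of the self-join its own logical plan, and therefore
fresh attribute IDs, so the join condition can resolve:

import sqlContext.implicits._

case class DV(dataType: Int, category: String)

val ds = sc.parallelize(Seq(DV(1, "a"), DV(1, "b"), DV(2, "c"))).toDS()

// Fails in 1.6: both sides of the self-join share the same attribute IDs,
// so "a.dataType" cannot be resolved.
// ds.as("a").joinWith(ds.as("b"), $"a.dataType" === $"b.dataType")

// Workaround from option 3: map(identity) materializes a new plan per side.
val a = ds.map(identity).as("a")
val b = ds.map(identity).as("b")
a.joinWith(b, $"a.dataType" === $"b.dataType").show()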


Re: Running Spark on Yarn-Client/Cluster mode

2016-04-08 Thread ashesh_28
A few more details on node memory and cores:

ptfhadoop01v - 4GB
ntpcam01v - 1GB
ntpcam03v - 2GB

Each of the VMs has only a single-core CPU.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-on-Yarn-Client-Cluster-mode-tp26691p26714.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [HELP:]Save Spark Dataframe in Phoenix Table

2016-04-08 Thread Josh Mahonin
Hi Divya,

That's strange. Are you able to post a snippet of your code to look at? And
are you sure that you're saving the dataframes as per the docs (
https://phoenix.apache.org/phoenix_spark.html)?

Depending on your HDP version, it may or may not actually have
phoenix-spark support. Double-check that your Spark configuration is set up
with the right worker/driver classpath settings, and that the Phoenix JARs
contain the necessary phoenix-spark classes
(e.g. org.apache.phoenix.spark.PhoenixRelation). If not, I suggest
following up with Hortonworks.

Josh
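
For reference, a rough sketch of the save path described in those docs. Table
name, column layout and the zkUrl below are placeholders; it assumes a
DataFrame df whose columns match an existing Phoenix table, and phoenix-spark
on both driver and executor classpaths:

import org.apache.spark.sql.SaveMode

df.write
  .format("org.apache.phoenix.spark")
  .mode(SaveMode.Overwrite)           // phoenix-spark expects Overwrite
  .option("table", "OUTPUT_TABLE")
  .option("zkUrl", "zkhost:2181")
  .save()

If this still fails, it is worth re-checking the classpath and packaging
points above.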



On Fri, Apr 8, 2016 at 1:22 AM, Divya Gehlot 
wrote:

> Hi,
> I have a Hortonworks Hadoop cluster with the configuration below:
> Spark 1.5.2
> HBASE 1.1.x
> Phoenix 4.4
>
> I am able to connect to Phoenix through a JDBC connection and able to read
> the Phoenix tables.
> But while writing the data back to a Phoenix table
> I am getting the below error:
>
> org.apache.spark.sql.AnalysisException:
> org.apache.phoenix.spark.DefaultSource does not allow user-specified
> schemas.;
>
> Can anybody help in resolving the above error, or suggest another way of
> saving Spark DataFrames to Phoenix?
>
> Would really appreciate the help.
>
> Thanks,
> Divya
>


can not join dataset with itself

2016-04-08 Thread JH P
Hi. I want to join a Dataset with itself, so I tried the code below.

1. newGnsDS.joinWith(newGnsDS, $"dataType")

2. newGnsDS.as("a").joinWith(newGnsDS.as("b"), $"a.dataType" === $"b.datatype")

3. val a = newGnsDS.map(x => x).as("a")
   val b = newGnsDS.map(x => x).as("b")
  
   a.joinWith(b, $"a.dataType" === $"b.datatype")

1 and 2 don’t work, but 3 does. I don’t know why it works, or whether a better
approach exists. Please help.

Spark Streaming share state between two streams

2016-04-08 Thread Shekhar Bansal
Hi,
Can we share Spark Streaming state between two DStreams? Basically I want to
create state using the first stream and enrich the second stream using that
state.

Example: I have modified the StatefulNetworkWordCount example. I am creating
state using the first stream and enriching the second stream with the count
from the first stream.

val initialRDD =
ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))


val mappingFuncForFirstStream = (batchTime: Time, word: String, one: 
Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  val output = (word, sum)
  state.update(sum)

  Some(output)
}

val mappingFuncForSecondStream = (batchTime: Time, word: String, one: 
Option[Int], state: State[Int]) => {
  val sum = state.getOption.getOrElse(0)
  val output = (word, sum)

  Some(output)
}



// first stream
KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](ssc, kafkaParams, topicsSet)
  .flatMap(r=>r._2.split(" "))
  .map(x => (x, 1))
  
.mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10)))
  .print(1)



// second stream
KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](ssc, kafkaParams2, mergeTopicSet)
  .flatMap(r=>r._2.split(" "))
  .map(x => (x, 1))
  
.mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10)))
  .print(50)
In the checkpointing directory, I can see two different state RDDs. I am
using spark-1.6.1 and kafka-0.8.2.1.

Regards,
Shekhar
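
One pattern worth trying (a hedged sketch only, not something confirmed in
this thread): union the two streams with a tag so that a single mapWithState
instance owns the state, and branch on the tag inside the mapping function.
firstStream and secondStream below stand for the two mapped Kafka streams of
(word, count) pairs from the post:

import org.apache.spark.streaming.{Minutes, State, StateSpec, Time}

// true marks records from the first stream, false marks the second
val tagged = firstStream.map { case (w, c) => (w, (c, true)) }
  .union(secondStream.map { case (w, c) => (w, (c, false)) })

val sharedMappingFunc = (batchTime: Time, word: String,
                         value: Option[(Int, Boolean)], state: State[Int]) => {
  value match {
    case Some((c, true)) =>                     // first stream updates the state
      val sum = c + state.getOption.getOrElse(0)
      state.update(sum)
      Some((word, sum))
    case _ =>                                   // second stream only reads it
      Some((word, state.getOption.getOrElse(0)))
  }
}

tagged.mapWithState(
  StateSpec.function(sharedMappingFunc).timeout(Minutes(10))).print()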

Re: About nested RDD

2016-04-08 Thread Rishi Mishra
As mentioned earlier, you can create a broadcast variable containing all the
small RDD elements; I hope they are really small. Then you can fire
A.update(broadcastVariable).

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra
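
A minimal sketch of that approach (illustrative only: A is assumed to be a
large RDD[(Int, String)], and b, c, d small RDD[(Int, String)]s holding
replacement values for some keys, all small enough to collect on the driver):

// Collect and merge the small RDDs on the driver, broadcast the result,
// and apply it to A in a single pass; no RDD is referenced inside the closure.
val updates = (b.collect() ++ c.collect() ++ d.collect()).toMap
val bcUpdates = sc.broadcast(updates)

val updatedA = A.map { case (k, v) => (k, bcUpdates.value.getOrElse(k, v)) }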

On Fri, Apr 8, 2016 at 2:33 PM, Tenghuan He  wrote:

> Hi
>
> Thanks for your reply.
> Yes, It's very much like the union() method, but there is some difference.
>
> I have a very large RDD A, and a lot of small RDDs b, c, d and so on.
> and A.update(a) will update some element in the A and return a new RDD
>
> when calling
> val B = A.update(b).update(c).update(d).update().
> B.count()
>
> The count action will call the compute method.
> and each update will iterating the large rdd A.
> To avoid this I can merge these small rdds first to rdds then call
> A.update(rdds)
> But I don't hope to do this merge manually outside but inside RDD A
> automatically
>
> I hope I made it clear.
> ​
>
> On Fri, Apr 8, 2016 at 4:22 PM, Holden Karau  wrote:
>
>> It seems like the union function on RDDs might be what you are looking
>> for, or was there something else you were trying to achieve?
>>
>>
>> On Thursday, April 7, 2016, Tenghuan He  wrote:
>>
>>> Hi all,
>>>
>>> I know that nested RDDs are not possible like linke rdd1.map(x => x +
>>> rdd2.count())
>>> I tried to create a custome RDD like following
>>>
>>> class MyRDD(base: RDD, part: Partitioner) extends RDD[(K, V)] {
>>>
>>> var rdds = ArrayBuffer.empty[RDD[(K, (V, Int))]]
>>> def update(rdd: RDD[_]) {
>>>   rdds += rdd
>>> }
>>> def compute ...
>>> def getPartitions ...
>>> }
>>>
>>> In the compute method I call the internal rdds' iterators and got
>>> NullPointerException
>>> Is this also a form of nested RDDs and how do I get rid of this?
>>>
>>> Thanks.
>>>
>>>
>>> Tenghuan
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>>
>


Re: Sqoop on Spark

2016-04-08 Thread Gourav Sengupta
Hi,

Some metrics thrown around the discussion:

SQOOP: extract 500 million rows (single thread): 20 mins (data size 21 GB)
SPARK: load the data into memory: 15 mins

SPARK: use JDBC (where, as with SQOOP, parallelization is difficult) to load
500 million records: manually killed after 8 hours.

(Both of the above studies were done on a system of the same capacity, with
32 GB RAM, dual hexa-core Xeon processors and SSD. SPARK was running locally,
and SQOOP ran on HADOOP2 and extracted the data to the local file system.)

In case anyone needs to know what is required to access both the CSV and JDBC
modules in SPARK local mode, please let me know.


Regards,
Gourav Sengupta
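
In case it is useful for anyone comparing the two approaches: Spark's JDBC
source can parallelize the extract when given a numeric partition column and
bounds. A sketch (connection string, table, column and bounds are
placeholders):

// Spark issues numPartitions queries, each covering a slice of ID between
// lowerBound and upperBound, so the read is no longer single-threaded.
val df = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL")
  .option("dbtable", "SOURCE_TABLE")
  .option("user", "scott")
  .option("password", "tiger")
  .option("partitionColumn", "ID")
  .option("lowerBound", "1")
  .option("upperBound", "500000000")
  .option("numPartitions", "32")
  .load()

df.write.parquet("hdfs:///data/source_table")

Whether this beats SQOOP still depends on how evenly the partition column
splits the table and on what the source database can sustain.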

On Thu, Apr 7, 2016 at 12:26 AM, Yong Zhang  wrote:

> Good to know that.
>
> That is why Sqoop has this "direct" mode, to utilize the vendor specific
> feature.
>
> But for MPP, I still think it makes sense that vendor provide some kind of
> InputFormat, or data source in Spark, so Hadoop eco-system can integrate
> with them more natively.
>
> Yong
>
> --
> Date: Wed, 6 Apr 2016 16:12:30 -0700
> Subject: Re: Sqoop on Spark
> From: mohaj...@gmail.com
> To: java8...@hotmail.com
> CC: mich.talebza...@gmail.com; jornfra...@gmail.com;
> msegel_had...@hotmail.com; guha.a...@gmail.com; linguin@gmail.com;
> user@spark.apache.org
>
>
> It is using JDBC driver, i know that's the case for Teradata:
>
> http://developer.teradata.com/connectivity/articles/teradata-connector-for-hadoop-now-available
>
> Teradata Connector (which is used by Cloudera and Hortonworks) for doing
> Sqoop is parallelized and works with ORC and probably other formats as
> well. It is using JDBC for each connection between data-nodes and their AMP
> (compute) nodes. There is an additional layer that coordinates all of it.
> I know Oracle has a similar technology I've used it and had to supply the
> JDBC driver.
>
> Teradata Connector is for batch data copy, QueryGrid is for interactive
> data movement.
>
> On Wed, Apr 6, 2016 at 4:05 PM, Yong Zhang  wrote:
>
> If they do that, they must provide a customized input format, instead of
> through JDBC.
>
> Yong
>
> --
> Date: Wed, 6 Apr 2016 23:56:54 +0100
> Subject: Re: Sqoop on Spark
> From: mich.talebza...@gmail.com
> To: mohaj...@gmail.com
> CC: jornfra...@gmail.com; msegel_had...@hotmail.com; guha.a...@gmail.com;
> linguin@gmail.com; user@spark.apache.org
>
>
> SAP Sybase IQ does that and I believe SAP Hana as well.
>
> Dr Mich Talebzadeh
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
>
> On 6 April 2016 at 23:49, Peyman Mohajerian  wrote:
>
> For some MPP relational stores (not operational) it maybe feasible to run
> Spark jobs and also have data locality. I know QueryGrid (Teradata) and
> PolyBase (microsoft) use data locality to move data between their MPP and
> Hadoop.
> I would guess (have no idea) someone like IBM already is doing that for
> Spark, maybe a bit off topic!
>
> On Wed, Apr 6, 2016 at 3:29 PM, Jörn Franke  wrote:
>
> Well I am not sure, but using a database as a storage, such as relational
> databases or certain nosql databases (eg MongoDB) for Spark is generally a
> bad idea - no data locality, it cannot handle real big data volumes for
> compute and you may potentially overload an operational database.
> And if your job fails for whatever reason (eg scheduling ) then you have
> to pull everything out again. Sqoop and HDFS seems to me the more elegant
> solution together with spark. These "assumption" on parallelism have to be
> anyway made with any solution.
> Of course you can always redo things, but why - what benefit do you
> expect? A real big data platform has to support anyway many different tools
> otherwise people doing analytics will be limited.
>
> On 06 Apr 2016, at 20:05, Michael Segel  wrote:
>
> I don’t think its necessarily a bad idea.
>
> Sqoop is an ugly tool and it requires you to make some assumptions as a
> way to gain parallelism. (Not that most of the assumptions are not valid
> for most of the use cases…)
>
> Depending on what you want to do… your data may not be persisted on HDFS.
> There are use cases where your cluster is used for compute and not storage.
>
> I’d say that spending time re-inventing the wheel can be a good thing.
> It would be a good idea for many to rethink their ingestion process so
> that they can have a nice ‘data lake’ and not a ‘data sewer’. (Stealing
> that term from Dean Wampler. ;-)
>
> Just saying. ;-)
>
> -Mike
>
> On Apr 5, 2016, at 10:44 PM, Jörn Franke  wrote:
>
> I do not think you can be more resource efficient. In the end you have to
> store the data anyway on HDFS . You 

Re: Running Spark on Yarn-Client/Cluster mode

2016-04-08 Thread ashesh_28
Hi,

Just a quick update: after trying for a while, I rebooted all three machines
used in the cluster and formatted the namenode and ZKFC. Then I started every
daemon in the cluster.

After all the daemons were up and running, I tried to issue the same command
as earlier.


 

As you can see, the SparkContext is started, but I still see some ERROR
entries there:
"ERROR YarnClientSchedulerBackend: Yarn application has already exited with
state FAILED!"

Also, if I type exit() at the end and then try to re-issue the same command to
start Spark in yarn-client mode, it does not even start and takes me back to
the error message posted earlier.


 

I have no idea what is causing this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-on-Yarn-Client-Cluster-mode-tp26691p26713.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: About nested RDD

2016-04-08 Thread Tenghuan He
Hi

Thanks for your reply.
Yes, it's very much like the union() method, but there is a difference.

I have a very large RDD A, and a lot of small RDDs b, c, d and so on,
and A.update(b) will update some elements in A and return a new RDD.

When calling
val B = A.update(b).update(c).update(d).update().
B.count()

the count action will call the compute method, and each update will iterate
over the large RDD A. To avoid this I can first merge the small RDDs into one
(say rdds) and then call A.update(rdds), but I would rather have this merge
happen automatically inside RDD A than do it manually outside.

I hope I made it clear.

On Fri, Apr 8, 2016 at 4:22 PM, Holden Karau  wrote:

> It seems like the union function on RDDs might be what you are looking
> for, or was there something else you were trying to achieve?
>
>
> On Thursday, April 7, 2016, Tenghuan He  wrote:
>
>> Hi all,
>>
>> I know that nested RDDs are not possible like linke rdd1.map(x => x +
>> rdd2.count())
>> I tried to create a custome RDD like following
>>
>> class MyRDD(base: RDD, part: Partitioner) extends RDD[(K, V)] {
>>
>> var rdds = ArrayBuffer.empty[RDD[(K, (V, Int))]]
>> def update(rdd: RDD[_]) {
>>   rdds += rdd
>> }
>> def compute ...
>> def getPartitions ...
>> }
>>
>> In the compute method I call the internal rdds' iterators and got
>> NullPointerException
>> Is this also a form of nested RDDs and how do I get rid of this?
>>
>> Thanks.
>>
>>
>> Tenghuan
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>


Re: Why do I need to handle dependencies on EMR but not on-prem Hadoop?

2016-04-08 Thread Sean Owen
You probably just got lucky, and the default Python distribution on
your CDH nodes has this library but the EMR one doesn't. (CDH actually
has an Anaconda distribution, not sure if you enabled that.) In
general you need to make dependencies available that your app does not
supply.

On Fri, Apr 8, 2016 at 8:28 AM, YaoPau  wrote:
> On-prem I'm running PySpark on Cloudera's distribution, and I've never had to
> worry about dependency issues.  I import my libraries on my driver node only
> using pip or conda, run my jobs in yarn-client mode, and everything works (I
> just assumed the relevant libraries are copied temporarily to each executor
> node during execution).
>
> But on EMR, I installed a library called fuzzywuzzy on the driver using pip,
> then tried running this basic script in "pyspark --master yarn-client" mode:
>

> mydata = sc.textFile("s3n://my_bucket/rum_20160331/*")
> sample = mydata.take(3)
> new_rdd = sc.parallelize(sample)
> import random
> import fuzzywuzzy
>
> choices = ['hello', 'xylophone', 'zebra']
> mapped_rdd = new_rdd.map(lambda row: str(fuzzywuzzy.process.extract(row,
> choices, limit=2)))
> mapped_rdd.collect()

>
> and I'm getting the error:
>
> ImportError: ('No module named fuzzywuzzy', <function subimport at 0x7fa66610a938>, ('fuzzywuzzy',))
>
> which makes me think I have to use py-files for the first time ever, and
> resolve dependencies manually.
>
> Why does this happen?  How is it that, on the on-prem Cloudera version,
> Spark executor nodes are able to access all the libraries I've only
> installed on my driver, but on EMR they can't?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-do-I-need-to-handle-dependencies-on-EMR-but-not-on-prem-Hadoop-tp26712.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Work on Spark engine for Hive

2016-04-08 Thread Mich Talebzadeh
Hi,

Is there any scheduled work to enable Hive to use recent versions of the
Spark engine?

This is becoming an issue, as some applications have to rely on the MapReduce
engine to run operations on Hive 2, which is serial and slow.

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Re: May I ask a question about SparkSql

2016-04-08 Thread Mich Talebzadeh
Hi Jackie,

Can you create a DataFrame from the RDD and register it as a temp table? This
should work, although I have not tried it.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com
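
A hedged sketch of that idea (the stream, schema and query below are made up
for illustration): inside foreachRDD each micro-batch can be turned into a
DataFrame, registered as a temp table and queried with SQL.

import org.apache.spark.sql.SQLContext

case class Event(id: String, value: Int)

// lines is assumed to be a DStream[String] of comma-separated "id,value" records
lines.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.map(_.split(",")).map(a => Event(a(0), a(1).toInt)).toDF()
  df.registerTempTable("events")

  sqlContext.sql("SELECT id, SUM(value) AS total FROM events GROUP BY id").show()
}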



On 8 April 2016 at 09:28, Hustjackie  wrote:

> Hi all,
>
>
>
> I have several jobs running with Spark-Streaming, but I prefer to run some
> sql to do the same things.
>
> So does the SparkSql support real-time jobs, in another words, Do spark
> support spark streaming SQL.
>
>
> Thanks in advance, any help are appreciate.
>
>
>
> Jacky
>


Re: May I ask a question about SparkSql

2016-04-08 Thread Kasinathan, Prabhu
Check this one: https://github.com/Intel-bigdata/spark-streamingsql. We tried
it and it was working with Spark 1.3.1. You can do ETL on a Spark
StreamingContext using Spark SQL.

Thanks
Prabhu

From: Hustjackie >
Reply-To: "hustjac...@sina.cn" 
>
Date: Friday, April 8, 2016 at 1:58 PM
To: user >
Subject: May I ask a question about SparkSql


Hi all,



I have several jobs running with Spark-Streaming, but I prefer to run some sql 
to do the same things.

So does the SparkSql support real-time jobs, in another words, Do spark support 
spark streaming SQL.


Thanks in advance, any help are appreciate.



Jacky


Re: About nested RDD

2016-04-08 Thread Rishi Mishra
rdd.count() is a fairly straightforward operation which can be calculated on
the driver, and then the value can be included in the map function.
Is your goal to write a generic function which operates on two RDDs, one RDD
being evaluated for each partition of the other?
Here too you can use a broadcast, if one of your RDDs is small enough. If both
RDDs are fairly big, I would like to understand your use case better.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra
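
For the simple rdd1.map(x => x + rdd2.count()) shape, a two-line sketch of
what Rishi describes (rdd1 and rdd2 are assumed to be RDDs of numeric values):

// count() runs as its own job and returns a plain Long to the driver;
// the map closure then captures only that number, not rdd2 itself.
val n = rdd2.count()
val result = rdd1.map(x => x + n)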

On Fri, Apr 8, 2016 at 1:52 PM, Holden Karau  wrote:

> It seems like the union function on RDDs might be what you are looking
> for, or was there something else you were trying to achieve?
>
>
> On Thursday, April 7, 2016, Tenghuan He  wrote:
>
>> Hi all,
>>
>> I know that nested RDDs are not possible like linke rdd1.map(x => x +
>> rdd2.count())
>> I tried to create a custome RDD like following
>>
>> class MyRDD(base: RDD, part: Partitioner) extends RDD[(K, V)] {
>>
>> var rdds = ArrayBuffer.empty[RDD[(K, (V, Int))]]
>> def update(rdd: RDD[_]) {
>>   rdds += rdd
>> }
>> def compute ...
>> def getPartitions ...
>> }
>>
>> In the compute method I call the internal rdds' iterators and got
>> NullPointerException
>> Is this also a form of nested RDDs and how do I get rid of this?
>>
>> Thanks.
>>
>>
>> Tenghuan
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>


May I ask a question about SparkSql

2016-04-08 Thread Hustjackie
Hi all, 

I have several jobs running with Spark Streaming, but I would prefer to run
some SQL to do the same things. So does Spark SQL support real-time jobs? In
other words, does Spark support streaming SQL?
Thanks in advance; any help is appreciated.

Jacky

Re: About nested RDD

2016-04-08 Thread Holden Karau
It seems like the union function on RDDs might be what you are looking
for, or was there something else you were trying to achieve?

On Thursday, April 7, 2016, Tenghuan He  wrote:

> Hi all,
>
> I know that nested RDDs are not possible like linke rdd1.map(x => x +
> rdd2.count())
> I tried to create a custome RDD like following
>
> class MyRDD(base: RDD, part: Partitioner) extends RDD[(K, V)] {
>
> var rdds = ArrayBuffer.empty[RDD[(K, (V, Int))]]
> def update(rdd: RDD[_]) {
>   rdds += rdd
> }
> def compute ...
> def getPartitions ...
> }
>
> In the compute method I call the internal rdds' iterators and got
> NullPointerException
> Is this also a form of nested RDDs and how do I get rid of this?
>
> Thanks.
>
>
> Tenghuan
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Why do I need to handle dependencies on EMR but not on-prem Hadoop?

2016-04-08 Thread YaoPau
On-prem I'm running PySpark on Cloudera's distribution, and I've never had to
worry about dependency issues.  I import my libraries on my driver node only
using pip or conda, run my jobs in yarn-client mode, and everything works (I
just assumed the relevant libraries are copied temporarily to each executor
node during execution).

But on EMR, I installed a library called fuzzywuzzy on the driver using pip,
then tried running this basic script in "pyspark --master yarn-client" mode:

>>>
mydata = sc.textFile("s3n://my_bucket/rum_20160331/*")
sample = mydata.take(3)
new_rdd = sc.parallelize(sample)
import random
import fuzzywuzzy

choices = ['hello', 'xylophone', 'zebra']
mapped_rdd = new_rdd.map(lambda row: str(fuzzywuzzy.process.extract(row,
choices, limit=2)))
mapped_rdd.collect()
>>>

and I'm getting the error:

ImportError: ('No module named fuzzywuzzy', <function subimport at 0x7fa66610a938>, ('fuzzywuzzy',))

which makes me think I have to use py-files for the first time ever, and
resolve dependencies manually.

Why does this happen?  How is it that, on the on-prem Cloudera version,
Spark executor nodes are able to access all the libraries I've only
installed on my driver, but on EMR they can't?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-do-I-need-to-handle-dependencies-on-EMR-but-not-on-prem-Hadoop-tp26712.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLlib ALS MatrixFactorizationModel.save fails consistently

2016-04-08 Thread Nick Pentreath
Could you post some stack trace info?

Generally, it can be problematic to run Spark within a web server framework,
as there are often dependency conflicts and threading issues. You might
prefer to run the model-building as a standalone app, or check out
https://github.com/spark-jobserver/spark-jobserver (either for triggering
spark jobs remotely from a web app, via HTTP, or for ideas on how to handle
SparkContext within web framework / akka).
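
As a quick way to rule out the web-framework angle, a minimal standalone
sketch of the save/load round trip (the ratings, rank and output path are
placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

object AlsSaveCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("als-save-check"))

    val ratings = sc.parallelize(Seq(
      Rating(1, 10, 1.0), Rating(1, 20, 3.0), Rating(2, 10, 2.0)))
    val model = ALS.trainImplicit(ratings, 10, 5)

    // Save and immediately reload with the same context; if this works
    // reliably, the failures likely come from the Play/SBT environment.
    model.save(sc, "hdfs:///tmp/als-model")
    val reloaded = MatrixFactorizationModel.load(sc, "hdfs:///tmp/als-model")
    println(reloaded.recommendProducts(1, 3).mkString(", "))

    sc.stop()
  }
}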

On Fri, 8 Apr 2016 at 00:56 Colin Woodbury  wrote:

> Hi all,
>
> I've implemented most of a content recommendation system for a client.
> However, whenever I attempt to save a MatrixFactorizationModel I've
> trained, I see one of four outcomes:
>
> 1. Despite "save" being wrapped in a "try" block, I see a massive stack
> trace quoting some java.io classes. The Model isn't written.
> 2. Same as the above, but the Model *is* written. It's unusable however,
> as it's missing many of the files it should have, particularly in the
> "product" folder.
> 3. Same as the above, but sbt crashes completely.
> 4. No massive stack trace, and the Model seems to be written. Upon being
> loaded by another Spark context and fed a user ID, it claims the user isn't
> present in the Model.
>
> Case 4 is pretty rare. I see these failures both locally and when I test
> on a Google Cloud instance with much better resources.
>
> Note that `ALS.trainImplicit` and `model.save` are being called from
> within a Future. Could it be possible that Play threads are closing before
> Spark can finish, thus interrupting it somehow?
>
> We are running Spark 1.6.1 within Play 2.4 and Scala 2.11. All these
> failures have occurred while in Play's Dev mode in SBT.
>
> Thanks for any insight you can give.
>


Re: [ERROR]: Spark 1.5.2 + Hbase 1.1 + Hive 1.2 + HbaseIntegration

2016-04-08 Thread Wojciech Indyk
Hello Divya!
Have you solved the problem?
I suppose the log comes from the driver. You also need to look at the logs on
the worker JVMs; there may be an exception or something there.
Do you have Kerberos on your cluster? It could be similar to this problem:
http://issues.apache.org/jira/browse/SPARK-14115

Based on your logs:
> 16/02/29 23:09:34 INFO ClientCnxn: Opening socket connection to server
> localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL
> (unknown error)
> 16/02/29 23:09:34 INFO ClientCnxn: Socket connection established to
> localhost/0:0:0:0:0:0:0:1:2181, initiating session
> 16/02/29 23:09:34 INFO ClientCnxn: Session establishment complete on
> server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x3532fb70ba20035,

Maybe there is a problem with the RPC calls to the region servers over IPv6
(but I'm just guessing).

--
Kind regards/ Pozdrawiam,
Wojciech Indyk
http://datacentric.pl


2016-03-01 5:27 GMT+01:00 Divya Gehlot :
> Hi,
> I am getting error when I am trying to connect hive table (which is being
> created through HbaseIntegration) in spark
>
> Steps I followed :
> *Hive Table creation code  *:
> CREATE EXTERNAL TABLE IF NOT EXISTS TEST(NAME STRING,AGE INT)
> STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
> WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,0:AGE")
> TBLPROPERTIES ("hbase.table.name" = "TEST",
> "hbase.mapred.output.outputtable" = "TEST");
>
>
> *DESCRIBE TEST ;*
> col_namedata_typecomment
> namestring from deserializer
> age   int from deserializer
>
>
> *Spark Code :*
> import org.apache.spark._
> import org.apache.spark.sql._
>
> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> hiveContext.sql("from TEST SELECT  NAME").collect.foreach(println)
>
>
> *Starting Spark shell*
> spark-shell --jars
> /usr/hdp/2.3.4.0-3485/hive/lib/guava-14.0.1.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-client.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-common.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-protocol.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hive/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/2.3.4.0-3485/hive/lib/zookeeper-3.4.6.2.3.4.0-3485.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar
> --driver-class-path
> /usr/hdp/2.3.4.0-3485/hive/lib/guava-14.0.1.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-client.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-common.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-protocol.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hive/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/2.3.4.0-3485/hive/lib/zookeeper-3.4.6.2.3.4.0-3485.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar
> --packages com.databricks:spark-csv_2.10:1.3.0  --master yarn-client -i
> /TestDivya/Spark/InstrumentCopyToHDFSHive.scala
>
> *Stack Trace* :
>
> Stack SQL context available as sqlContext.
>> Loading /TestDivya/Spark/InstrumentCopyToHDFSHive.scala...
>> import org.apache.spark._
>> import org.apache.spark.sql._
>> 16/02/29 23:09:29 INFO HiveContext: Initializing execution hive, version
>> 1.2.1
>> 16/02/29 23:09:29 INFO ClientWrapper: Inspected Hadoop version:
>> 2.7.1.2.3.4.0-3485
>> 16/02/29 23:09:29 INFO ClientWrapper: Loaded
>> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version
>> 2.7.1.2.3.4.0-3485
>> 16/02/29 23:09:29 INFO HiveContext: default warehouse location is
>> /user/hive/warehouse
>> 16/02/29 23:09:29 INFO HiveContext: Initializing HiveMetastoreConnection
>> version 1.2.1 using Spark classes.
>> 16/02/29 23:09:29 INFO ClientWrapper: Inspected Hadoop version:
>> 2.7.1.2.3.4.0-3485
>> 16/02/29 23:09:29 INFO ClientWrapper: Loaded
>> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version
>> 2.7.1.2.3.4.0-3485
>> 16/02/29 23:09:30 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 16/02/29 23:09:30 INFO metastore: Trying to connect to metastore with URI
>> thrift://ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal:9083
>> 16/02/29 23:09:30 INFO metastore: Connected to metastore.
>> 16/02/29 23:09:30 WARN DomainSocketFactory: The short-circuit local reads
>> feature cannot be used because libhadoop cannot be loaded.
>> 16/02/29 23:09:31 INFO SessionState: Created local directory:
>> /tmp/1bf53785-f7c8-406d-a733-a5858ccb2d16_resources
>> 16/02/29 23:09:31 INFO SessionState: Created HDFS directory:
>> /tmp/hive/hdfs/1bf53785-f7c8-406d-a733-a5858ccb2d16
>> 16/02/29 23:09:31 INFO SessionState: Created local directory:
>> /tmp/hdfs/1bf53785-f7c8-406d-a733-a5858ccb2d16
>> 16/02/29 23:09:31 INFO SessionState: Created HDFS directory:
>> /tmp/hive/hdfs/1bf53785-f7c8-406d-a733-a5858ccb2d16/_tmp_space.db
>> 

how to use udf in spark thrift server.

2016-04-08 Thread zhanghn
I want to define some UDFs in my Spark environment

and serve them through the Thrift Server, so I can use these UDFs in my
beeline connection.

At first I tried starting it with the UDF jars and creating the functions in
Hive.

 

In spark-sql, I can add temporary functions like "CREATE TEMPORARY FUNCTION
bsdUpper AS 'org.hue.udf.MyUpper';", and that works well.

But when I add functions like "CREATE FUNCTION bsdupperlong AS
'org.hue.udf.MyUpper' USING JAR 'hdfs://ns1/bsdtest2/myudfs.jar';",
the command succeeds and I can see the function in the metastore,

yet when I use the function "bsdupperlong" as I expected, it turns out that
Spark cannot find it.

When I add the function again, it throws an exception saying the function
already exists.

 

Then I found this unresolved issue:

https://issues.apache.org/jira/browse/SPARK-11609

So it's a bug that has not been fixed yet.

 

Then I found this:

https://mail-archives.apache.org/mod_mbox/spark-user/201510.mbox/%3CCADONuiRelyy7ebcj6qr3guo023upmvnt_d_gadrnbavhpor...@mail.gmail.com%3E

It tells me that I can register the UDFs on a context and then start the
Thrift Server using startWithContext from the spark shell.

But I failed to find, via Google, an article showing how starting the Thrift
Server with startWithContext from the spark shell is actually done.

 

Can anybody help me with that?

 

Or point me to some other solution so that I can use UDFs from my beeline client.

 

Thanks a lot.

 

Spark version: spark-1.5.2

Hadoop 2.7.1 

Hive 1.2.1 

 

The UDF code in the jar is like this:

package org.hue.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
import java.util.UUID;

public final class MyUUID extends UDF {
  public String evaluate() {
    UUID uuid = UUID.randomUUID();
    return uuid.toString();
  }
}
}