Re: How to lookup by a key in an RDD

2015-11-01 Thread Gylfi
Hi. 

You may want to look into Indexed RDDs 
https://github.com/amplab/spark-indexedrdd

Regards, 
Gylfi. 
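For a quick comparison, a minimal Scala sketch (values illustrative): a plain
pair RDD already offers lookup(), which only searches the owning partition when
the RDD has a known partitioner, while IndexedRDD layers an indexed get()/put()
API on top (exact import and artifact per the project README linked above):

    import org.apache.spark.HashPartitioner

    val pairs = sc.parallelize(Seq(1L -> "a", 2L -> "b", 3L -> "c"))
      .partitionBy(new HashPartitioner(4))
      .cache()

    pairs.lookup(2L)   // Seq("b"); with a partitioner only one partition is searched

    // With spark-indexedrdd (API as described in its README, hedged here):
    //   val indexed = IndexedRDD(pairs).cache()
    //   indexed.get(2L)   // Option[String]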






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-lookup-by-a-key-in-an-RDD-tp25243p25247.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: If you use Spark 1.5 and disabled Tungsten mode ...

2015-11-01 Thread Reynold Xin
Thanks for reporting it, Sjoerd. You might have a different version of
Janino brought in from somewhere else.

This should fix your problem: https://github.com/apache/spark/pull/9372

Can you give it a try?
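For readers who land on this thread, "Tungsten mode" here refers to the Spark
1.5 SQL flag below; a minimal sketch of disabling it (a 1.5-era configuration
key):

    // Spark 1.5: fall back to the non-Tungsten execution paths for Spark SQL.
    sqlContext.setConf("spark.sql.tungsten.enabled", "false")
    // or at submit time: --conf spark.sql.tungsten.enabled=false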



On Tue, Oct 27, 2015 at 9:12 PM, Sjoerd Mulder 
wrote:

> No, the job doesn't actually fail, but since our tests are generating all
> these stack traces I have disabled Tungsten mode just to be sure (and so we
> don't have a gazillion stack traces in production).
>
> 2015-10-27 20:59 GMT+01:00 Josh Rosen :
>
>> Hi Sjoerd,
>>
>> Did your job actually *fail* or did it just generate many spurious
>> exceptions? While the stacktrace that you posted does indicate a bug, I
>> don't think that it should have stopped query execution because Spark
>> should have fallen back to an interpreted code path (note the "Failed to
>> generate ordering, fallback to interpreted" in the error message).
>>
>> On Tue, Oct 27, 2015 at 12:56 PM Sjoerd Mulder 
>> wrote:
>>
>>> I have disabled it because it started generating ERRORs when
>>> upgrading from Spark 1.4 to 1.5.1:
>>>
>>> 2015-10-27T20:50:11.574+0100 ERROR TungstenSort.newOrdering() - Failed
>>> to generate ordering, fallback to interpreted
>>> java.util.concurrent.ExecutionException: java.lang.Exception: failed to
>>> compile: org.codehaus.commons.compiler.CompileException: Line 15, Column 9:
>>> Invalid character input "@" (character code 64)
>>>
>>> public SpecificOrdering
>>> generate(org.apache.spark.sql.catalyst.expressions.Expression[] expr) {
>>>   return new SpecificOrdering(expr);
>>> }
>>>
>>> class SpecificOrdering extends
>>> org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering {
>>>
>>>   private org.apache.spark.sql.catalyst.expressions.Expression[]
>>> expressions;
>>>
>>>
>>>
>>>   public
>>> SpecificOrdering(org.apache.spark.sql.catalyst.expressions.Expression[]
>>> expr) {
>>> expressions = expr;
>>>
>>>   }
>>>
>>>   @Override
>>>   public int compare(InternalRow a, InternalRow b) {
>>> InternalRow i = null;  // Holds current row being evaluated.
>>>
>>> i = a;
>>> boolean isNullA2;
>>> long primitiveA3;
>>> {
>>>   /* input[2, LongType] */
>>>
>>>   boolean isNull0 = i.isNullAt(2);
>>>   long primitive1 = isNull0 ? -1L : (i.getLong(2));
>>>
>>>   isNullA2 = isNull0;
>>>   primitiveA3 = primitive1;
>>> }
>>> i = b;
>>> boolean isNullB4;
>>> long primitiveB5;
>>> {
>>>   /* input[2, LongType] */
>>>
>>>   boolean isNull0 = i.isNullAt(2);
>>>   long primitive1 = isNull0 ? -1L : (i.getLong(2));
>>>
>>>   isNullB4 = isNull0;
>>>   primitiveB5 = primitive1;
>>> }
>>> if (isNullA2 && isNullB4) {
>>>   // Nothing
>>> } else if (isNullA2) {
>>>   return 1;
>>> } else if (isNullB4) {
>>>   return -1;
>>> } else {
>>>   int comp = (primitiveA3 > primitiveB5 ? 1 : primitiveA3 <
>>> primitiveB5 ? -1 : 0);
>>>   if (comp != 0) {
>>> return -comp;
>>>   }
>>> }
>>>
>>> return 0;
>>>   }
>>> }
>>>
>>> at
>>> org.spark-project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>>> at
>>> org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>>> at
>>> org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>>> at
>>> org.spark-project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>>> at
>>> org.spark-project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
>>> at
>>> org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
>>> at
>>> org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>>> at
>>> org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>>> at org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
>>> at
>>> org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>>> at
>>> org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:362)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.create(GenerateOrdering.scala:139)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.create(GenerateOrdering.scala:37)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:425)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:422)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan.newOrdering(SparkPlan.scala:294)
>>> at org.apache.spark.sql.execution.TungstenSort.org
>>> $apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:131)
>>> at
>>> 

Re: job hangs when using pipe() with reduceByKey()

2015-11-01 Thread Gylfi
Hi. 

What is slow exactly?
In code base 1:
When you run the persist() + count(), you store the result in RAM.
Then the map + reduceByKey is done on the in-memory data.

In the latter case (all in one line) you are doing both steps at the same
time.

So you are saying that if you sum up the time to do both steps in the first
code base, it is still much faster than the latter code base?
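To make the two variants concrete, a minimal sketch (the script path and the
parse function are placeholders, not from the original code):

    import org.apache.spark.storage.StorageLevel

    // Variant 1: materialize the piped output first, then reduce on cached data.
    val piped = inputRdd.pipe("./my_script.sh")           // placeholder external command
    piped.persist(StorageLevel.MEMORY_ONLY)
    piped.count()                                         // forces the pipe() stage to run now
    val result1 = piped.map(parse).reduceByKey(_ + _)     // parse: String => (key, count), placeholder

    // Variant 2: one lineage, pipe and reduceByKey run in a single job.
    val result2 = inputRdd.pipe("./my_script.sh").map(parse).reduceByKey(_ + _)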



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/job-hangs-when-using-pipe-with-reduceByKey-tp25242p25248.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




[Spark MLlib] about linear regression issue

2015-11-01 Thread Zhiliang Zhu
Dear All,

As for N-dimension linear regression, when the number of labeled training points
(or the rank of the labeled point space) is less than N, then from the perspective
of math, the weights of the trained linear model may not be unique.

However, the output of model.weight() by Spark may contain some wi < 0. My
question is: is there some proper way to get only a specific output weight vector
with all wi >= 0 ...

Yes, the above is the same as the issue of solving a linear system of
equations Aw = b with r(A, b) = r(A) < columnNo(A); then w has infinitely many
solutions, but here I only need one solution with all wi >= 0. When there is only
a unique solution, both LR and SVD work perfectly.

I will appreciate all your kind help very much~~
Best Regards,
Zhiliang



Re: Spark 1.5 on CDH 5.4.0

2015-11-01 Thread Deenar Toraskar
Hi guys,

I have documented the steps involved in getting Spark 1.5.1 to run on CDH
5.4.0 here; let me know if it works for you as well:
https://www.linkedin.com/pulse/running-spark-151-cdh-deenar-toraskar-cfa?trk=hp-feed-article-title-publish

Looking forward to CDH 5.5 which supports Spark 1.5.x out of the box.

Regards
Deenar




*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812


On 23 October 2015 at 17:31, Deenar Toraskar 
wrote:

> I got this working. For others trying this: it turns out that in Spark 1.3/CDH 5.4,
>
> spark.yarn.jar=local:/opt/cloudera/parcels/
>
> I had changed this to reflect the 1.5.1 version of spark assembly jar
>
> spark.yarn.jar=local:/opt/spark-1.5.1-bin/...
>
> and this didn't work, I had to drop the "local:" prefix
>
> spark.yarn.jar=/opt/spark-1.5.1-bin/...
>
> Regards
> Deenar
>
> On 23 October 2015 at 17:30, Deenar Toraskar <
> deenar.toras...@thinkreactive.co.uk> wrote:
>
>> I got this working. For others trying this It turns out in Spark
>> 1.3/CDH5.4
>>
>> spark.yarn.jar=local:/opt/cloudera/parcels/
>>
>> I had changed this to reflect the 1.5.1 version of spark assembly jar
>>
>> spark.yarn.jar=local:/opt/spark-1.5.1-bin/...
>>
>> and this didn't work, I had to drop the "local:" prefix
>>
>> spark.yarn.jar=/opt/spark-1.5.1-bin/...
>>
>> Regards
>> Deenar
>>
>>
>>
>>
>> *Think Reactive Ltd*
>> deenar.toras...@thinkreactive.co.uk
>> 07714140812
>>
>>
>>
>> On 23 October 2015 at 13:34, Deenar Toraskar 
>> wrote:
>>
>>> Sandy
>>>
>>> The assembly jar does contain org.apache.spark.deploy.yarn.ExecutorLauncher.
>>> I am trying to find out how I can increase the logging level, so I know the
>>> exact classpath used by Yarn ContainerLaunch.
>>>
>>> Deenar
>>>
>>> On 23 October 2015 at 03:30, Sandy Ryza  wrote:
>>>
 Hi Deenar,

 The version of Spark you have may not be compiled with YARN support.
 If you inspect the contents of the assembly jar, does
 org.apache.spark.deploy.yarn.ExecutorLauncher exist?  If not, you'll
 need to find a version that does have the YARN classes.  You can also build
 your own using the -Pyarn flag.

 -Sandy

 On Thu, Oct 22, 2015 at 9:04 AM, Deenar Toraskar <
 deenar.toras...@gmail.com> wrote:

> Hi, I have got the prebuilt version of Spark 1.5 for Hadoop 2.6 (
> http://www.apache.org/dyn/closer.lua/spark/spark-1.5.1/spark-1.5.1-bin-hadoop2.6.tgz)
> working with CDH 5.4.0 in local mode on a cluster with Kerberos. It works
> well including connecting to the Hive metastore. I am facing an issue
> running spark jobs in yarn-client/yarn-cluster mode. The executors fail to
> start as java cannot find ExecutorLauncher.
>
> Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher
> client token: N/A
> diagnostics: Application application_1443531450011_13437 failed 2 times due to
> AM Container for appattempt_1443531450011_13437_02 exited with exitCode: 1
> Stack trace: ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
> at org.apache.hadoop.util.Shell.run(Shell.java:455)
> at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
> at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
> at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Any ideas as to what might be going wrong? Also how can I turn on more
> detailed logging to see what command line is being run by Yarn to launch
> containers?
>
> Regards
> Deenar
>


>>>
>>
>


Some spark apps fail with "All masters are unresponsive", while others pass normally

2015-11-01 Thread Romi Kuntsman
[adding dev list since it's probably a bug, but i'm not sure how to
reproduce so I can open a bug about it]

Hi,

I have a standalone Spark 1.4.0 cluster with 100s of applications running
every day.

From time to time, the applications crash with the following error (see
below)
But at the same time (and also after that), other applications are running,
so I can safely assume the master and workers are working.

1. why is there a NullPointerException? (I can't track the Scala stack
trace to the code, but anyway an NPE is usually an obvious bug even if there's
actually a network error...)
2. why can't it connect to the master? (if it's a network timeout, how do I
increase it? I see the values are hardcoded inside AppClient)
3. how to recover from this error?


  ERROR 01-11 15:32:54,991 SparkDeploySchedulerBackend - Application has
been killed. Reason: All masters are unresponsive! Giving up.
  ERROR 01-11 15:32:55,087 OneForOneStrategy -
  java.lang.NullPointerException
  at
org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
  at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  at
org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
  at akka.dispatch.Mailbox.run(Mailbox.scala:220)
  at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
  at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  ERROR 01-11 15:32:55,603 SparkContext - Error
initializing SparkContext.
  java.lang.IllegalStateException: Cannot call methods on a stopped
SparkContext
  at org.apache.spark.SparkContext.org
$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
  at
org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
  at
org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
  at
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)


Thanks!

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com


Re: apply simplex method to fix linear programming in spark

2015-11-01 Thread Zhiliang Zhu
Hi Ted Yu,

Thanks very much for your kind reply. Do you just mean that in Spark there is no
specific package for the simplex method?
Then I may try to implement it myself; I have not yet decided whether it is
convenient to do in Spark before finally fixing it.

Thank you,
Zhiliang
 


 On Monday, November 2, 2015 1:43 AM, Ted Yu wrote:

 A brief search in the code base shows the following:

    TODO: Add simplex constraints to allow alpha in (0,1).
    ./mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala

 I guess the answer to your question is no.

 FYI
On Sun, Nov 1, 2015 at 9:37 AM, Zhiliang Zhu  
wrote:

Dear All,

As I am facing a typical linear programming problem, and I know the simplex
method is the standard approach for solving LP problems, may I ask whether there
is already some mature package in Spark for the simplex method...

Thank you very much~
Best Wishes!
Zhiliang





  

apply simplex method to fix linear programming in spark

2015-11-01 Thread Zhiliang Zhu
Dear All,

As I am facing a typical linear programming problem, and I know the simplex
method is the standard approach for solving LP problems, may I ask whether there
is already some mature package in Spark for the simplex method...

Thank you very much~
Best Wishes!
Zhiliang



Re: apply simplex method to fix linear programming in spark

2015-11-01 Thread Ted Yu
A brief search in code base shows the following:

TODO: Add simplex constraints to allow alpha in (0,1).
./mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala

I guess the answer to your question is no.

FYI
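Since Spark itself ships nothing here, one option for an LP small enough to solve
on the driver (an assumption of mine, not something provided by Spark) is Apache
Commons Math's SimplexSolver. A minimal Scala sketch, assuming commons-math3 is
on the classpath:

    import java.util.Arrays
    import org.apache.commons.math3.optim.MaxIter
    import org.apache.commons.math3.optim.linear._
    import org.apache.commons.math3.optim.nonlinear.scalar.GoalType

    // maximize 2x + 3y subject to x + y <= 10 and x - y >= 0, with x, y >= 0
    val objective = new LinearObjectiveFunction(Array(2.0, 3.0), 0.0)
    val constraints = Arrays.asList(
      new LinearConstraint(Array(1.0, 1.0), Relationship.LEQ, 10.0),
      new LinearConstraint(Array(1.0, -1.0), Relationship.GEQ, 0.0))
    val solution = new SimplexSolver().optimize(
      new MaxIter(100), objective, new LinearConstraintSet(constraints),
      GoalType.MAXIMIZE, new NonNegativeConstraint(true))
    println(solution.getPoint.mkString(",") + " -> " + solution.getValue)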

On Sun, Nov 1, 2015 at 9:37 AM, Zhiliang Zhu 
wrote:

> Dear All,
>
> As I am facing some typical linear programming issue, and I know simplex
> method is specific in solving LP question,
> I am very sorry that whether there is already some mature package in spark
> about simplex method...
>
> Thank you very much~
> Best Wishes!
> Zhiliang
>
>
>


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Koert Kuipers
good idea. with the dates sorting correctly alphabetically i should be able
to do something similar with strings
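A minimal sketch of that idea in the DataFrame API (assuming the partition column
is the string column "date"; yyyy-MM-dd strings sort lexicographically, so max
works on them directly):

    import org.apache.spark.sql.functions.max

    val latestDate = df.agg(max("date")).collect()(0).getString(0)  // e.g. "2015-01-03"
    val latest = df.filter(df("date") === latestDate)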

On Sun, Nov 1, 2015 at 4:06 PM, Jörn Franke  wrote:

> Try with max date, in your case it could make more sense to represent the
> date as int
>
> Sent from my iPhone
>
> On 01 Nov 2015, at 21:03, Koert Kuipers  wrote:
>
> hello all,
> i am trying to get familiar with spark sql partitioning support.
>
> my data is partitioned by date, so like this:
> data/date=2015-01-01
> data/date=2015-01-02
> data/date=2015-01-03
> ...
>
> lets say i would like a batch process to read data for the latest date
> only. how do i proceed?
> generally the latest date will be yesterday, but it could be a day older
> or maybe 2.
>
> i understand that i will have to do something like:
> df.filter(df("date") === some_date_string_here)
>
> however i do now know what some_date_string_here should be. i would like
> to inspect the available dates and pick the latest. is there an efficient
> way to  find out what the available partitions are?
>
> thanks! koert
>
>
>


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Koert Kuipers
it seems pretty fast, but if i have 2 partitions and 10mm records i do have
to dedupe (distinct) 10mm records

a direct way to just find out what the 2 partitions are would be much
faster. spark knows it, but its not exposed.
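One workaround sketch while it isn't exposed (my own assumption, not a Spark
API for this): list the partition directories through the Hadoop FileSystem API
and derive the dates from the directory names (root path illustrative):

    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val dates = fs.listStatus(new Path("data/"))
      .map(_.getPath.getName)              // e.g. "date=2015-01-03"
      .filter(_.startsWith("date="))
      .map(_.stripPrefix("date="))
    val latest = dates.max                 // lexicographic max works for yyyy-MM-dd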

On Sun, Nov 1, 2015 at 4:08 PM, Koert Kuipers  wrote:

> it seems to work but i am not sure if its not scanning the whole dataset.
> let me dig into tasks a a bit
>
> On Sun, Nov 1, 2015 at 3:18 PM, Jerry Lam  wrote:
>
>> Hi Koert,
>>
>> If the partitioned table is implemented properly, I would think "select
>> distinct(date) as dt from table order by dt DESC limit 1" would return the
>> latest dates without scanning the whole dataset. I haven't try it that
>> myself. It would be great if you can report back if this actually works or
>> not. :)
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers  wrote:
>>
>>> hello all,
>>> i am trying to get familiar with spark sql partitioning support.
>>>
>>> my data is partitioned by date, so like this:
>>> data/date=2015-01-01
>>> data/date=2015-01-02
>>> data/date=2015-01-03
>>> ...
>>>
>>> lets say i would like a batch process to read data for the latest date
>>> only. how do i proceed?
>>> generally the latest date will be yesterday, but it could be a day older
>>> or maybe 2.
>>>
>>> i understand that i will have to do something like:
>>> df.filter(df("date") === some_date_string_here)
>>>
>>> however i do now know what some_date_string_here should be. i would like
>>> to inspect the available dates and pick the latest. is there an efficient
>>> way to  find out what the available partitions are?
>>>
>>> thanks! koert
>>>
>>>
>>>
>>
>


Re: issue with spark.driver.maxResultSize parameter in spark 1.3

2015-11-01 Thread karthik kadiyam
Did anyone have an issue setting the spark.driver.maxResultSize value?

On Friday, October 30, 2015, karthik kadiyam 
wrote:

> Hi Shahid,
>
> I played around with spark driver memory too. In the conf file it was set
> to "--driver-memory 20G" first. When I changed the spark driver
> maxResultSize from the default to 2g, I changed the driver memory to 30G and
> tried too. It gave me the same error saying "bigger than
> spark.driver.maxResultSize (1024.0 MB)".
> One other thing I observed is, in one of the tasks the data it is trying to
> process is more than 100 MB, and that executor and task keep losing the
> connection and retrying. I tried increasing the tasks by repartitioning from
> 120 to 240 to 480 as well. Still I can see that one of my tasks is
> trying to process more than 100 MB. Other tasks hardly process 1 MB to 10 MB,
> some around 20 MB, some have 0 MB.
>
> Any idea how I can try to even out the data distribution across multiple
> nodes?
>
> On Fri, Oct 30, 2015 at 12:09 AM, shahid ashraf  > wrote:
>
>> Hi
>> I guess you need to increase spark driver memory as well. But that should
>> be set in conf files
>> Let me know if that resolves
>> On Oct 30, 2015 7:33 AM, "karthik kadiyam" > > wrote:
>>
>>> Hi,
>>>
>>> In spark streaming job i had the following setting
>>>
>>> this.jsc.getConf().set("spark.driver.maxResultSize", “0”);
>>> and i got the error in the job as below
>>>
>>> User class threw exception: Job aborted due to stage failure: Total size
>>> of serialized results of 120 tasks (1082.2 MB) is bigger than
>>> spark.driver.maxResultSize (1024.0 MB)
>>>
>>> Basically I realized that the default value is 1 GB. I changed
>>> the configuration as below.
>>>
>>> this.jsc.getConf().set("spark.driver.maxResultSize", “2g”);
>>>
>>> and when i ran the job it gave the error
>>>
>>> User class threw exception: Job aborted due to stage failure: Total size
>>> of serialized results of 120 tasks (1082.2 MB) is bigger than
>>> spark.driver.maxResultSize (1024.0 MB)
>>>
>>> So basically the change I made is not being picked up by the job, so my
>>> questions are:
>>>
>>> - Is setting "spark.driver.maxResultSize" to "2g" like this the right way to
>>> change it, or is there another way to do it?
>>> - Is this a bug in Spark 1.3, or has anyone had this issue
>>> before?
>>>
>>>
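One hedged thing worth checking (an assumption, not a confirmed diagnosis):
spark.driver.maxResultSize is read when the SparkContext is created, so setting
it on an already-running context's conf may simply be ignored. A minimal Scala
sketch of setting it up front (names and values illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("MyStreamingJob")
      .set("spark.driver.maxResultSize", "2g")   // must be in place before the context exists
    val ssc = new StreamingContext(conf, Seconds(1))

    // or at submit time: spark-submit --conf spark.driver.maxResultSize=2g ...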
>


Sort Merge Join

2015-11-01 Thread Alex Nastetsky
Hi,

I'm trying to understand SortMergeJoin (SPARK-2213).

1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
example, in the code below, the two datasets have different number of
partitions, but it still does a SortMerge join after a "hashpartitioning".


CODE:
   val sparkConf = new SparkConf()
  .setAppName("SortMergeJoinTest")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.eventLog.enabled", "true")
  .set("spark.sql.planner.sortMergeJoin","true")

sparkConf.setMaster("local-cluster[3,1,1024]")

val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val inputpath = "input.gz.parquet"

val df1 = sqlContext.read.parquet(inputpath).repartition(3)
val df2 = sqlContext.read.parquet(inputpath).repartition(5)
val result = df1.join(df2.withColumnRenamed("foo","foo2"), $"foo" ===
$"foo2")
result.explain()

OUTPUT:
== Physical Plan ==
SortMergeJoin [foo#0], [foo2#8]
TungstenSort [foo#0 ASC], false, 0
  TungstenExchange hashpartitioning(foo#0)
  ConvertToUnsafe
Repartition 3, true
Scan
ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
TungstenSort [foo2#8 ASC], false, 0
  TungstenExchange hashpartitioning(foo2#8)
  TungstenProject [foo#4 AS foo2#8,bar#5L,somefield#6,anotherfield#7]
Repartition 5, true
Scan
ParquetRelation[file:input.gz.parquet][foo#4,bar#5L,somefield#6,anotherfield#7]

2) If both datasets have already been previously partitioned/sorted the
same and stored on the file system (e.g. in a previous job), is there a way
to tell Spark this so that it won't want to do a "hashpartitioning" on
them? It looks like Spark just considers datasets that have been just read
from the file system to have UnknownPartitioning. In the example below,
I try to join a dataframe to itself, and it still wants to hash repartition.

CODE:
...
val df1 = sqlContext.read.parquet(inputpath)
val result = df1.join(df1.withColumnRenamed("foo","foo2"), $"foo" ===
$"foo2")
result.explain()

OUTPUT:
== Physical Plan ==
SortMergeJoin [foo#0], [foo2#4]
TungstenSort [foo#0 ASC], false, 0
  TungstenExchange hashpartitioning(foo#0)
  ConvertToUnsafe
Scan
ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
TungstenSort [foo2#4 ASC], false, 0
  TungstenExchange hashpartitioning(foo2#4)
  ConvertToUnsafe
Project [foo#5 AS foo2#4,bar#6L,somefield#7,anotherfield#8]
Scan
ParquetRelation[file:input.gz.parquet][foo#5,bar#6L,somefield#7,anotherfield#8]


Thanks.


Re: job hangs when using pipe() with reduceByKey()

2015-11-01 Thread hotdog
yes, the first code takes only 30 mins,
but with the second method I have waited for 5 hours and it has only finished 10%.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/job-hangs-when-using-pipe-with-reduceByKey-tp25242p25249.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: [Spark MLlib] about linear regression issue

2015-11-01 Thread DB Tsai
For constraints like all weights >= 0, people use L-BFGS-B, which is
supported in our optimization library, Breeze.
https://github.com/scalanlp/breeze/issues/323

However, Spark's LiR implementation doesn't implement constraints. I do see
this being useful given we're experimenting with
SLIM: Sparse Linear Methods for recommendation,
http://www-users.cs.umn.edu/~xning/papers/Ning2011c.pdf which requires
all the weights to be positive (Eq. 3) to represent positive relations
between items.

In summary, it's possible and not difficult to add this constraint to
our current linear regression, but currently there is no open source
implementation in Spark.
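For anyone who needs the non-negativity constraint today, a toy, driver-side
sketch of the projected-gradient idea behind such constrained solvers (this is
illustrative only, not Spark's or Breeze's implementation; it solves
min ||Aw - b||^2 subject to w >= 0 with a fixed step size):

    import breeze.linalg.{DenseMatrix, DenseVector}

    def nnls(a: DenseMatrix[Double], b: DenseVector[Double],
             steps: Int = 5000, stepSize: Double = 1e-3): DenseVector[Double] = {
      var w = DenseVector.zeros[Double](a.cols)
      for (_ <- 0 until steps) {
        val grad = a.t * (a * w - b)                 // gradient of 0.5 * ||Aw - b||^2
        val stepped = w - grad * stepSize
        w = DenseVector(stepped.toArray.map(math.max(_, 0.0)))  // project back onto w >= 0
      }
      w   // real solvers (e.g. L-BFGS-B) converge much faster; this is only the idea
    }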

Sincerely,

DB Tsai
--
Web: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D


On Sun, Nov 1, 2015 at 9:22 AM, Zhiliang Zhu  wrote:
> Dear All,
>
> As for N dimension linear regression, while the labeled training points
> number (or the rank of the labeled point space) is less than N,
> then from perspective of math, the weight of the trained linear model may be
> not unique.
>
> However, the output of model.weight() by spark may be with some wi < 0. My
> issue is, is there some proper way only to get
> some specific output weight with all wi >= 0 ...
>
> Yes, the above goes same with the issue about solving linear system of
> equations, Aw = b, and r(A, b) = r(A) < columnNo(A), then w is
> with infinite solutions, but here only needs one solution with all wi >= 0.
> When there is only unique solution, both LR and SVD will work perfect.
>
> I will appreciate your all kind help very much~~
> Best Regards,
> Zhiliang
>
>




Re: spark sql partitioned by date... read last date

2015-11-01 Thread Jerry Lam
I agree the max date will satisfy the latest-date requirement, but it does
not satisfy the second-to-last-date requirement you mentioned.

Just for your information, before you invest too much in the partitioned table,
I want to warn you that it has memory issues (both on the executor and
driver side). A simple experiment can show that if you have over 10 years
of dates (3650 directories), it takes a long time to initialize. I got to
know the limitation after I tried to partition user events by their
user_id. It was a disaster (>1 user_id).

I hope the Spark developers can address the memory limitations, because
partitioned tables are very useful in many cases.

Cheers ~



On Sun, Nov 1, 2015 at 4:39 PM, Koert Kuipers  wrote:

> i was going for the distinct approach, since i want it to be general
> enough to also solve other related problems later. the max-date is likely
> to be faster though.
>
> On Sun, Nov 1, 2015 at 4:36 PM, Jerry Lam  wrote:
>
>> Hi Koert,
>>
>> You should be able to see if it requires scanning the whole data by
>> "explain" the query. The physical plan should say something about it. I
>> wonder if you are trying the distinct-sort-by-limit approach or the
>> max-date approach?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Sun, Nov 1, 2015 at 4:25 PM, Koert Kuipers  wrote:
>>
>>> it seems pretty fast, but if i have 2 partitions and 10mm records i do
>>> have to dedupe (distinct) 10mm records
>>>
>>> a direct way to just find out what the 2 partitions are would be much
>>> faster. spark knows it, but its not exposed.
>>>
>>> On Sun, Nov 1, 2015 at 4:08 PM, Koert Kuipers  wrote:
>>>
 it seems to work but i am not sure if its not scanning the whole
 dataset. let me dig into tasks a a bit

 On Sun, Nov 1, 2015 at 3:18 PM, Jerry Lam  wrote:

> Hi Koert,
>
> If the partitioned table is implemented properly, I would think
> "select distinct(date) as dt from table order by dt DESC limit 1" would
> return the latest dates without scanning the whole dataset. I haven't try
> it that myself. It would be great if you can report back if this actually
> works or not. :)
>
> Best Regards,
>
> Jerry
>
>
> On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers 
> wrote:
>
>> hello all,
>> i am trying to get familiar with spark sql partitioning support.
>>
>> my data is partitioned by date, so like this:
>> data/date=2015-01-01
>> data/date=2015-01-02
>> data/date=2015-01-03
>> ...
>>
>> lets say i would like a batch process to read data for the latest
>> date only. how do i proceed?
>> generally the latest date will be yesterday, but it could be a day
>> older or maybe 2.
>>
>> i understand that i will have to do something like:
>> df.filter(df("date") === some_date_string_here)
>>
>> however i do now know what some_date_string_here should be. i would
>> like to inspect the available dates and pick the latest. is there an
>> efficient way to  find out what the available partitions are?
>>
>> thanks! koert
>>
>>
>>
>

>>>
>>
>


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Jerry Lam
Hi Koert,

You should be able to see if it requires scanning the whole data by
"explain" the query. The physical plan should say something about it. I
wonder if you are trying the distinct-sort-by-limit approach or the
max-date approach?

Best Regards,

Jerry


On Sun, Nov 1, 2015 at 4:25 PM, Koert Kuipers  wrote:

> it seems pretty fast, but if i have 2 partitions and 10mm records i do
> have to dedupe (distinct) 10mm records
>
> a direct way to just find out what the 2 partitions are would be much
> faster. spark knows it, but its not exposed.
>
> On Sun, Nov 1, 2015 at 4:08 PM, Koert Kuipers  wrote:
>
>> it seems to work but i am not sure if its not scanning the whole dataset.
>> let me dig into tasks a a bit
>>
>> On Sun, Nov 1, 2015 at 3:18 PM, Jerry Lam  wrote:
>>
>>> Hi Koert,
>>>
>>> If the partitioned table is implemented properly, I would think "select
>>> distinct(date) as dt from table order by dt DESC limit 1" would return the
>>> latest dates without scanning the whole dataset. I haven't try it that
>>> myself. It would be great if you can report back if this actually works or
>>> not. :)
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers  wrote:
>>>
 hello all,
 i am trying to get familiar with spark sql partitioning support.

 my data is partitioned by date, so like this:
 data/date=2015-01-01
 data/date=2015-01-02
 data/date=2015-01-03
 ...

 lets say i would like a batch process to read data for the latest date
 only. how do i proceed?
 generally the latest date will be yesterday, but it could be a day
 older or maybe 2.

 i understand that i will have to do something like:
 df.filter(df("date") === some_date_string_here)

 however i do now know what some_date_string_here should be. i would
 like to inspect the available dates and pick the latest. is there an
 efficient way to  find out what the available partitions are?

 thanks! koert



>>>
>>
>


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Koert Kuipers
i was going for the distinct approach, since i want it to be general enough
to also solve other related problems later. the max-date is likely to be
faster though.

On Sun, Nov 1, 2015 at 4:36 PM, Jerry Lam  wrote:

> Hi Koert,
>
> You should be able to see if it requires scanning the whole data by
> "explain" the query. The physical plan should say something about it. I
> wonder if you are trying the distinct-sort-by-limit approach or the
> max-date approach?
>
> Best Regards,
>
> Jerry
>
>
> On Sun, Nov 1, 2015 at 4:25 PM, Koert Kuipers  wrote:
>
>> it seems pretty fast, but if i have 2 partitions and 10mm records i do
>> have to dedupe (distinct) 10mm records
>>
>> a direct way to just find out what the 2 partitions are would be much
>> faster. spark knows it, but its not exposed.
>>
>> On Sun, Nov 1, 2015 at 4:08 PM, Koert Kuipers  wrote:
>>
>>> it seems to work but i am not sure if its not scanning the whole
>>> dataset. let me dig into tasks a a bit
>>>
>>> On Sun, Nov 1, 2015 at 3:18 PM, Jerry Lam  wrote:
>>>
 Hi Koert,

 If the partitioned table is implemented properly, I would think "select
 distinct(date) as dt from table order by dt DESC limit 1" would return the
 latest dates without scanning the whole dataset. I haven't try it that
 myself. It would be great if you can report back if this actually works or
 not. :)

 Best Regards,

 Jerry


 On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers 
 wrote:

> hello all,
> i am trying to get familiar with spark sql partitioning support.
>
> my data is partitioned by date, so like this:
> data/date=2015-01-01
> data/date=2015-01-02
> data/date=2015-01-03
> ...
>
> lets say i would like a batch process to read data for the latest date
> only. how do i proceed?
> generally the latest date will be yesterday, but it could be a day
> older or maybe 2.
>
> i understand that i will have to do something like:
> df.filter(df("date") === some_date_string_here)
>
> however i do now know what some_date_string_here should be. i would
> like to inspect the available dates and pick the latest. is there an
> efficient way to  find out what the available partitions are?
>
> thanks! koert
>
>
>

>>>
>>
>


Occasionally getting RpcTimeoutException

2015-11-01 Thread Jake Yoon
Hi Sparkers.

I am very new to Spark, and I am occasionally getting an RpcTimeoutException
with the following error.

15/11/01 22:19:46 WARN HeartbeatReceiver: Removing executor 0 with no
> recent heartbeats: 321792 ms exceeds timeout 30 ms
> 15/11/01 22:19:46 ERROR TaskSchedulerImpl: Lost executor 0 on 172.31.11.1:
> Executor heartbeat timed out after 321792 ms
> 15/11/01 22:19:46 WARN TaskSetManager: Lost task 0.0 in stage 755.0 (TID
> 755, 172.31.11.1): ExecutorLostFailure (executor 0 lost)
> 15/11/01 22:20:18 ERROR ContextCleaner: Error cleaning RDD 1775
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [300
> seconds]. This timeout is controlled by spark.network.timeout
> ...
> ...
> ...
> 15/11/01 22:20:18 WARN BlockManagerMaster: Failed to remove RDD 1775 - Ask
> timed out on [Actor[akka.tcp://
> sparkExecutor@172.31.11.1:34987/user/BlockManagerEndpoint1#-787212020]]
> after [30 ms]. This timeout is controlled by spark.network.timeout
> org.apache.spark.rpc.RpcTimeoutException: Ask timed out on
> [Actor[akka.tcp://
> sparkExecutor@172.31.11.1:34987/user/BlockManagerEndpoint1#-787212020]]
> after [30 ms]. This timeout is controlled by spark.network.timeout


And the following is my code:

val sessionsRDD = sessions.mapPartitions { valueIterator =>
> val conf = new SparkConf()
>   .set("com.couchbase.nodes",
> confBd.value.get("com.couchbase.nodes").get)
>   .set("com.couchbase.bucket.default",
> confBd.value.get("com.couchbase.bucket.default").get)
> val cbConf = CouchbaseConfig(conf)
> val bucket = CouchbaseConnection().bucket(cbConf,
> "default").async()
> if (valueIterator.isEmpty) {
>   Iterator[JsonDocument]()
> } else LazyIterator {
>   Observable
> .from(OnceIterable(valueIterator).toSeq)
> .flatMap(id => {
>
> Observable.defer[JsonDocument](toScalaObservable(bucket.get(id,
> classOf[JsonDocument])))
>
> .retryWhen(RetryBuilder.anyOf(classOf[BackpressureException])
>   .max(5)
>   .delay(Delay.exponential(TimeUnit.MILLISECONDS, 500,
> 1)).build())
> })
> .toBlocking
> .toIterable
> .iterator
> }
>   }
>   sessionsRDD.cache()
>   val sessionInfo = sessionsRDD
>   .map(doc => {
>   (0, 0, 0)
> })
>   .count()
>   println(sessionInfo)
>   sessionsRDD.unpersist()



And this part gives me the error:

  val sessionInfo = sessionsRDD
>   .map(doc => {
>   (0, 0, 0)
> })
>   .count()
>   println(sessionInfo)


I tried increasing the timeout, but it is not quite helping me.
How could I solve this issue?
Any hint would be helpful.

Jake
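Since the log itself says the timeout is controlled by spark.network.timeout,
one hedged thing to try is raising it when building the conf (the values below
are illustrative); note that if the executors are stuck in long GC pauses or
blocked in the Couchbase calls, a bigger timeout only hides the symptom:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.network.timeout", "600s")              // default is 120s
      .set("spark.executor.heartbeatInterval", "30s")    // keep well below the network timeout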


spark sql partitioned by date... read last date

2015-11-01 Thread Koert Kuipers
hello all,
i am trying to get familiar with spark sql partitioning support.

my data is partitioned by date, so like this:
data/date=2015-01-01
data/date=2015-01-02
data/date=2015-01-03
...

lets say i would like a batch process to read data for the latest date
only. how do i proceed?
generally the latest date will be yesterday, but it could be a day older or
maybe 2.

i understand that i will have to do something like:
df.filter(df("date") === some_date_string_here)

however i do now know what some_date_string_here should be. i would like to
inspect the available dates and pick the latest. is there an efficient way
to  find out what the available partitions are?

thanks! koert


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Jörn Franke
Try with max date, in your case it could make more sense to represent the date 
as int 

Sent from my iPhone

> On 01 Nov 2015, at 21:03, Koert Kuipers  wrote:
> 
> hello all,
> i am trying to get familiar with spark sql partitioning support.
> 
> my data is partitioned by date, so like this:
> data/date=2015-01-01
> data/date=2015-01-02
> data/date=2015-01-03
> ...
> 
> lets say i would like a batch process to read data for the latest date only. 
> how do i proceed? 
> generally the latest date will be yesterday, but it could be a day older or 
> maybe 2. 
> 
> i understand that i will have to do something like:
> df.filter(df("date") === some_date_string_here)
> 
> however i do now know what some_date_string_here should be. i would like to 
> inspect the available dates and pick the latest. is there an efficient way to 
>  find out what the available partitions are?
> 
> thanks! koert
> 
> 


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Jerry Lam
Hi Koert,

the physical plan looks like it is doing the right thing:

for the partitioned table hdfs://user/koert/test it reads the dates from the
directory names, hash partitions and aggregates them to find the distinct
dates, and finally shuffles the dates for the sort and limit 1 operations.

This is my understanding of the physical plan; you can navigate the actual
execution in the web UI to see how much data is actually read to satisfy
this request. I hope it only requires a few bytes for the few dates.
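For reference, a sketch of the same query through the DataFrame API, so the
plan can be inspected directly (assuming the partition column is named "date"):

    import org.apache.spark.sql.functions.col

    val latest = df.select(col("date")).distinct()
      .orderBy(col("date").desc).limit(1)
    latest.explain()      // check how much of the data the plan actually touches
    latest.collect()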

Best Regards,

Jerry


On Sun, Nov 1, 2015 at 5:56 PM, Jerry Lam  wrote:

> I agreed the max date will satisfy the latest date requirement but it does
> not satisfy the second last date requirement you mentioned.
>
> Just for your information, before you invested in the partitioned table
> too much, I want to warn you that it has memory issues (both on executors
> and driver side). A simple experiment can show that if you have over 10
> years of date (3650 directories), it takes a long time to initialize. I got
> to know the limitation after I tried to partition user events per their
> user_id. It was a disaster (>1 user_id).
>
> I hope the spark developer can address the memory limitations because
> partitioned table is very useful in many cases.
>
> Cheers ~
>
>
>
> On Sun, Nov 1, 2015 at 4:39 PM, Koert Kuipers  wrote:
>
>> i was going for the distinct approach, since i want it to be general
>> enough to also solve other related problems later. the max-date is likely
>> to be faster though.
>>
>> On Sun, Nov 1, 2015 at 4:36 PM, Jerry Lam  wrote:
>>
>>> Hi Koert,
>>>
>>> You should be able to see if it requires scanning the whole data by
>>> "explain" the query. The physical plan should say something about it. I
>>> wonder if you are trying the distinct-sort-by-limit approach or the
>>> max-date approach?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Sun, Nov 1, 2015 at 4:25 PM, Koert Kuipers  wrote:
>>>
 it seems pretty fast, but if i have 2 partitions and 10mm records i do
 have to dedupe (distinct) 10mm records

 a direct way to just find out what the 2 partitions are would be much
 faster. spark knows it, but its not exposed.

 On Sun, Nov 1, 2015 at 4:08 PM, Koert Kuipers 
 wrote:

> it seems to work but i am not sure if its not scanning the whole
> dataset. let me dig into tasks a a bit
>
> On Sun, Nov 1, 2015 at 3:18 PM, Jerry Lam 
> wrote:
>
>> Hi Koert,
>>
>> If the partitioned table is implemented properly, I would think
>> "select distinct(date) as dt from table order by dt DESC limit 1" would
>> return the latest dates without scanning the whole dataset. I haven't try
>> it that myself. It would be great if you can report back if this actually
>> works or not. :)
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers 
>> wrote:
>>
>>> hello all,
>>> i am trying to get familiar with spark sql partitioning support.
>>>
>>> my data is partitioned by date, so like this:
>>> data/date=2015-01-01
>>> data/date=2015-01-02
>>> data/date=2015-01-03
>>> ...
>>>
>>> lets say i would like a batch process to read data for the latest
>>> date only. how do i proceed?
>>> generally the latest date will be yesterday, but it could be a day
>>> older or maybe 2.
>>>
>>> i understand that i will have to do something like:
>>> df.filter(df("date") === some_date_string_here)
>>>
>>> however i do now know what some_date_string_here should be. i would
>>> like to inspect the available dates and pick the latest. is there an
>>> efficient way to  find out what the available partitions are?
>>>
>>> thanks! koert
>>>
>>>
>>>
>>
>

>>>
>>
>


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Jerry Lam
Hi Koert,

If the partitioned table is implemented properly, I would think "select
distinct(date) as dt from table order by dt DESC limit 1" would return the
latest dates without scanning the whole dataset. I haven't tried it
myself. It would be great if you can report back whether this actually works or
not. :)

Best Regards,

Jerry


On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers  wrote:

> hello all,
> i am trying to get familiar with spark sql partitioning support.
>
> my data is partitioned by date, so like this:
> data/date=2015-01-01
> data/date=2015-01-02
> data/date=2015-01-03
> ...
>
> lets say i would like a batch process to read data for the latest date
> only. how do i proceed?
> generally the latest date will be yesterday, but it could be a day older
> or maybe 2.
>
> i understand that i will have to do something like:
> df.filter(df("date") === some_date_string_here)
>
> however i do now know what some_date_string_here should be. i would like
> to inspect the available dates and pick the latest. is there an efficient
> way to  find out what the available partitions are?
>
> thanks! koert
>
>
>


Re: streaming.twitter.TwitterUtils what is the best way to save twitter status to HDFS?

2015-11-01 Thread Akhil Das
You can use .saveAsObjectFiles("hdfs://sigmoid/twitter/status/") since
you want to store the Status objects, and for every batch it will create a
directory under /status (the name will mostly be the timestamp). Since the data
is small (hardly a couple of MBs for a 1 sec interval) it will not overwhelm
the cluster.
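A minimal Scala sketch of that, plus reading the saved batches back in a later
batch job (the HDFS URI is illustrative, and it assumes the twitter4j.Status
objects serialize cleanly with the default Java serializer):

    // In the streaming job: one output directory per batch under the prefix.
    tweets.dstream.saveAsObjectFiles("hdfs://namenode:8020/twitter/status", "obj")

    // In a later batch job: read all saved intervals back as an RDD of Status.
    val saved = sc.objectFile[twitter4j.Status]("hdfs://namenode:8020/twitter/status-*")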

Thanks
Best Regards

On Sat, Oct 24, 2015 at 7:05 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I need to save the twitter status I receive so that I can do additional
> batch based processing on them in the future. Is it safe to assume HDFS is
> the best way to go?
>
> Any idea what is the best way to save twitter status to HDFS?
>
> JavaStreamingContext ssc = new JavaStreamingContext(jsc, new
> Duration(1000));
>
> Authorization twitterAuth = setupTwitterAuthorization();
>
> JavaDStream tweets = TwitterFilterQueryUtils.createStream(
> ssc, twitterAuth, query);
>
>
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>
>
>
> saveAsHadoopFiles(prefix, [suffix]): Save this DStream's contents as
> Hadoop files. The file name at each batch interval is generated based on
> prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
> Python API: This is not available in the Python API.
>
> How ever JavaDStream<> does not support any savesAs* functions
>
>
> DStream dStream = tweets.dstream();
>
>
> http://spark.apache.org/docs/latest/api/java/org/apache/spark/streaming/dstream/DStream.html
>
> DStream only supports saveAsObjectFiles() and saveAsTextFiles().
>
>
> saveAsTextFiles
>
> public void saveAsTextFiles(java.lang.String prefix,
>java.lang.String suffix)
>
> Save each RDD in this DStream as a text file, using the string representation
> of elements. The file name at each batch interval is generated based on
> prefix and suffix: "prefix-TIME_IN_MS.suffix".
>
>
> Any idea where I would find these files? I assume they will be spread out
> all over my cluster?
>
>
> Also I wonder if using the saveAs*() functions are going to cause other
> problems. My duration is set to 1 sec. Am I going to overwhelm the system
> with a bunch of tiny files? Many of them will be empty
>
>
> Kind regards
>
>
> Andy
>


RE: Sort Merge Join

2015-11-01 Thread Cheng, Hao
1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For 
example, in the code below, the two datasets have different number of 
partitions, but it still does a SortMerge join after a "hashpartitioning".

[Hao:] A distributed JOIN operation (either hash-based or sort-based) requires
that records with identical join keys be shuffled to the same "reducer" node /
task; hashpartitioning is just a strategy to tell the Spark shuffle service how
to achieve that. In theory we could even use `RangePartitioning` instead (but
it's less efficient, which is why we don't choose it for JOIN). So conceptually
the JOIN operator doesn't care much about the shuffle strategy as long as it
satisfies the demand on data distribution.

2) If both datasets have already been previously partitioned/sorted the same 
and stored on the file system (e.g. in a previous job), is there a way to tell 
Spark this so that it won't want to do a "hashpartitioning" on them? It looks 
like Spark just considers datasets that have been just read from the the file 
system to have UnknownPartitioning. In the example below, I try to join a 
dataframe to itself, and it still wants to hash repartition.

[Hao:] Take this as example:

EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON a.key=b.key 
JOIN src c ON b.key=c.key

== Physical Plan ==
TungstenProject [value#20,value#22,value#24]
SortMergeJoin [key#21], [key#23]
  TungstenSort [key#21 ASC], false, 0
   TungstenProject [key#21,value#22,value#20]
SortMergeJoin [key#19], [key#21]
 TungstenSort [key#19 ASC], false, 0
  TungstenExchange hashpartitioning(key#19,200)
   ConvertToUnsafe
HiveTableScan [key#19,value#20], (MetastoreRelation default, src, 
Some(a))
 TungstenSort [key#21 ASC], false, 0
  TungstenExchange hashpartitioning(key#21,200)
   ConvertToUnsafe
HiveTableScan [key#21,value#22], (MetastoreRelation default, src, 
Some(b))
  TungstenSort [key#23 ASC], false, 0
   TungstenExchange hashpartitioning(key#23,200)
ConvertToUnsafe
 HiveTableScan [key#23,value#24], (MetastoreRelation default, src, Some(c))

There is no hashpartitioning anymore for the RESULT of "FROM src a JOIN src b
ON a.key=b.key", as we didn't change the data distribution after it, so we can
join another table "JOIN src c ON b.key=c.key" directly, which only requires
repartitioning table "c" on "key".

Treating a file-system-based data source as "UnknownPartitioning" is a simple
and SAFE choice for JOIN, as it's hard to guarantee that the records from
different data sets with identical join keys will be loaded by the same
node/task, since lots of factors need to be considered, like task pool size,
cluster size, source format, storage, data locality, etc.
I agree it's worth optimizing for performance; in Hive this is actually called
a bucket join. I am not sure whether that will happen soon in Spark SQL.

Hao

From: Alex Nastetsky [mailto:alex.nastet...@vervemobile.com]
Sent: Monday, November 2, 2015 11:29 AM
To: user
Subject: Sort Merge Join

Hi,

I'm trying to understand SortMergeJoin (SPARK-2213).

1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For 
example, in the code below, the two datasets have different number of 
partitions, but it still does a SortMerge join after a "hashpartitioning".

CODE:
   val sparkConf = new SparkConf()
  .setAppName("SortMergeJoinTest")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.eventLog.enabled", "true")
  .set("spark.sql.planner.sortMergeJoin","true")

sparkConf.setMaster("local-cluster[3,1,1024]")

val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val inputpath = "input.gz.parquet"

val df1 = sqlContext.read.parquet(inputpath).repartition(3)
val df2 = sqlContext.read.parquet(inputpath).repartition(5)
val result = df1.join(df2.withColumnRenamed("foo","foo2"), $"foo" === 
$"foo2")
result.explain()

OUTPUT:
== Physical Plan ==
SortMergeJoin [foo#0], [foo2#8]
TungstenSort [foo#0 ASC], false, 0
  TungstenExchange hashpartitioning(foo#0)
  ConvertToUnsafe
Repartition 3, true
Scan 
ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
TungstenSort [foo2#8 ASC], false, 0
  TungstenExchange hashpartitioning(foo2#8)
  TungstenProject [foo#4 AS foo2#8,bar#5L,somefield#6,anotherfield#7]
Repartition 5, true
Scan 
ParquetRelation[file:input.gz.parquet][foo#4,bar#5L,somefield#6,anotherfield#7]

2) If both datasets have already been previously partitioned/sorted the same 
and stored on the file system (e.g. in a previous job), is there a way to tell 
Spark this so that it won't want to do a "hashpartitioning" on them? It looks 
like Spark just considers datasets that have been just read from the the file 

RE: sparkR 1.5.1 batch yarn-client mode failing on daemon.R not found

2015-11-01 Thread Sun, Rui
Tom,

Have you set the “MASTER” env variable on your machine? What is its value if
set?

From: Tom Stewart [mailto:stewartthom...@yahoo.com.INVALID]
Sent: Friday, October 30, 2015 10:11 PM
To: user@spark.apache.org
Subject: sparkR 1.5.1 batch yarn-client mode failing on daemon.R not found

I have the following script in a file named test.R:

library(SparkR)
sc <- sparkR.init(master="yarn-client")
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, faithful)
showDF(df)
sparkR.stop()
q(save="no")

If I submit this with "sparkR test.R" or "R  CMD BATCH test.R" or "Rscript 
test.R" it fails with this error:
15/10/29 08:08:49 INFO r.BufferedStreamThread: Fatal error: cannot open file 
'/mnt/hdfs9/yarn/nm-local-dir/usercache/hadoop/appcache/application_1446058618330_0171/container_e805_1446058618330_0171_01_05/sparkr/SparkR/worker/daemon.R':
 No such file or directory
15/10/29 08:08:59 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 
(TID 1)
java.net.SocketTimeoutException: Accept timed out


However, if I launch just an interactive sparkR shell and cut/paste those 
commands - it runs fine.
It also runs fine on the same Hadoop cluster with Spark 1.4.1.
And, it runs fine from batch mode if I just use sparkR.init() and not 
sparkR.init(master="yarn-client")


Re: Unable to use saveAsSequenceFile

2015-11-01 Thread Akhil Das
Make sure your firewall isn't blocking the requests.

Thanks
Best Regards
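Also, a hedged simplification of the quoted code below: once the RDD is typed as
(IntWritable, Text), saveAsSequenceFile should be available through the normal
implicits, so the explicit rddToSequenceFileRDDFunctions call and cast are not
needed (the HDFS URI here is illustrative; it must point at the right
namenode/nameservice and port for your cluster):

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("sequenceFile").setMaster("local[2]"))
    val data = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")), 2)
    data.map { case (k, v) => (new IntWritable(k), new Text(v)) }
        .saveAsSequenceFile("hdfs://namenode:8020/tmp/Demosequencefile")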

On Sat, Oct 24, 2015 at 5:04 PM, Amit Singh Hora 
wrote:

> Hi All,
>
> I am trying to write an RDD as a sequence file into my Hadoop cluster but
> keep getting connection timeouts again and again. I can ping the Hadoop cluster,
> and the directory also gets created with the file name I specify. I believe I
> am missing some configuration; kindly help me.
>
> object WriteSequenceFileDemo {
>   def main(args: Array[String]): Unit = {
> val sparkConf = new
> SparkConf().setAppName("sequenceFile").setMaster("local[2]")
>
> val sparkContext=new SparkContext(sparkConf)
> val data=Array((1,"a"),(2,"b"),(3,"c"))
>
> val dataRDD=sparkContext.parallelize(data,2)
> val hadoopRDD=dataRDD.map(x=>{
>   (new IntWritable(x._1),new Text(x._2))
> })
> //
> hadoopRDD.saveAsSequenceFile("hdfs://ambarimaster/tmp/Demosequencefile")
>
>
> SparkContext.rddToSequenceFileRDDFunctions(hadoopRDD.asInstanceOf[RDD[(IntWritable,
> Text)]])
> .saveAsSequenceFile("hdfs://ambarimaster/tmp/Demosequencefile")
>   }
> }
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-use-saveAsSequenceFile-tp25190.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Error : - No filesystem for scheme: spark

2015-11-01 Thread Jean-Baptiste Onofré

Hi,

do you have something special in conf/spark-defaults.conf (especially on 
the eventLog directory) ?


Regards
JB

On 11/02/2015 07:48 AM, Balachandar R.A. wrote:

Can someone tell me at what point this error could come?

In one of my use cases, I am trying to use hadoop custom input format.
Here is my code.

val hConf: Configuration = sc.hadoopConfiguration
hConf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
hConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)

var job = new Job(hConf)
FileInputFormat.setInputPaths(job, new Path("hdfs:///user/bala/MyBinaryFile"))

var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
  classOf[IntWritable], classOf[BytesWritable], job.getConfiguration())

val count = hRDD.mapPartitionsWithInputSplit { (split, iter) => myfuncPart(split, iter) }

The moment I invoke the mapPartitionsWithInputSplit() method, I get the below
error in my spark-submit launch:

15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

Any help here to move towards fixing this will be of great help



Thanks

Bala



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




Re: java how to configure streaming.dstream.DStream<> saveAsTextFiles() to work with hdfs?

2015-11-01 Thread Akhil Das
How are you submitting your job? You need to make sure HADOOP_CONF_DIR is
pointing to your Hadoop configuration directory (with the core-site.xml and
hdfs-site.xml files). If you have them set properly, then make sure you are
giving the full HDFS URL, like:

dStream.saveAsTextFiles("hdfs://sigmoid-cluster:9000/twitter/","status-")


Thanks
Best Regards

On Sun, Oct 25, 2015 at 1:57 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi
>
> I am using spark streaming in Java. One of the problems I have is I need
> to save twitter status in JSON format as I receive them
>
> When I run the following code on my local machine. It work how ever all
> the output files are created in the current directory of the driver
> program. Clearly not a good cluster solution. Any idea how I can configure
> spark so that it will write the output to hdfs?
>
> JavaDStream tweets = TwitterFilterQueryUtils.createStream(
> ssc, twitterAuth);
>
> DStream dStream = tweets.dstream();
>
> String prefix = “MyPrefix";
>
> String suffix = "json";
>
> //  this works, when I test locally the files are created in the
> current directory  of driver program
>
> dStream.saveAsTextFiles(prefix, suffix);
>
>
> Is there something I need to set on my SparkConf object?
>
>
> Kind regards
>
>
> andy
>


RE: SparkR job with >200 tasks hangs when calling from web server

2015-11-01 Thread Sun, Rui
I guess that this is not related to SparkR, but rather something wrong in Spark
Core.

Could you try your application logic within spark-shell (you have to use the
Scala DataFrame API) instead of the SparkR shell, to see if this issue still
happens?

-Original Message-
From: rporcio [mailto:rpor...@gmail.com] 
Sent: Friday, October 30, 2015 11:09 PM
To: user@spark.apache.org
Subject: SparkR job with >200 tasks hangs when calling from web server

Hi,

I have a web server which can execute R code using SparkR.
The R session is created with the "Rscript init.R" command, where the init.R
file contains a SparkR initialization section:

library(SparkR, lib.loc = paste("/opt/Spark/spark-1.5.1-bin-hadoop2.6",
"R", "lib", sep = "/"))
sc <<- sparkR.init(master = "local[4]", appName = "TestR", sparkHome =
"/opt/Spark/spark-1.5.1-bin-hadoop2.6", sparkPackages =
"com.databricks:spark-csv_2.10:1.2.0")
sqlContext <<- sparkRSQL.init(sc)

I have the below example R code that I want to execute (flights.csv comes from 
SparkR examples):

df <- read.df(sqlContext, "/opt/Spark/flights.csv", source =
"com.databricks.spark.csv", header = "true")
registerTempTable(df, "flights")
depDF <- sql(sqlContext, "SELECT dep FROM flights")
deps <- collect(depDF)

If I run this code, it is executed successfully. When I check the Spark UI, I
see that the corresponding job has only 2 tasks.

But if I change the first row to
df <- repartition(read.df(sqlContext, "/opt/Spark/flights.csv", source =
"com.databricks.spark.csv", header = "true"), 200)
and execute the R code again, the corresponding job has 202 tasks, of which it
successfully finishes some (like 132/202), but then it hangs forever.

If I check the stderr of the executor I can see that the executor can't
communicate with the driver:

15/10/30 15:34:24 WARN AkkaRpcEndpointRef: Error sending message [message =
Heartbeat(0,[Lscala.Tuple2;@36834e15,BlockManagerId(0, 192.168.178.198, 7092))]
in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [30 seconds].
This timeout is controlled by spark.rpc.askTimeout

I tried to change memory (e.g. 4g for the driver), Akka and timeout settings,
but with no luck.

Executing the same code (with the repartition part) from plain R finishes
successfully, so I assume the problem is somehow related to the web server,
but I can't figure it out.

I'm using Centos.

Can someone give me some advice on what I should try?

Thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-job-with-200-tasks-hangs-when-calling-from-web-server-tp25237.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: issue with spark.driver.maxResultSize parameter in spark 1.3

2015-11-01 Thread shahid ashraf
Is your process getting killed...
if yes then try to see using dmesg.
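One thing worth checking for the question quoted below: values like
spark.driver.maxResultSize are captured when the context is created, and
getConf() returns a copy of the configuration, so calling set() on it after
the context exists has no effect. A minimal Scala sketch of setting it up
front (the app name and batch interval are placeholders); the same value can
also be passed as --conf spark.driver.maxResultSize=2g to spark-submit:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("MaxResultSizeExample")
  .set("spark.driver.maxResultSize", "2g")  // must be set before the context is built

val ssc = new StreamingContext(conf, Seconds(10))  // the conf is cloned here;
                                                   // later set() calls are ignored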

On Mon, Nov 2, 2015 at 8:17 AM, karthik kadiyam <
karthik.kadiyam...@gmail.com> wrote:

> Did any one had issue setting spark.driver.maxResultSize value ?
>
> On Friday, October 30, 2015, karthik kadiyam 
> wrote:
>
>> Hi Shahid,
>>
>> I played around with the Spark driver memory too. In the conf file it was
>> first set to " --driver-memory 20G ". When I changed
>> spark.driver.maxResultSize from the default to 2g, I also tried raising
>> the driver memory to 30G. It gave the same error saying the result is
>> "bigger than spark.driver.maxResultSize (1024.0 MB)".
>> One other thing I observed is that in one of the tasks the data it is
>> trying to process is more than 100 MB, and that executor and task keep
>> losing the connection and retrying. I also tried increasing the number of
>> tasks by repartitioning from 120 to 240 to 480. Still, I can see that one
>> of my tasks is trying to process more than 100 MB, while other tasks
>> hardly process 1 MB to 10 MB, some around 20 MB, and some 0 MB.
>>
>> Any idea how I can even out the data distribution across multiple
>> nodes?
>>
>> On Fri, Oct 30, 2015 at 12:09 AM, shahid ashraf 
>> wrote:
>>
>>> Hi,
>>> I guess you need to increase the Spark driver memory as well, but that
>>> should be set in the conf files.
>>> Let me know if that resolves it.
>>> On Oct 30, 2015 7:33 AM, "karthik kadiyam" 
>>> wrote:
>>>
 Hi,

 In spark streaming job i had the following setting

 this.jsc.getConf().set("spark.driver.maxResultSize", "0");
 and i got the error in the job as below

 User class threw exception: Job aborted due to stage failure: Total
 size of serialized results of 120 tasks (1082.2 MB) is bigger than
 spark.driver.maxResultSize (1024.0 MB)

 Basically I realized that the default value is 1 GB, so I changed
 the configuration as below.

 this.jsc.getConf().set("spark.driver.maxResultSize", "2g");

 and when i ran the job it gave the error

 User class threw exception: Job aborted due to stage failure: Total
 size of serialized results of 120 tasks (1082.2 MB) is bigger than
 spark.driver.maxResultSize (1024.0 MB)

 So basically the change I made is not being picked up by the job, so
 my questions are:

 - Is setting "spark.driver.maxResultSize" to "2g" this way the right
 approach, or is there another way to do it?
 - Is this a bug in Spark 1.3, or has anyone else had this issue
 before?


>>


-- 
with Regards
Shahid Ashraf


RE: How to set memory for SparkR with master="local[*]"

2015-11-01 Thread Sun, Rui
Hi, Matej,

For the convenience of SparkR users who start SparkR without using bin/sparkR
(for example, in RStudio),
https://issues.apache.org/jira/browse/SPARK-11340 enables settings such as
"spark.driver.memory" (and similar options like spark.driver.extraClassPath,
spark.driver.extraJavaOptions and spark.driver.extraLibraryPath) passed in the
sparkEnvir parameter of sparkR.init() to take effect.

Would you like to give it a try? Note the change is on the master branch, you 
have to build Spark from source before using it.


From: Sun, Rui [mailto:rui@intel.com]
Sent: Monday, October 26, 2015 10:24 AM
To: Dirceu Semighini Filho
Cc: user
Subject: RE: How to set memory for SparkR with master="local[*]"

As documented in 
http://spark.apache.org/docs/latest/configuration.html#available-properties,
Note for “spark.driver.memory”:
Note: In client mode, this config must not be set through the SparkConf 
directly in your application, because the driver JVM has already started at 
that point. Instead, please set this through the --driver-memory command line 
option or in your default properties file.

If you start a SparkR shell using bin/sparkR, then you can use
bin/sparkR --driver-memory. There is no way to set the driver memory size
after the R shell has been launched via bin/sparkR.

But if you are to start a SparkR shell manually without using bin/sparkR (for
example, in RStudio), you can:
library(SparkR)
Sys.setenv("SPARKR_SUBMIT_ARGS" = "--conf spark.driver.memory=2g sparkr-shell")
sc <- sparkR.init()

From: Dirceu Semighini Filho [mailto:dirceu.semigh...@gmail.com]
Sent: Friday, October 23, 2015 7:53 PM
Cc: user
Subject: Re: How to set memory for SparkR with master="local[*]"

Hi Matej,
I'm also using this and I'm having the same behavior here, my driver has only 
530mb which is the default value.

Maybe this is a bug.

2015-10-23 9:43 GMT-02:00 Matej Holec:
Hello!

How to adjust the memory settings properly for SparkR with master="local[*]"
in R?


*When running from  R -- SparkR doesn't accept memory settings :(*

I use the following commands:

R>  library(SparkR)
R>  sc <- sparkR.init(master = "local[*]", sparkEnvir =
list(spark.driver.memory = "5g"))

Although the variable spark.driver.memory is correctly set (checked in
http://node:4040/environment/), the driver has only the default amount of
memory allocated (Storage Memory 530.3 MB).

*But when running from  spark-1.5.1-bin-hadoop2.6/bin/sparkR -- OK*

The following command:

]$ spark-1.5.1-bin-hadoop2.6/bin/sparkR --driver-memory 5g

creates a SparkR session with properly adjusted driver memory (Storage Memory
2.6 GB).


Any suggestion?

Thanks
Matej



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-memory-for-SparkR-with-master-local-tp25178.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming: how to StreamingContext.queueStream

2015-11-01 Thread Akhil Das
You can do something like this:


val rddQueue = scala.collection.mutable.Queue(rdd1,rdd2,rdd3)

val qDstream = ssc.queueStream(rddQueue)
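For step 2) in the question quoted below, one simple approach is to compute
the distinct time buckets and filter the dataset once per bucket. A sketch
only: the Record type and the bucket size are assumptions, and caching the
input first avoids re-reading the source for every filter.

import org.apache.spark.rdd.RDD

// Hypothetical record type; real data would carry its own timestamp field.
case class Record(timestamp: Long, payload: String)

// One RDD per time bucket, in time order. Note that this filters the full
// dataset once per bucket, so it only makes sense for a modest bucket count.
def bucketize(data: RDD[Record], bucketMillis: Long): Seq[RDD[Record]] = {
  val bucketIds = data.map(_.timestamp / bucketMillis).distinct().collect().sorted
  bucketIds.map(id => data.filter(_.timestamp / bucketMillis == id)).toSeq
}

// val rddQueue = scala.collection.mutable.Queue(bucketize(offlineDataRDD, 1000L): _*)
// val qDstream = ssc.queueStream(rddQueue)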




Thanks
Best Regards

On Sat, Oct 24, 2015 at 4:43 AM, Anfernee Xu  wrote:

> Hi,
>
> Here's my situation, I have some kind of offline dataset, but I want to
> form a virtual data stream feeding to Spark Streaming, my code looks like
> this
>
>
>// sort offline data by time
>  1)  JavaRDD sortedByTime = offlineDataRDD.sortBy( );
>
>// compute a list of JavaRDD,  each element JavaRDD is hosting the data
> in the same time
>// bucket.
>   2) List virtualStreamRdd = ?
>
> Queue queue = Queues.newLinkedBlockingQueue();
> queue.addAll(virtualStreamRdd);
>
> /*
>  * Create DStream from the queue
>  */
>
> 3) final JavaDStream rowDStream =
> streamingContext.queueStream(queue);
>
>
> Currently I'm stucking in 2), any suggestion is appreciated.
>
> Thanks
>
> --
> --Anfernee
>


Spark Streaming data checkpoint performance

2015-11-01 Thread Thúy Hằng Lê
Hi Spark guru

I am evaluating Spark Streaming,

In my application I need to maintain cumulative statistics (e.g. the total
running word count), so I need to call the updateStateByKey function on
every micro-batch.

After setting those things up, I see the following behavior:
* The Processing Time is very high every 10 seconds - usually 5x
higher (which I guess is the data checkpointing job)
* The Processing Time becomes higher and higher over time; after 10
minutes it is much higher than the batch interval and leads to a huge
Scheduling Delay and a lot of Active Batches in the queue.

My questions are:

 * Is this expected behavior? Is there any way to improve the
performance of data checkpointing?
 * How does data checkpointing in Spark Streaming work? Does it need
to load all previous checkpoint data in order to build the new one?

My job is very simple:

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.seconds(2));
JavaPairInputDStream messages =
KafkaUtils.createDirectStream(...);

JavaPairDStream stats = messages.mapToPair(parseJson)
.reduceByKey(REDUCE_STATS) .updateStateByKey(RUNNING_STATS);

stats.print()
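For reference (not the original poster's code), a minimal Scala sketch of the
same pattern with an explicit checkpoint directory and a longer checkpoint
interval on the state DStream, which is a documented knob for spreading out
checkpointing cost; the directory, the socket source and the intervals are
assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("RunningCounts")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("hdfs://namenode:9000/checkpoints/running-counts")  // required by updateStateByKey

def updateCounts(values: Seq[Long], state: Option[Long]): Option[Long] =
  Some(values.sum + state.getOrElse(0L))

val counts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1L))
  .reduceByKey(_ + _)
  .updateStateByKey(updateCounts _)

// Checkpoint the state DStream every 20 seconds (10 batches) rather than at
// the default interval (trade-off: a longer lineage to replay on recovery).
counts.checkpoint(Seconds(20))
counts.print()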


Re: Caching causes later actions to get stuck

2015-11-01 Thread Sampo Niskanen
Hi,

Any ideas what's going wrong or how to fix it?  Do I have to downgrade to
0.9.x to be able to use Spark?


Best regards,

*Sampo Niskanen*

*Lead developer / Wellmo*
sampo.niska...@wellmo.com
+358 40 820 5291
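A side note on the cache() call shown in the log below: cache() is shorthand
for persist(MEMORY_ONLY), so partitions that do not fit in memory are dropped
and recomputed on the next action. A minimal sketch of a spill-to-disk
alternative (illustrative only, not a confirmed fix for the hang):

import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK keeps partitions that don't fit in memory on local disk,
// so a second action re-reads them from disk instead of recomputing them.
analyticsRDD.persist(StorageLevel.MEMORY_AND_DISK)
analyticsRDD.count()
analyticsRDD.count()  // served from the memory + disk cache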


On Fri, Oct 30, 2015 at 4:57 PM, Sampo Niskanen 
wrote:

> Hi,
>
> I'm facing a problem where Spark is able to perform an action on a cached
> RDD correctly the first time it is run, but running it immediately
> afterwards (or an action depending on that RDD) causes it to get stuck.
>
> I'm using a MongoDB connector for fetching all documents from a collection
> to an RDD and caching that (though according to the error message it
> doesn't fully fit).  The first action on it always succeeds, but latter
> actions fail.  I just upgraded from Spark 0.9.x to 1.5.1, and didn't have
> that problem with the older version.
>
>
> The output I get:
>
>
> scala> analyticsRDD.cache
> res10: analyticsRDD.type = MapPartitionsRDD[84] at map at Mongo.scala:69
>
> scala> analyticsRDD.count
> [Stage 2:=> (472 + 8)
> / 524]15/10/30 14:20:00 WARN MemoryStore: Not enough space to cache
> rdd_84_469 in memory! (computed 13.0 MB so far)
> 15/10/30 14:20:00 WARN MemoryStore: Not enough space to cache rdd_84_470
> in memory! (computed 12.1 MB so far)
> 15/10/30 14:20:00 WARN MemoryStore: Not enough space to cache rdd_84_476
> in memory! (computed 5.6 MB so far)
> ...
> 15/10/30 14:20:06 WARN MemoryStore: Not enough space to cache rdd_84_522
> in memory! (computed 5.3 MB so far)
> [Stage 2:==>(522 + 2)
> / 524]15/10/30 14:20:06 WARN MemoryStore: Not enough space to cache
> rdd_84_521 in memory! (computed 13.9 MB so far)
> res11: Long = 7754957
>
>
> scala> analyticsRDD.count
> [Stage 3:=> (474 + 0)
> / 524]
>
>
> *** Restart Spark ***
>
> scala> analyticsRDD.count
> res10: Long = 7755043
>
>
> scala> analyticsRDD.count
> res11: Long = 7755050
>
>
>
> The cached RDD always gets stuck at the same point.  I tried enabling full
> debug logging, but couldn't make out anything useful.
>
>
> I'm also facing another issue with loading a lot of data from MongoDB,
> which might be related, but the error is different:
> https://groups.google.com/forum/#!topic/mongodb-user/Knj406szd74
>
>
> Any ideas?
>
>
> *Sampo Niskanen*
>
> *Lead developer / Wellmo*
> sampo.niska...@wellmo.com
> +358 40 820 5291
>
>


Error : - No filesystem for scheme: spark

2015-11-01 Thread Balachandar R.A.
Can someone tell me at what point this error could occur?

In one of my use cases, I am trying to use a custom Hadoop input format. Here
is my code.

val hConf: Configuration = sc.hadoopConfiguration
hConf.set("fs.hdfs.impl",
classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
hConf.set("fs.file.impl",
classOf[org.apache.hadoop.fs.LocalFileSystem].getName)var job = new
Job(hConf)
FileInputFormat.setInputPaths(job,new
Path("hdfs:///user/bala/MyBinaryFile"));


var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
classOf[IntWritable],
classOf[BytesWritable],
job.getConfiguration()
)

val count = hRDD.mapPartitionsWithInputSplit{ (split, iter) =>
myfuncPart(split, iter)}

The moment I invoke mapPartitionsWithInputSplit() method, I get the
below error in my spark-submit launch


15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in
stage 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem
for scheme: spark
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

Any help here in moving towards a fix would be greatly appreciated.



Thanks

Bala


Re: Running 2 spark application in parallel

2015-11-01 Thread Akhil Das
Have a look at the dynamic resource allocation listed here
https://spark.apache.org/docs/latest/job-scheduling.html
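For reference, a minimal Scala sketch of the configuration switches that page
describes (the executor bounds are placeholders; the external shuffle service
also has to be enabled on the YARN NodeManagers, and both applications still
need a YARN queue with spare capacity):

import org.apache.spark.{SparkConf, SparkContext}

// With dynamic allocation, an idle application releases its executors so a
// second application submitted to the same cluster can acquire resources.
val conf = new SparkConf()
  .setAppName("DynamicAllocationExample")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")      // required for dynamic allocation
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")

val sc = new SparkContext(conf)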

Thanks
Best Regards

On Thu, Oct 22, 2015 at 11:50 PM, Suman Somasundar <
suman.somasun...@oracle.com> wrote:

> Hi all,
>
>
>
> Is there a way to run 2 spark applications in parallel under Yarn in the
> same cluster?
>
>
>
> Currently, if I submit 2 applications, one of them waits till the other
> one is completed.
>
>
>
> I want both of them to start and run at the same time.
>
>
>
> Thanks,
> Suman.
>