Spark SQL: what does an exclamation mark mean in the plan?

2015-10-19 Thread Xiao Li
Hi, all,

After turning on the trace, I saw a strange exclamation mark in
the intermediate plans. This happened in catalyst analyzer.

Join Inner, Some((col1#0 = col1#6))
 Project [col1#0,col2#1,col3#2,col2_alias#24,col3#2 AS col3_alias#13]
  Project [col1#0,col2#1,col3#2,col2#1 AS col2_alias#24]
   LogicalRDD [col1#0,col2#1,col3#2], MapPartitionsRDD[1] at
createDataFrame at SimpleApp.scala:32
 Aggregate [col1#6], [col1#6,count(col1#6) AS count(col1)#5L]
  *!Project [col1#6,col2#7,col3#8,col2_alias#24,col3#8 AS col3_alias#4]*
   Project [col1#6,col2#7,col3#8,col2#7 AS col2_alias#3]
LogicalRDD [col1#6,col2#7,col3#8], MapPartitionsRDD[1] at
createDataFrame at SimpleApp.scala:32

Could anybody give me a hint why there exists a !(exclamation mark) before
the node name (Project)? This ! mark does not disappear in the subsequent
query plan.

Thank you!

Xiao Li


Guaranteed processing orders of each batch in Spark Streaming

2015-10-19 Thread Renjie Liu
Hi, all:
I've read the source code and it seems that the processing order of each
batch's RDD is not guaranteed, since jobs are just submitted to a thread pool.
I believe this is quite important in streaming, since updates should be
ordered.
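
For context, a minimal sketch of what I mean (an assumption on my side, based on
reading JobScheduler: the size of that thread pool comes from the undocumented
spark.streaming.concurrentJobs setting, which defaults to 1, so with the default
the jobs of successive batches are submitted and run in order):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("OrderedUpdates")
  // Default is already 1; raising it lets batches run concurrently,
  // and then nothing orders the updates across batches.
  .set("spark.streaming.concurrentJobs", "1")
val ssc = new StreamingContext(conf, Seconds(10))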


Re: Unable to run applications on spark in standalone cluster mode

2015-10-19 Thread Jean-Baptiste Onofré

Hi Rohith,

Do you have multiple network interfaces on the machine hosting the master?

If so, can you try forcing it to bind to the public interface using:

sbin/start-master.sh --ip xxx.xxx.xxx.xxx
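
It is also worth double-checking that the application itself is given the
explicit master URL rather than localhost. A minimal sketch (assuming the
master's public address really is 172.28.161.138 and it listens on 7077):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: point the driver at the master's public address explicitly.
val conf = new SparkConf()
  .setAppName("ConnectivityCheck")
  .setMaster("spark://172.28.161.138:7077")   // not spark://localhost:7077
val sc = new SparkContext(conf)
sc.parallelize(1 to 10).count()               // should run on the cluster workers
sc.stop()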

Regards
JB

On 10/19/2015 02:05 PM, Rohith Parameshwara wrote:

Hi all,

             I am doing some experiments on a Spark standalone cluster
setup and I am facing the following issue:

I have a 4 node cluster setup. As per
http://spark.apache.org/docs/latest/spark-standalone.html#starting-a-cluster-manually
I tried to start the cluster with the scripts, but the slaves did not
start and gave a permission denied error…. My conf/slaves had a list of IP
addresses of the slaves… But then I was able to start the worker nodes
by going to the respective slave machines and running the start-slave.sh
script with the master IP as a parameter…. The web UI showed all the 3
worker nodes running as:

The application, when submitted to the cluster, shows up on all the worker
nodes in the web UI…. But on the command line where the spark-submit
command was run, it gives this error periodically and continuously….

ERROR NettyTransport: failed to bind to /172.28.161.138:7077, shutting
down Netty transport

15/10/19 11:59:38 WARN Utils: Service 'sparkDriver' could not bind on
port 7077. Attempting port 7078.

15/10/19 11:59:38 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@localhost:7077: akka.remote.InvalidAssociation:
Invalid address: akka.tcp://sparkMaster@localhost:7077….

My conf/spark-env.sh has:

SPARK_MASTER_IP=172.28.161.138

SPARK_MASTER_PORT=7077

SPARK_MASTER_WEBUI_PORT=8080

SPARK_WORKER_WEBUI_PORT=8081

SPARK_WORKER_INSTANCES=1

And I have also put this on all the slave nodes too….

The applications run fine in --master local mode, but in --master
spark://masterip:7077 it is not working….

Any type of help would be appreciated….. Thanks in advance



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



failed mesos task loses executor

2015-10-19 Thread Adrian Bridgett
Just testing spark v1.5.0 (on mesos v0.23) and we saw something 
unexpected (according to the event timeline) - when a spark task failed 
(intermittent S3 connection failure), the whole executor was removed and 
was never recovered so the job proceeded slower than normal.


Looking at the code I saw something that seemed a little odd in 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala:


  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
...
if (TaskState.isFailed(TaskState.fromMesos(status.getState))
  && taskIdToSlaveId.contains(tid)) {
  // We lost the executor on this slave, so remember that it's gone
  removeExecutor(taskIdToSlaveId(tid), "Lost executor")
}
if (TaskState.isFinished(state)) {
  taskIdToSlaveId.remove(tid)
}
  }

I don't know either codebase at all; however, it seems odd to kill the
executor for a failed task rather than just a lost task. I did a quick
test (with v1.5.1) where I replaced this line with:


   if ((TaskState.fromMesos(status.getState) == TaskState.LOST)

and all seemed well - I faked the problem (using iptables to briefly 
block access to the S3 endpoint), the task failed but was retried (on 
the same executor), succeeded and continued on its merry way.
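
For reference, the patched check as a whole reads (a sketch only - the rest of
statusUpdate is unchanged):

    if (TaskState.fromMesos(status.getState) == TaskState.LOST
        && taskIdToSlaveId.contains(tid)) {
      // Only a LOST task (rather than any failed task) is treated as a
      // lost executor on that slave.
      removeExecutor(taskIdToSlaveId(tid), "Lost executor")
    }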


Adrian
--
*Adrian Bridgett* |  Sysadmin Engineer, OpenSignal 


_
Office: First Floor, Scriptor Court, 155-157 Farringdon Road, 
Clerkenwell, London, EC1R 3AD

Phone #: +44 777-377-8251
Skype: abridgett  |@adrianbridgett | 
LinkedIn link 

_


Building Spark w/ 1.8 and binary incompatibilities

2015-10-19 Thread Iulian Dragoș
Hey all,

tl;dr; I built Spark with Java 1.8 even though my JAVA_HOME pointed to 1.7.
Then it failed with binary incompatibilities.

I couldn’t find any mention of this in the docs, so it might be a known
thing, but it’s definitely too easy to do the wrong thing.

The problem is that Maven is using the Zinc incremental compiler, which is
a long-running server. If the first build (that spawns the zinc server) is
started with Java 8 on the path, Maven will compile against Java 8 even
after changing JAVA_HOME and rebuilding.

I filed scala-maven-plugin#173, but so far no comment.

Steps to reproduce:

   - make sure zinc is not running yet
   - build with JAVA_HOME pointing to 1.8
   - point JAVA_HOME to 1.7
   - clean build
   - run Spark, watch it fail with NoSuchMethodError in ConcurrentHashMap;
     more details here.

Workaround:

   - build/zinc/bin/zinc -shutdown
   - rebuild

iulian
-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: BUILD SYSTEM: amp-jenkins-worker-05 offline

2015-10-19 Thread Patrick Wendell
This is what I'm looking at:

https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/



On Mon, Oct 19, 2015 at 12:58 PM, shane knapp  wrote:

> all we did was reboot -05 and -03...  i'm seeing a bunch of green
> builds.  could you provide me w/some specific failures so i can look
> in to them more closely?
>
> On Mon, Oct 19, 2015 at 12:27 PM, Patrick Wendell 
> wrote:
> > Hey Shane,
> >
> > It also appears that every Spark build is failing right now. Could it be
> > related to your changes?
> >
> > - Patrick
> >
> > On Mon, Oct 19, 2015 at 11:13 AM, shane knapp 
> wrote:
> >>
> >> worker 05 is back up now...  looks like the machine OOMed and needed
> >> to be kicked.
> >>
> >> On Mon, Oct 19, 2015 at 9:39 AM, shane knapp 
> wrote:
> >> > i'll have to head down to the colo and see what's up with it...  it
> >> > seems to be wedged (pings ok, can't ssh in) and i'll update the list
> >> > when i figure out what's wrong.
> >> >
> >> > i don't think it caught fire (#toosoon?), because everything else is
> >> > up and running.  :)
> >> >
> >> > shane
> >>
> >> --
> >> You received this message because you are subscribed to the Google
> Groups
> >> "amp-infra" group.
> >> To unsubscribe from this group and stop receiving emails from it, send
> an
> >> email to amp-infra+unsubscr...@googlegroups.com.
> >> For more options, visit https://groups.google.com/d/optout.
> >
> >
> > --
> > You received this message because you are subscribed to the Google Groups
> > "amp-infra" group.
> > To unsubscribe from this group and stop receiving emails from it, send an
> > email to amp-infra+unsubscr...@googlegroups.com.
> > For more options, visit https://groups.google.com/d/optout.
>
> --
> You received this message because you are subscribed to the Google Groups
> "amp-infra" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to amp-infra+unsubscr...@googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.
>


Re: BUILD SYSTEM: amp-jenkins-worker-05 offline

2015-10-19 Thread Patrick Wendell
I think many of them are coming form the Spark 1.4 builds:

https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/Spark-1.4-Maven-pre-YARN/3900/console

On Mon, Oct 19, 2015 at 1:44 PM, Patrick Wendell  wrote:

> This is what I'm looking at:
>
>
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/
>
>
>
> On Mon, Oct 19, 2015 at 12:58 PM, shane knapp  wrote:
>
>> all we did was reboot -05 and -03...  i'm seeing a bunch of green
>> builds.  could you provide me w/some specific failures so i can look
>> in to them more closely?
>>
>> On Mon, Oct 19, 2015 at 12:27 PM, Patrick Wendell 
>> wrote:
>> > Hey Shane,
>> >
>> > It also appears that every Spark build is failing right now. Could it be
>> > related to your changes?
>> >
>> > - Patrick
>> >
>> > On Mon, Oct 19, 2015 at 11:13 AM, shane knapp 
>> wrote:
>> >>
>> >> worker 05 is back up now...  looks like the machine OOMed and needed
>> >> to be kicked.
>> >>
>> >> On Mon, Oct 19, 2015 at 9:39 AM, shane knapp 
>> wrote:
>> >> > i'll have to head down to the colo and see what's up with it...  it
>> >> > seems to be wedged (pings ok, can't ssh in) and i'll update the list
>> >> > when i figure out what's wrong.
>> >> >
>> >> > i don't think it caught fire (#toosoon?), because everything else is
>> >> > up and running.  :)
>> >> >
>> >> > shane
>> >>
>> >> --
>> >> You received this message because you are subscribed to the Google
>> Groups
>> >> "amp-infra" group.
>> >> To unsubscribe from this group and stop receiving emails from it, send
>> an
>> >> email to amp-infra+unsubscr...@googlegroups.com.
>> >> For more options, visit https://groups.google.com/d/optout.
>> >
>> >
>> > --
>> > You received this message because you are subscribed to the Google
>> Groups
>> > "amp-infra" group.
>> > To unsubscribe from this group and stop receiving emails from it, send
>> an
>> > email to amp-infra+unsubscr...@googlegroups.com.
>> > For more options, visit https://groups.google.com/d/optout.
>>
>> --
>> You received this message because you are subscribed to the Google Groups
>> "amp-infra" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to amp-infra+unsubscr...@googlegroups.com.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>


Re: BUILD SYSTEM: amp-jenkins-worker-05 offline

2015-10-19 Thread shane knapp
++joshrosen

some of those 1.4 builds were incorrectly configured and launching on
a reserved executor...  josh fixed them and we're looking a lot better
(meaning that we're building and not failing at launch).

shane

On Mon, Oct 19, 2015 at 1:49 PM, Patrick Wendell  wrote:
> I think many of them are coming form the Spark 1.4 builds:
>
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/Spark-1.4-Maven-pre-YARN/3900/console
>
> On Mon, Oct 19, 2015 at 1:44 PM, Patrick Wendell  wrote:
>>
>> This is what I'm looking at:
>>
>>
>> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/
>>
>>
>>
>> On Mon, Oct 19, 2015 at 12:58 PM, shane knapp  wrote:
>>>
>>> all we did was reboot -05 and -03...  i'm seeing a bunch of green
>>> builds.  could you provide me w/some specific failures so i can look
>>> in to them more closely?
>>>
>>> On Mon, Oct 19, 2015 at 12:27 PM, Patrick Wendell 
>>> wrote:
>>> > Hey Shane,
>>> >
>>> > It also appears that every Spark build is failing right now. Could it
>>> > be
>>> > related to your changes?
>>> >
>>> > - Patrick
>>> >
>>> > On Mon, Oct 19, 2015 at 11:13 AM, shane knapp 
>>> > wrote:
>>> >>
>>> >> worker 05 is back up now...  looks like the machine OOMed and needed
>>> >> to be kicked.
>>> >>
>>> >> On Mon, Oct 19, 2015 at 9:39 AM, shane knapp 
>>> >> wrote:
>>> >> > i'll have to head down to the colo and see what's up with it...  it
>>> >> > seems to be wedged (pings ok, can't ssh in) and i'll update the list
>>> >> > when i figure out what's wrong.
>>> >> >
>>> >> > i don't think it caught fire (#toosoon?), because everything else is
>>> >> > up and running.  :)
>>> >> >
>>> >> > shane
>>> >>
>>> >> --
>>> >> You received this message because you are subscribed to the Google
>>> >> Groups
>>> >> "amp-infra" group.
>>> >> To unsubscribe from this group and stop receiving emails from it, send
>>> >> an
>>> >> email to amp-infra+unsubscr...@googlegroups.com.
>>> >> For more options, visit https://groups.google.com/d/optout.
>>> >
>>> >
>>> > --
>>> > You received this message because you are subscribed to the Google
>>> > Groups
>>> > "amp-infra" group.
>>> > To unsubscribe from this group and stop receiving emails from it, send
>>> > an
>>> > email to amp-infra+unsubscr...@googlegroups.com.
>>> > For more options, visit https://groups.google.com/d/optout.
>>>
>>> --
>>> You received this message because you are subscribed to the Google Groups
>>> "amp-infra" group.
>>> To unsubscribe from this group and stop receiving emails from it, send an
>>> email to amp-infra+unsubscr...@googlegroups.com.
>>> For more options, visit https://groups.google.com/d/optout.
>>
>>
>
> --
> You received this message because you are subscribed to the Google Groups
> "amp-infra" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to amp-infra+unsubscr...@googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: BUILD SYSTEM: amp-jenkins-worker-05 offline

2015-10-19 Thread shane knapp
all we did was reboot -05 and -03...  i'm seeing a bunch of green
builds.  could you provide me w/some specific failures so i can look
in to them more closely?

On Mon, Oct 19, 2015 at 12:27 PM, Patrick Wendell  wrote:
> Hey Shane,
>
> It also appears that every Spark build is failing right now. Could it be
> related to your changes?
>
> - Patrick
>
> On Mon, Oct 19, 2015 at 11:13 AM, shane knapp  wrote:
>>
>> worker 05 is back up now...  looks like the machine OOMed and needed
>> to be kicked.
>>
>> On Mon, Oct 19, 2015 at 9:39 AM, shane knapp  wrote:
>> > i'll have to head down to the colo and see what's up with it...  it
>> > seems to be wedged (pings ok, can't ssh in) and i'll update the list
>> > when i figure out what's wrong.
>> >
>> > i don't think it caught fire (#toosoon?), because everything else is
>> > up and running.  :)
>> >
>> > shane
>>
>> --
>> You received this message because you are subscribed to the Google Groups
>> "amp-infra" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to amp-infra+unsubscr...@googlegroups.com.
>> For more options, visit https://groups.google.com/d/optout.
>
>
> --
> You received this message because you are subscribed to the Google Groups
> "amp-infra" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to amp-infra+unsubscr...@googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: ShuffledHashJoin Possible Issue

2015-10-19 Thread Davies Liu
Can you reproduce it on master?

I can't reproduce it with the following code:

>>> t2 = sqlContext.range(50).selectExpr("concat('A', id) as id")
>>> t1 = sqlContext.range(10).selectExpr("concat('A', id) as id")
>>> t1.join(t2).where(t1.id == t2.id).explain()
ShuffledHashJoin [id#21], [id#19], BuildRight
 TungstenExchange hashpartitioning(id#21,200)
  TungstenProject [concat(A,cast(id#20L as string)) AS id#21]
   Scan PhysicalRDD[id#20L]
 TungstenExchange hashpartitioning(id#19,200)
  TungstenProject [concat(A,cast(id#18L as string)) AS id#19]
   Scan PhysicalRDD[id#18L]

>>> t1.join(t2).where(t1.id == t2.id).count()
10


On Mon, Oct 19, 2015 at 2:59 AM, gsvic  wrote:
> Hi Hao,
>
> Each table is created with the following python code snippet:
>
> data = [{'id': 'A%d'%i, 'value':ceil(random()*10)} for i in range(0,50)]
> with open('A.json', 'w+') as output:
>     json.dump(data, output)
>
> Tables A and B contain 10 and 50 tuples, respectively.
>
> In spark shell I type
>
> sqlContext.setConf("spark.sql.planner.sortMergeJoin", "false") to disable
> sortMergeJoin and
> sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "0") to disable
> BroadcastHashJoin, because the tables are too small and that join would
> otherwise be selected.
>
> Finally I run the following query:
> t1.join(t2).where(t1("id").equalTo(t2("id"))).count
>
> and with ShuffledHashJoin the result I get equals zero, while BroadcastHashJoin
> and SortMergeJoin return the right result (10).
>
>
>
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/ShuffledHashJoin-Possible-Issue-tp14672p14682.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: BUILD SYSTEM: amp-jenkins-worker-05 offline

2015-10-19 Thread shane knapp
worker 05 is back up now...  looks like the machine OOMed and needed
to be kicked.

On Mon, Oct 19, 2015 at 9:39 AM, shane knapp  wrote:
> i'll have to head down to the colo and see what's up with it...  it
> seems to be wedged (pings ok, can't ssh in) and i'll update the list
> when i figure out what's wrong.
>
> i don't think it caught fire (#toosoon?), because everything else is
> up and running.  :)
>
> shane

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



RE: Gradient Descent with large model size

2015-10-19 Thread Ulanov, Alexander
Evan, Joseph

Thank you for the valuable suggestions. It would be great to improve
TreeAggregate (if possible).

Making fewer updates would certainly make sense, though that would mean using a
batch gradient method such as LBFGS. It seems that today it is the only viable
option in Spark.

I will also take a look at how to compress the data sent as an update. Do you
know of any options other than going from double to single precision (or less)?

Best regards, Alexander

From: Evan Sparks [mailto:evan.spa...@gmail.com]
Sent: Saturday, October 17, 2015 2:24 PM
To: Joseph Bradley
Cc: Ulanov, Alexander; dev@spark.apache.org
Subject: Re: Gradient Descent with large model size

Yes, remember that your bandwidth is the maximum number of bytes per second 
that can be shipped to the driver. So if you've got 5 blocks that size, then it 
looks like you're basically saturating the network.

Aggregation trees help for many partitions/nodes and butterfly mixing can help 
use all of the network resources. I have seen implementations of butterfly 
mixing in spark but don't know if we've got one in mainline. Zhao and Canny's 
work [1] details this approach in the context of model fitting.

At any rate, for this type of ANN work with huge models in *any* distributed 
setting, you're going to need to get faster networking (most production 
deployments I know of either have 10 gigabit Ethernet or 40 gigabit infiniband 
links), or figure out a way to decrease frequency or density of updates. Both 
would be even better.

[1] http://www.cs.berkeley.edu/~jfc/papers/13/butterflymixing.pdf

On Oct 17, 2015, at 12:47 PM, Joseph Bradley 
> wrote:
The decrease in running time from N=6 to N=7 makes some sense to me; that 
should be when tree aggregation kicks in.  I'd call it an improvement to run in 
the same ~13sec increasing from N=6 to N=9.

"Does this mean that for 5 nodes with treeaggreate of depth 1 it will take 
5*3.1~15.5 seconds?"
--> I would guess so since all of that will be aggregated on the driver, but I 
don't know enough about Spark's shuffling/networking to say for sure.  Others 
may be able to help more.

Your numbers do make me wonder if we should examine the structure of the tree 
aggregation more carefully and see if we can improve it.  
https://issues.apache.org/jira/browse/SPARK-11168

Joseph

On Thu, Oct 15, 2015 at 7:01 PM, Ulanov, Alexander 
> wrote:
Hi Joseph,

There seems to be no improvement if I run it with more partitions or bigger 
depth:
N = 6 Avg time: 13.49157910868
N = 7 Avg time: 8.929480508
N = 8 Avg time: 14.50712347198
N= 9 Avg time: 13.85487164533

Depth = 3
N=2 Avg time: 8.85389534633
N=5 Avg time: 15.99157492467

I also measured the bandwidth of my network with iperf. It shows 247Mbit/s. So 
the transfer of 12M array of double message should take 64 * 12M/247M~3.1s. 
Does this mean that for 5 nodes with treeaggreate of depth 1 it will take 
5*3.1~15.5 seconds?

Best regards, Alexander
From: Joseph Bradley 
[mailto:jos...@databricks.com]
Sent: Wednesday, October 14, 2015 11:35 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Gradient Descent with large model size

For those numbers of partitions, I don't think you'll actually use tree 
aggregation.  The number of partitions needs to be over a certain threshold (>= 
7) before treeAggregate really operates on a tree structure:
https://github.com/apache/spark/blob/9808052b5adfed7dafd6c1b3971b998e45b2799a/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1100

Do you see a slower increase in running time with more partitions?  For 5 
partitions, do you find things improve if you tell treeAggregate to use depth > 
2?
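
For example, something like this (a sketch that reuses the zero value and
operators from the benchmark quoted further down in this thread, with
treeAggregate's optional depth argument made explicit):

// Sketch: same aggregation, but asking treeAggregate for a deeper tree.
val result = rdd.treeAggregate((new Array[Double](dataSize), 0.0, 0L))(
  seqOp = (c, v) => (c._1, c._2, c._3 + 1),
  combOp = (c1, c2) => (c1._1, c1._2 + c2._2, c1._3 + c2._3),
  depth = 3)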

Joseph

On Wed, Oct 14, 2015 at 1:18 PM, Ulanov, Alexander 
> wrote:
Dear Spark developers,

I have noticed that Gradient Descent in Spark MLlib takes a long time if the 
model is large. It is implemented with treeAggregate. I’ve extracted the code 
from GradientDescent.scala to perform the benchmark. It allocates an Array of 
a given size and then aggregates it:

val dataSize = 12000000
val n = 5
val maxIterations = 3
val rdd = sc.parallelize(0 until n, n).cache()
rdd.count()
var avgTime = 0.0
for (i <- 1 to maxIterations) {
  val start = System.nanoTime()
  val result = rdd.treeAggregate((new Array[Double](dataSize), 0.0, 0L))(
seqOp = (c, v) => {
  // c: (grad, loss, count)
  val l = 0.0
  (c._1, c._2 + l, c._3 + 1)
},
combOp = (c1, c2) => {
  // c: (grad, loss, count)
  (c1._1, c1._2 + c2._2, c1._3 + c2._3)
})
  avgTime += (System.nanoTime() - start) / 1e9
  assert(result._1.length == dataSize)
}
println("Avg time: " + avgTime / maxIterations)

If I run on my cluster of 1 master and 5 workers, I get the following results 
(given 

Re: Spark SQL: what does an exclamation mark mean in the plan?

2015-10-19 Thread Michael Armbrust
It means that there is an invalid attribute reference (i.e. a #n where the
attribute is missing from the child operator).
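
If it helps, a quick way to see which references are dangling (a sketch against
Spark's internal, non-stable plan API, where df stands for the DataFrame being
debugged):

// Sketch: walk the analyzed logical plan and print, for each node, any
// attribute it references that none of its children produce.
val plan = df.queryExecution.analyzed
plan.foreach { node =>
  if (node.missingInput.nonEmpty) {
    println(s"${node.nodeName}: missing ${node.missingInput.mkString(", ")}")
  }
}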

On Sun, Oct 18, 2015 at 11:38 PM, Xiao Li  wrote:

> Hi, all,
>
> After turning on the trace, I saw a strange exclamation mark in
> the intermediate plans. This happened in catalyst analyzer.
>
> Join Inner, Some((col1#0 = col1#6))
>  Project [col1#0,col2#1,col3#2,col2_alias#24,col3#2 AS col3_alias#13]
>   Project [col1#0,col2#1,col3#2,col2#1 AS col2_alias#24]
>LogicalRDD [col1#0,col2#1,col3#2], MapPartitionsRDD[1] at
> createDataFrame at SimpleApp.scala:32
>  Aggregate [col1#6], [col1#6,count(col1#6) AS count(col1)#5L]
>   *!Project [col1#6,col2#7,col3#8,col2_alias#24,col3#8 AS col3_alias#4]*
>Project [col1#6,col2#7,col3#8,col2#7 AS col2_alias#3]
> LogicalRDD [col1#6,col2#7,col3#8], MapPartitionsRDD[1] at
> createDataFrame at SimpleApp.scala:32
>
> Could anybody give me a hint why there exists a !(exclamation mark) before
> the node name (Project)? This ! mark does not disappear in the subsequent
> query plan.
>
> Thank you!
>
> Xiao Li
>


Re: BUILD SYSTEM: amp-jenkins-worker-05 offline

2015-10-19 Thread Patrick Wendell
Hey Shane,

It also appears that every Spark build is failing right now. Could it be
related to your changes?

- Patrick

On Mon, Oct 19, 2015 at 11:13 AM, shane knapp  wrote:

> worker 05 is back up now...  looks like the machine OOMed and needed
> to be kicked.
>
> On Mon, Oct 19, 2015 at 9:39 AM, shane knapp  wrote:
> > i'll have to head down to the colo and see what's up with it...  it
> > seems to be wedged (pings ok, can't ssh in) and i'll update the list
> > when i figure out what's wrong.
> >
> > i don't think it caught fire (#toosoon?), because everything else is
> > up and running.  :)
> >
> > shane
>
> --
> You received this message because you are subscribed to the Google Groups
> "amp-infra" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to amp-infra+unsubscr...@googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.
>


Re: Spark SQL: what does an exclamation mark mean in the plan?

2015-10-19 Thread Xiao Li
Hi, Michael,

Thank you again! I just found the code that generates the ! mark:

  /**
   * A prefix string used when printing the plan.
   *
   * We use "!" to indicate an invalid plan, and "'" to indicate an
unresolved plan.
   */
  protected def statePrefix = if (missingInput.nonEmpty &&
children.nonEmpty) "!" else ""

  override def simpleString: String = statePrefix + super.simpleString


Xiao Li

2015-10-19 11:16 GMT-07:00 Michael Armbrust :

> It means that there is an invalid attribute reference (i.e. a #n where the
> attribute is missing from the child operator).
>
> On Sun, Oct 18, 2015 at 11:38 PM, Xiao Li  wrote:
>
>> Hi, all,
>>
>> After turning on the trace, I saw a strange exclamation mark in
>> the intermediate plans. This happened in catalyst analyzer.
>>
>> Join Inner, Some((col1#0 = col1#6))
>>  Project [col1#0,col2#1,col3#2,col2_alias#24,col3#2 AS col3_alias#13]
>>   Project [col1#0,col2#1,col3#2,col2#1 AS col2_alias#24]
>>LogicalRDD [col1#0,col2#1,col3#2], MapPartitionsRDD[1] at
>> createDataFrame at SimpleApp.scala:32
>>  Aggregate [col1#6], [col1#6,count(col1#6) AS count(col1)#5L]
>>   *!Project [col1#6,col2#7,col3#8,col2_alias#24,col3#8 AS col3_alias#4]*
>>Project [col1#6,col2#7,col3#8,col2#7 AS col2_alias#3]
>> LogicalRDD [col1#6,col2#7,col3#8], MapPartitionsRDD[1] at
>> createDataFrame at SimpleApp.scala:32
>>
>> Could anybody give me a hint why there exists a !(exclamation mark)
>> before the node name (Project)? This ! mark does not disappear in the
>> subsequent query plan.
>>
>> Thank you!
>>
>> Xiao Li
>>
>
>


Re: BUILD SYSTEM: amp-jenkins-worker-05 offline

2015-10-19 Thread shane knapp
things are green, nice catch on the job config, josh.

On Mon, Oct 19, 2015 at 1:57 PM, shane knapp  wrote:
> ++joshrosen
>
> some of those 1.4 builds were incorrectly configured and launching on
> a reserved executor...  josh fixed them and we're looking a lot better
> (meaning that we're building and not failing at launch).
>
> shane
>
> On Mon, Oct 19, 2015 at 1:49 PM, Patrick Wendell  wrote:
>> I think many of them are coming form the Spark 1.4 builds:
>>
>> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/Spark-1.4-Maven-pre-YARN/3900/console
>>
>> On Mon, Oct 19, 2015 at 1:44 PM, Patrick Wendell  wrote:
>>>
>>> This is what I'm looking at:
>>>
>>>
>>> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/
>>>
>>>
>>>
>>> On Mon, Oct 19, 2015 at 12:58 PM, shane knapp  wrote:

 all we did was reboot -05 and -03...  i'm seeing a bunch of green
 builds.  could you provide me w/some specific failures so i can look
 in to them more closely?

 On Mon, Oct 19, 2015 at 12:27 PM, Patrick Wendell 
 wrote:
 > Hey Shane,
 >
 > It also appears that every Spark build is failing right now. Could it
 > be
 > related to your changes?
 >
 > - Patrick
 >
 > On Mon, Oct 19, 2015 at 11:13 AM, shane knapp 
 > wrote:
 >>
 >> worker 05 is back up now...  looks like the machine OOMed and needed
 >> to be kicked.
 >>
 >> On Mon, Oct 19, 2015 at 9:39 AM, shane knapp 
 >> wrote:
 >> > i'll have to head down to the colo and see what's up with it...  it
 >> > seems to be wedged (pings ok, can't ssh in) and i'll update the list
 >> > when i figure out what's wrong.
 >> >
 >> > i don't think it caught fire (#toosoon?), because everything else is
 >> > up and running.  :)
 >> >
 >> > shane
 >>
 >> --
 >> You received this message because you are subscribed to the Google
 >> Groups
 >> "amp-infra" group.
 >> To unsubscribe from this group and stop receiving emails from it, send
 >> an
 >> email to amp-infra+unsubscr...@googlegroups.com.
 >> For more options, visit https://groups.google.com/d/optout.
 >
 >
 > --
 > You received this message because you are subscribed to the Google
 > Groups
 > "amp-infra" group.
 > To unsubscribe from this group and stop receiving emails from it, send
 > an
 > email to amp-infra+unsubscr...@googlegroups.com.
 > For more options, visit https://groups.google.com/d/optout.

 --
 You received this message because you are subscribed to the Google Groups
 "amp-infra" group.
 To unsubscribe from this group and stop receiving emails from it, send an
 email to amp-infra+unsubscr...@googlegroups.com.
 For more options, visit https://groups.google.com/d/optout.
>>>
>>>
>>
>> --
>> You received this message because you are subscribed to the Google Groups
>> "amp-infra" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to amp-infra+unsubscr...@googlegroups.com.
>> For more options, visit https://groups.google.com/d/optout.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Problem using User Defined Predicate pushdown with core RDD and parquet - UDP class not found

2015-10-19 Thread Vladimir Vladimirov
Hi all

I feel like this question is more Spark dev related than Spark user related.
Please correct me if I'm wrong.

My project's data flow involves sampling records from data stored as a
Parquet dataset.
I've checked the DataFrames API and it doesn't support user-defined predicate
pushdown - only simple filter expressions.
I want to use Parquet's custom filter (predicate pushdown) feature while
loading data with newAPIHadoopFile.
Simple filters constructed with the org.apache.parquet.filter2 API work fine,
but a User Defined Predicate works only in `--master local` mode.

When I run my test program in yarn-client mode, the UDP class used by
parquet-mr triggers a class-not-found exception.

I suspect that the issue could be related to the way the class loader works
in Parquet, or to the fact that the Spark executor processes load my jar from
an HTTP server and some security policy interferes (the classpath shows that
the jar URI is actually an HTTP URL and not a local file).

I've tried to create an uber jar with all dependencies and ship it with the
Spark app - no success.
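
One more thing worth ruling out (an assumption only: that the classloader
parquet-mr uses on the executors cannot see the HTTP-served jar) is copying the
uber jar to the same local path on every node and forcing it onto the executor
classpath - /opt/app/lib below is a hypothetical path:

import org.apache.spark.SparkConf

// Sketch (hypothetical path): make the uber jar part of the executor JVM's
// system classpath instead of relying on the jar fetched over HTTP.
val conf = new SparkConf()
  .setAppName("Spark Scala App")
  .set("spark.executor.extraClassPath", "/opt/app/lib/my-jar-with-dependencies.jar")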

PS I'm using spark 1.5.1.

Here is my command line I'm using to submit the application:

SPARK_CLASSPATH=./lib/my-jar-with-dependencies.jar spark-submit \
--master yarn-client \
--num-executors 3 --driver-memory 3G --executor-memory 2G \
--executor-cores 1 \
--jars
./lib/my-jar-with-dependencies.jar,./lib/snappy-java-1.1.2.jar,./lib/parquet-hadoop-1.7.0.jar,./lib/parquet-avro-1.7.0.jar,./lib/parquet-column-1.7.0.jar,/opt/cloudera/parcels/CDH/jars/avro-1.7.6-cdh5.4.0.jar,/opt/cloudera/parcels/CDH/jars/avro-mapred-1.7.6-cdh5.4.0-hadoop2.jar,
\
--class my.app.parquet.filters.tools.TestSparkApp \
./lib/my-jar-with-dependencies.jar \
yarn-client \
"/user/vvlad/2015/*/*/*/EVENTS"

Here is the code of my UDP class:

package my.app.parquet.filters.udp

import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate


import java.lang.{Integer => JInt}

import scala.util.Random

class SampleIntColumn(threshold: Double) extends UserDefinedPredicate[JInt]
with Serializable {
  lazy val random = { new Random() }
  val myThreshold = threshold
  override def keep(value: JInt): Boolean = {
random.nextFloat() < myThreshold
  }

  override def canDrop(statistics: Statistics[JInt]): Boolean = false

  override def inverseCanDrop(statistics: Statistics[JInt]): Boolean = false

  override def toString: String = {
"%s(%f)".format(getClass.getName, myThreshold)
  }
}

Spark app:

package my.app.parquet.filters.tools

import my.app.parquet.filters.udp.SampleIntColumn
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.AvroReadSupport
import org.apache.parquet.filter2.dsl.Dsl.IntColumn
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.{SparkContext, SparkConf}

import org.apache.parquet.filter2.dsl.Dsl._
import org.apache.parquet.filter2.predicate.FilterPredicate


object TestSparkApp {
  def main (args: Array[String]) {
val conf = new SparkConf()
  //"local[2]" or yarn-client etc
  .setMaster(args(0))
  .setAppName("Spark Scala App")
  .set("spark.executor.memory", "1g")
  .set("spark.rdd.compress", "true")
  .set("spark.storage.memoryFraction", "1")

val sc = new SparkContext(conf)

val job = new Job(sc.hadoopConfiguration)
ParquetInputFormat.setReadSupportClass(job,
classOf[AvroReadSupport[GenericRecord]])

val sampler = new SampleIntColumn(0.05)
val impField = IntColumn("impression")

val pred: FilterPredicate = impField.filterBy(sampler)

ParquetInputFormat.setFilterPredicate(job.getConfiguration, pred)


println(job.getConfiguration.get("parquet.private.read.filter.predicate"))

println(job.getConfiguration.get("parquet.private.read.filter.predicate.human.readable"))

val records1 = sc.newAPIHadoopFile(
//
  args(1),
  classOf[ParquetInputFormat[GenericRecord]],
  classOf[Void],
  classOf[GenericRecord],
  job.getConfiguration
).map(_._2).cache()

println("result count " + records1.count().toString)

sc.stop()
  }
}



Here are logs with exception I'm getting:


15/10/19 11:14:43 INFO TaskSetManager: Starting task 21.0 in stage 0.0 (TID
0, hdp010, NODE_LOCAL, 2815 bytes)
15/10/19 11:14:43 INFO TaskSetManager: Starting task 14.0 in stage 0.0 (TID
1, hdp042, NODE_LOCAL, 2816 bytes)
15/10/19 11:14:43 INFO YarnClientSchedulerBackend: Registered executor:
AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@hdp027:43593/user/Executor#-832887318])
with ID 3
15/10/19 11:14:43 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
2, hdp027, NODE_LOCAL, 2814 bytes)
15/10/19 11:14:44 INFO BlockManagerMasterEndpoint: Registering block
manager hdp027:64266 with 883.8 MB RAM, BlockManagerId(3,

Re: Problem building Spark

2015-10-19 Thread Ted Yu
See this thread
http://search-hadoop.com/m/q3RTtV3VFNdgNri2=Re+Build+spark+1+5+1+branch+fails

> On Oct 19, 2015, at 6:59 PM, Annabel Melongo 
>  wrote:
> 
> I tried to build Spark according to the build directions and it failed
> due to the following error:
>
> [link: Building Spark - Spark 1.5.1 Documentation on spark.apache.org]
>
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-assembly-plugin:2.5.5:single
> (test-jar-with-dependencies) on project spark-streaming-mqtt_2.10: Failed to
> create assembly: Error creating assembly archive test-jar-with-dependencies:
> Problem creating jar: Execution exception (and the archive is probably
> corrupt but I could not delete it): Java heap space -> [Help 1]
> 
> Any help?  I have a 64-bit windows 8 machine


Re: Gradient Descent with large model size

2015-10-19 Thread Mike Hynes
Hi Alexander, Joseph, Evan,

I just wanted to weigh in with an empirical result that we've had on a
standalone cluster with 16 nodes and 256 cores.

Typically we run optimization tasks with 256 partitions for 1
partition per core, and find that performance worsens with more
partitions than physical cores in communication operations, which
makes sense; the computational work has to be very high to justify so
many partitions.

However, with this setup, the default of numLevels = 2 in MLlib
methods using treeAggregate is generally a poor choice for large
datasets; it has been empirically far better in our tests to use
numLevels = log_2(16). The difference in clock time per iteration for
iterative optimization jobs can be huge; it takes 1.5--1.6x *less*
time to use more levels in the tree. I never see a difference when running
test suites on a single node for building, but on a large job
across the cluster it's very noticeable.

If there's to be any modifications of treeAggregate, I would recommend
some heuristics that uses numLevels = log_2(numNodes) or something
similar, or have the numLevels be specifiable in the MLlib APIs
instead of defaulting to 2.
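
Concretely, something like the following is what I have in mind (a sketch only;
data, the zero value and the operators stand in for whatever the MLlib method
aggregates):

// Sketch: choose the treeAggregate depth from the number of physical nodes
// rather than the default of 2 (assumption: the 16-node cluster above).
val numNodes = 16
val depth = math.max(2, (math.log(numNodes) / math.log(2)).round.toInt)  // 4 here
val sum = data.treeAggregate(0.0)(                  // data: RDD[Double], stand-in
  seqOp = (acc, x) => acc + x,
  combOp = (a, b) => a + b,
  depth = depth)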

Mike


On 10/19/15, Ulanov, Alexander  wrote:
> Evan, Joseph
>
> Thank you for the valuable suggestions. It would be great to improve
> TreeAggregate (if possible).
>
> Making fewer updates would certainly make sense, though that would mean using a
> batch gradient method such as LBFGS. It seems that today it is the only viable
> option in Spark.
>
> I will also take a look at how to compress the data sent as an update. Do you
> know of any options other than going from double to single precision (or less)?
>
> Best regards, Alexander
>
> From: Evan Sparks [mailto:evan.spa...@gmail.com]
> Sent: Saturday, October 17, 2015 2:24 PM
> To: Joseph Bradley
> Cc: Ulanov, Alexander; dev@spark.apache.org
> Subject: Re: Gradient Descent with large model size
>
> Yes, remember that your bandwidth is the maximum number of bytes per second
> that can be shipped to the driver. So if you've got 5 blocks that size, then
> it looks like you're basically saturating the network.
>
> Aggregation trees help for many partitions/nodes and butterfly mixing can
> help use all of the network resources. I have seen implementations of
> butterfly mixing in spark but don't know if we've got one in mainline. Zhao
> and Canny's work [1] details this approach in the context of model fitting.
>
> At any rate, for this type of ANN work with huge models in *any* distributed
> setting, you're going to need to get faster networking (most production
> deployments I know of either have 10 gigabit Ethernet or 40 gigabit
> infiniband links), or figure out a way to decrease frequency or density of
> updates. Both would be even better.
>
> [1] http://www.cs.berkeley.edu/~jfc/papers/13/butterflymixing.pdf
>
> On Oct 17, 2015, at 12:47 PM, Joseph Bradley
> > wrote:
> The decrease in running time from N=6 to N=7 makes some sense to me; that
> should be when tree aggregation kicks in.  I'd call it an improvement to run
> in the same ~13sec increasing from N=6 to N=9.
>
> "Does this mean that for 5 nodes with treeaggreate of depth 1 it will take
> 5*3.1~15.5 seconds?"
> --> I would guess so since all of that will be aggregated on the driver, but
> I don't know enough about Spark's shuffling/networking to say for sure.
> Others may be able to help more.
>
> Your numbers do make me wonder if we should examine the structure of the
> tree aggregation more carefully and see if we can improve it.
> https://issues.apache.org/jira/browse/SPARK-11168
>
> Joseph
>
> On Thu, Oct 15, 2015 at 7:01 PM, Ulanov, Alexander
> > wrote:
> Hi Joseph,
>
> There seems to be no improvement if I run it with more partitions or bigger
> depth:
> N = 6 Avg time: 13.49157910868
> N = 7 Avg time: 8.929480508
> N = 8 Avg time: 14.50712347198
> N= 9 Avg time: 13.85487164533
>
> Depth = 3
> N=2 Avg time: 8.85389534633
> N=5 Avg time: 15.99157492467
>
> I also measured the bandwidth of my network with iperf. It shows 247Mbit/s.
> So the transfer of 12M array of double message should take 64 *
> 12M/247M~3.1s. Does this mean that for 5 nodes with treeaggreate of depth 1
> it will take 5*3.1~15.5 seconds?
>
> Best regards, Alexander
> From: Joseph Bradley
> [mailto:jos...@databricks.com]
> Sent: Wednesday, October 14, 2015 11:35 PM
> To: Ulanov, Alexander
> Cc: dev@spark.apache.org
> Subject: Re: Gradient Descent with large model size
>
> For those numbers of partitions, I don't think you'll actually use tree
> aggregation.  The number of partitions needs to be over a certain threshold
> (>= 7) before treeAggregate really operates on a tree structure:
> 

Problem building Spark

2015-10-19 Thread Annabel Melongo
I tried to build Spark according to the build directions and it failed due
to the following error:
[link: Building Spark - Spark 1.5.1 Documentation on spark.apache.org]

[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-assembly-plugin:2.5.5:single
(test-jar-with-dependencies) on project spark-streaming-mqtt_2.10: Failed to
create assembly: Error creating assembly archive test-jar-with-dependencies:
Problem creating jar: Execution exception (and the archive is probably corrupt
but I could not delete it): Java heap space -> [Help 1]
Any help?  I have a 64-bit windows 8 machine


Re: Problem building Spark

2015-10-19 Thread Tathagata Das
Seems to be a heap space issue for Maven. Have you configured Maven's
memory according to the instructions on the web page?

export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"


On Mon, Oct 19, 2015 at 6:59 PM, Annabel Melongo <
melongo_anna...@yahoo.com.invalid> wrote:

> I tried to build Spark according to the build directions and it
> failed due to the following error:
>
> [link: Building Spark - Spark 1.5.1 Documentation on spark.apache.org]
>
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-assembly-plugin:2.5.5:single
> (test-jar-with-dependencies) on project spark-streaming-mqtt_2.10: Failed to
> create assembly: Error creating assembly archive test-jar-with-dependencies:
> Problem creating jar: Execution exception (and the archive is probably
> corrupt but I could not delete it): Java heap space -> [Help 1]
>
> Any help?  I have a 64-bit windows 8 machine
>