jdbc/save DataFrameWriter implementation change

2016-04-12 Thread Justin.Pihony
Hi,

I have a ticket open on having save delegate to the jdbc method; however, when
I went to implement this, it just didn't seem clean. Please take a look at my
comment on https://issues.apache.org/jira/browse/SPARK-14525 and let me know
whether or not you agree with the second approach.
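
For context, here is a rough sketch of the two entry points being discussed
(assuming an existing DataFrame df; the Postgres URL and the option keys are
just the usual JDBC data source options, not part of the proposed change):

import java.util.Properties

// The dedicated entry point on DataFrameWriter that exists today:
val props = new Properties()
props.setProperty("user", "test")
df.write.jdbc("jdbc:postgresql://localhost/testdb", "my_table", props)

// The generic save() path that the ticket would like to delegate to the
// same logic:
df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/testdb")
  .option("dbtable", "my_table")
  .option("user", "test")
  .save()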

Thanks,
Justin






Accessing Secure Hadoop from Mesos cluster

2016-04-12 Thread Tony Kinsley
I have been working towards getting some Spark Streaming jobs to run in
Mesos cluster mode (using Docker containers) and write data periodically to
a secure HDFS cluster. Unfortunately this does not seem to be well
supported in Spark currently (
https://issues.apache.org/jira/browse/SPARK-12909). The problem seems to be
that (a) a principal and keytab passed in only get processed if the backend
is YARN, and (b) all the code for renewing tickets is implemented by the
YARN backend.


My first attempt to get around this problem was to create Docker containers
that use a custom entrypoint to run a process manager, with cron running in
each container to periodically run kinit. I was hoping this would work since
Spark can correctly log in if a TGT exists (at least in my tests of manually
kinit’ing and running Spark in local mode). However, this hack will not work
(currently, anyway) because the Mesos scheduler does not specify whether a
shell should be used for the command: Mesos defaults to using the shell and
then overrides the entrypoint of the Docker image with /bin/sh
(https://issues.apache.org/jira/browse/MESOS-1770).


Since I have not been able to come up with an acceptable workaround, I am
looking into the possibility of adding the functionality to Spark, but I
wanted to check in to make sure I am not duplicating others' work and also
to get some general advice on a good approach to the problem. I found this
old email chain that discusses some of the challenges of authenticating
correctly to the NameNodes (
http://comments.gmane.org/gmane.comp.lang.scala.spark.user/14257).


I've noticed that the YARN security settings are namespaced to be
YARN-specific, and that some of the code seems fairly generic
(AMDelegationTokenRenewer.scala and ExecutorDelegationTokenUpdater, for
instance, although I'm not sure about the use of the YarnSparkHadoopUtils).
It seems to me that some of this code could be reused across the various
cluster backends. That said, I am fairly new to working with Hadoop and
Spark and do not claim to understand the inner workings of YARN or Mesos,
although I feel much more comfortable with Mesos.
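
To make that concrete, the kind of backend-agnostic relogin loop I have in
mind looks roughly like this (purely a sketch on my part; the object and
parameter names are made up, and it only relies on Hadoop's
UserGroupInformation API that the YARN code already uses):

import java.util.concurrent.{Executors, TimeUnit}
import org.apache.hadoop.security.UserGroupInformation

// Sketch only: log in from a keytab once, then periodically renew the TGT so
// a long-running driver or executor keeps valid Kerberos credentials,
// regardless of whether the cluster backend is YARN or Mesos.
object KeytabRelogin {
  def start(principal: String, keytab: String, intervalMinutes: Long): Unit = {
    UserGroupInformation.loginUserFromKeytab(principal, keytab)
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        // No-op if the current ticket is still fresh enough.
        UserGroupInformation.getLoginUser.checkTGTAndReloginFromKeytab()
      }
    }, intervalMinutes, intervalMinutes, TimeUnit.MINUTES)
  }
}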


I would definitely appreciate some guidance, especially since whatever I or
ViaSat (my employer) get working, we would definitely be interested in
contributing back; we would very much like to avoid maintaining a fork of
Spark.

Tony


Re: Different maxBins value for categorical and continuous features in RandomForest implementation.

2016-04-12 Thread Joseph Bradley
That sounds useful.  Would you mind creating a JIRA for it?  Thanks!
Joseph

On Mon, Apr 11, 2016 at 2:06 AM, Rahul Tanwani 
wrote:

> Hi,
>
> Currently the RandomForest algorithm takes a single maxBins value to decide
> the number of splits to take. This sometimes causes training time to go very
> high when there is a single categorical column with a sufficiently large
> number of unique values. This single column impacts all the numeric
> (continuous) columns even though such a high number of splits is not
> required.
>
> Encoding the categorical column into features makes the data very wide,
> which requires us to increase maxMemoryInMB and also puts more pressure on
> the GC.
>
> Keeping separate maxBins values for categorical and continuous features
> would be useful in this regard.
>
>
>
>
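
(For reference, a minimal illustration of the coupling being described, using
the current org.apache.spark.mllib.tree.RandomForest API; the dataset path,
the 1000-category feature, and the other parameter values are placeholders,
and a SparkContext named sc is assumed.)

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.util.MLUtils

// Placeholder data: feature 0 is categorical with 1000 distinct values,
// the remaining features are continuous.
val data = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")

// maxBins must be at least as large as the biggest category count, so the
// continuous features are also evaluated with up to 1000 split candidates.
val model = RandomForest.trainClassifier(
  input = data,
  numClasses = 2,
  categoricalFeaturesInfo = Map(0 -> 1000),
  numTrees = 100,
  featureSubsetStrategy = "auto",
  impurity = "gini",
  maxDepth = 5,
  maxBins = 1000)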


Re: Spark on Mesos 0.28 issue

2016-04-12 Thread Timothy Chen
Hi Yang,

Can you share the master log/slave log?

Tim


> On Apr 12, 2016, at 2:05 PM, Yang Lei  wrote:
> 
> I have been able to run Spark submissions in a Docker container (HOST
> network) through Marathon on Mesos, targeting a Mesos cluster (zk address),
> for at least Spark 1.6 and 1.5.2 on Mesos 0.26 and 0.27.
> 
> I do need to define SPARK_PUBLIC_DNS and SPARK_LOCAL_IP so that the Spark
> driver announces the right IP address.
> 
> However, on Mesos 0.28 the Spark framework fails with "Failed to shutdown
> socket with fd 54: Transport endpoint is not connected". Eventually I got
> past the problem by additionally defining LIBPROCESS_IP.
> 
> Please let me know if this behavior is expected. If it is, it would be good
> to document the requirement in the Spark on Mesos documentation.
> 
> Thank you.
> 
> Yang.


Re: Spark 1.6.1 packages on S3 corrupt?

2016-04-12 Thread Nicholas Chammas
Yes, this is a known issue. The core devs are already aware of it. [CC dev]

FWIW, I believe the Spark 1.6.1 / Hadoop 2.6 package on S3 is not corrupt.
It may be the only 1.6.1 package that is not corrupt, though. :/

Nick


On Tue, Apr 12, 2016 at 9:00 PM Augustus Hong 
wrote:

> Hi all,
>
> I'm trying to launch a cluster with the spark-ec2 script but seeing the
> error below.  Are the packages on S3 corrupted / not in the correct format?
>
> Initializing spark
>
> --2016-04-13 00:25:39--
> http://s3.amazonaws.com/spark-related-packages/spark-1.6.1-bin-hadoop1.tgz
>
> Resolving s3.amazonaws.com (s3.amazonaws.com)... 54.231.11.67
>
> Connecting to s3.amazonaws.com (s3.amazonaws.com)|54.231.11.67|:80...
> connected.
>
> HTTP request sent, awaiting response... 200 OK
>
> Length: 277258240 (264M) [application/x-compressed]
>
> Saving to: ‘spark-1.6.1-bin-hadoop1.tgz’
>
> 100%[==>]
> 277,258,240 37.6MB/s   in 9.2s
>
> 2016-04-13 00:25:49 (28.8 MB/s) - ‘spark-1.6.1-bin-hadoop1.tgz’ saved
> [277258240/277258240]
>
> Unpacking Spark
>
>
> gzip: stdin: not in gzip format
>
> tar: Child returned status 1
>
> tar: Error is not recoverable: exiting now
>
> mv: missing destination file operand after `spark'
>
> Try `mv --help' for more information.
>
>
>
>
>
>
> --
> Augustus Hong
> Software Engineer
>
>


Spark on Mesos 0.28 issue

2016-04-12 Thread Yang Lei
I have been able to run Spark submissions in a Docker container (HOST network)
through Marathon on Mesos, targeting a Mesos cluster (zk address), for at least
Spark 1.6 and 1.5.2 on Mesos 0.26 and 0.27.

I do need to define SPARK_PUBLIC_DNS and SPARK_LOCAL_IP so that the Spark
driver announces the right IP address.

However, on Mesos 0.28 the Spark framework fails with "Failed to shutdown
socket with fd 54: Transport endpoint is not connected". Eventually I got past
the problem by additionally defining LIBPROCESS_IP.

Please let me know if this behavior is expected. If it is, it would be good
to document the requirement in the Spark on Mesos documentation.

Thank you.

Yang.

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-12 Thread Herman van Hövell tot Westerflier
I am not sure you can push a limit through a join. It becomes problematic
when not all keys are present on both sides: if the limit were applied below
the join, the join could end up producing fewer rows than the requested limit.

This might be one of the rare cases in which whole-stage code generation is
slower, because we need to buffer the result of such a stage. You could try
to disable it by setting "spark.sql.codegen.wholeStage" to false.
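
For example (a quick sketch, assuming a SQLContext named sqlContext on current
master; the query is just your original one):

// Turn off whole-stage code generation for this session, then re-run the query.
sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
sqlContext.sql(
  "select l_partkey from lineitem, partsupp where ps_partkey = l_partkey limit 10"
).show()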

2016-04-12 14:32 GMT+02:00 Rajesh Balamohan :

> Hi,
>
> I ran the following query in Spark (latest master codebase) and it took a
> long time to complete even though it is a broadcast hash join.
>
> It appears that the limit is computed only after the complete join has been
> evaluated. Shouldn't the limit be pushed down to the BroadcastHashJoin
> (which could then stop processing after generating 10 rows)? Please let me
> know if my understanding of this is wrong.
>
>
> select l_partkey from lineitem, partsupp where ps_partkey=l_partkey limit
> 10;
>
> 
> | == Physical Plan ==
> CollectLimit 10
> +- WholeStageCodegen
>:  +- Project [l_partkey#893]
>: +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner,
> BuildRight, None
>::- Project [l_partkey#893]
>::  +- Filter isnotnull(l_partkey#893)
>:: +- Scan HadoopFiles[l_partkey#893] Format: ORC,
> PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct
>:+- INPUT
>+- BroadcastExchange
> HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as
> bigint)),List(ps_partkey#908))
>   +- WholeStageCodegen
>  :  +- Project [ps_partkey#908]
>  : +- Filter isnotnull(ps_partkey#908)
>  :+- Scan HadoopFiles[ps_partkey#908] Format: ORC,
> PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct
>  |
> 
>
>
>
>
> --
> ~Rajesh.B
>


SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-12 Thread Rajesh Balamohan
Hi,

I ran the following query in Spark (latest master codebase) and it took a
long time to complete even though it is a broadcast hash join.

It appears that the limit is computed only after the complete join has been
evaluated. Shouldn't the limit be pushed down to the BroadcastHashJoin
(which could then stop processing after generating 10 rows)? Please let me
know if my understanding of this is wrong.


select l_partkey from lineitem, partsupp where ps_partkey=l_partkey limit
10;


| == Physical Plan ==
CollectLimit 10
+- WholeStageCodegen
   :  +- Project [l_partkey#893]
   : +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner,
BuildRight, None
   ::- Project [l_partkey#893]
   ::  +- Filter isnotnull(l_partkey#893)
   :: +- Scan HadoopFiles[l_partkey#893] Format: ORC,
PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct
   :+- INPUT
   +- BroadcastExchange
HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as
bigint)),List(ps_partkey#908))
  +- WholeStageCodegen
 :  +- Project [ps_partkey#908]
 : +- Filter isnotnull(ps_partkey#908)
 :+- Scan HadoopFiles[ps_partkey#908] Format: ORC,
PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct
 |





-- 
~Rajesh.B


Possible deadlock in registering applications in the recovery mode

2016-04-12 Thread Niranda Perera
Hi all,

I have encountered a small issue in the standalone recovery mode.

Let's say there was an application A running in the cluster. Due to some
issue, the entire cluster, together with application A, goes down.

Later on, the cluster comes back online, and the master goes into
'recovering' mode because the persistence engine tells it that some apps,
workers, and drivers were previously in the cluster. While recovery is in
progress, the application comes back online, but now it has a different ID,
let's say B.

But then, per the master's application registration logic, application B
will NOT be added to 'waitingApps'; the master logs the message "Attempted
to re-register application at same address" instead. [1]

  private def registerApplication(app: ApplicationInfo): Unit = {
    val appAddress = app.driver.address
    if (addressToApp.contains(appAddress)) {
      logInfo("Attempted to re-register application at same address: " +
        appAddress)
      return
    }


The problem here is that the master is trying to recover application A,
which no longer exists, so after the recovery process app A will be dropped.
However, app A's successor, app B, was also omitted from the 'waitingApps'
list because it had the same address as app A previously.

This creates a deadlock in the cluster: neither app A nor app B ends up
available.

When the master is in the RECOVERING state, shouldn't it add all the
registering apps to a list first, and then, after recovery completes (once
the unsuccessful recoveries have been removed), deploy the apps that are
new?

This would resolve the deadlock, IMO.
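
To make that concrete, something along these lines is what I have in mind
(only a sketch on my part; the pendingRegistrations buffer and its wiring are
made up, and only the address check is taken from Master.scala):

import scala.collection.mutable.ArrayBuffer

// Sketch: while RECOVERING, buffer incoming registrations instead of rejecting
// them against addresses that may belong to apps about to be dropped anyway.
private val pendingRegistrations = new ArrayBuffer[ApplicationInfo]()

private def registerApplication(app: ApplicationInfo): Unit = {
  if (state == RecoveryState.RECOVERING) {
    pendingRegistrations += app
    return
  }
  val appAddress = app.driver.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  // ... existing registration logic ...
}

private def completeRecovery(): Unit = {
  // ... existing logic that drops the apps/workers/drivers that did not respond ...
  // By now the stale entry for app A is gone, so app B can register normally.
  pendingRegistrations.foreach(registerApplication)
  pendingRegistrations.clear()
}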

Looking forward to hearing from you.

best

[1]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L834

-- 
Niranda
@n1r44 
+94-71-554-8430
https://pythagoreanscript.wordpress.com/