Re: What is the best way to migrate existing scikit-learn code to PySpark?

2015-09-13 Thread Nick Pentreath
I should point out that I'm not sure what the performance of that project is.

I'd expect that a native DataFrame in PySpark will be significantly more
efficient than their DictRDD.

It would be interesting to see a performance comparison of the pipelines
relative to native Spark ML pipelines, if you do test both out.
—
Sent from Mailbox

On Sat, Sep 12, 2015 at 10:52 PM, Rex X  wrote:

> Jorn and Nick,
> Thanks for answering.
> Nick, the sparkit-learn project looks interesting. Thanks for mentioning it.
> Rex
> On Sat, Sep 12, 2015 at 12:05 PM, Nick Pentreath 
> wrote:
>> You might want to check out https://github.com/lensacom/sparkit-learn
>> 
>>
>> Though it's true that for random
>> forests / trees you will need to use MLlib
>>
>> —
>> Sent from Mailbox 
>>
>>
>> On Sat, Sep 12, 2015 at 9:00 PM, Jörn Franke  wrote:
>>
>>> I fear you have to do all the plumbing yourself. This is the same for all
>>> commercial and non-commercial libraries/analytics packages. How you
>>> distribute often also depends on the functional requirements.
>>>
>>> On Sat, 12 Sep 2015 at 20:18, Rex X  wrote:
>>>
 Hi everyone,

 What is the best way to migrate existing scikit-learn code to a PySpark
 cluster? Then we can bring together the full power of both scikit-learn and
 Spark to do scalable machine learning. (I know we have MLlib, but the
 existing code base is big, and some functions are not fully supported yet.)

 Currently I use the multiprocessing module of Python to boost the speed. But
 this only works on one node, and only while the data set is small.

 For many real cases, we may need to deal with gigabytes or even
 terabytes of data, with thousands of raw categorical attributes, which can
 lead to millions of discrete features, using 1-of-k representation.

 For these cases, one solution is to use distributed memory. That's why I
 am considering Spark. And Spark supports Python!
 With PySpark, we can import scikit-learn.

 But the question is how to make scikit-learn code, a decision tree
 classifier for example, run in distributed mode, to benefit from the
 power of Spark?


 Best,
 Rex

>>>
>>

Re: Data lost in spark streaming

2015-09-13 Thread Tathagata Das
Maybe the driver got restarted. See the log4j logs of the driver before it
restarted.

On Thu, Sep 10, 2015 at 11:32 PM, Bin Wang  wrote:

> I'm using Spark Streaming 1.4.0 and have a DStream that holds all the data
> it has received. But today the historical data in the DStream seems to have
> been lost suddenly. The application UI also lost the streaming process times
> and all the related data. Could anyone give some hints to debug this? Thanks.
>
>
>


How to Hive UDF in Spark DataFrame?

2015-09-13 Thread unk1102
Hi, I am using a UDF in a hiveContext.sql("") query. Inside, it uses GROUP BY,
which forces a huge data shuffle read of around 30 GB. I am thinking of
converting the above query to the DataFrame API so that I can avoid the GROUP BY.

How do we use Hive UDF in Spark DataFrame? Please guide. Thanks much.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-Hive-UDF-in-Spark-DataFrame-tp24676.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: selecting columns with the same name in a join

2015-09-13 Thread Evert Lammerts
Thanks Michael, we'll update then.

Evert
On Sep 11, 2015 20:59, "Michael Armbrust"  wrote:

> Here is what I get on branch-1.5:
>
> x = sc.parallelize([dict(k=1, v="Evert"), dict(k=2, v="Erik")]).toDF()
> y = sc.parallelize([dict(k=1, v="Ruud"), dict(k=3, v="Vincent")]).toDF()
> x.registerTempTable('x')
> y.registerTempTable('y')
> sqlContext.sql("select y.v, x.v FROM x INNER JOIN y ON x.k=y.k").collect()
>
> Out[1]: [Row(v=u'Ruud', v=u'Evert')]
>
> On Fri, Sep 11, 2015 at 3:14 AM, Evert Lammerts 
> wrote:
>
>> Am I overlooking something? This doesn't seem right:
>>
>> x = sc.parallelize([dict(k=1, v="Evert"), dict(k=2, v="Erik")]).toDF()
>> y = sc.parallelize([dict(k=1, v="Ruud"), dict(k=3, v="Vincent")]).toDF()
>> x.registerTempTable('x')
>> y.registerTempTable('y')
>> sqlContext.sql("select y.v, x.v FROM x INNER JOIN y ON x.k=y.k").collect()
>>
>> Out[26]: [Row(v=u'Evert', v=u'Evert')]
>>
>> May just be because I'm behind; I'm on:
>>
>> Spark 1.5.0-SNAPSHOT (git revision 27ef854) built for Hadoop 2.6.0 Build
>> flags: -Pyarn -Psparkr -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive
>> -Phive-thriftserver -DskipTests
>>
>> Can somebody check whether the above code does work on the latest release?
>>
>> Thanks!
>> Evert
>>
>
>


Re: [Question] ORC - EMRFS Problem

2015-09-13 Thread Cazen Lee
Hi Owen, thank you for the reply.

I've heard some people say that ORC is "Owen's RC file" haha ;)

Also, some people told me after posting that this is a known issue with AWS
EMR 4.0.0.

They said it might be a Hive 0.13.1 / Spark 1.4.1 compatibility issue.

So AWS will launch EMR 4.1.0 in a couple of weeks with Spark 1.5 and a higher
version of Hive.

I hope it works properly after 4.1.0.

The error log is below, but don't worry about it.

Thank you very much.



scala> val ORCFile = sqlContext.read.format("orc").load("s3n://S3bucketName/S3serviceCode/yymmdd=20150801/country=eu/75e91844-2a87-4d8f-af9f-9268e34daef6-00")

2015-09-13 07:33:29,228 INFO  [main] fs.EmrFileSystem (EmrFileSystem.java:initialize(107)) - Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation
2015-09-13 07:33:29,314 INFO  [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[CF49E1372BEF2E81], ServiceEndpoint=[https://S3bucketName.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=0, ClientExecuteTime=[85.608], HttpRequestTime=[85.101], HttpClientReceiveResponseTime=[13.891], RequestSigningTime=[0.259], ResponseProcessingTime=[0.007], HttpClientSendRequestTime=[0.305],
2015-09-13 07:33:29,351 INFO  [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[55B8C5E6009F0246], ServiceEndpoint=[https://S3bucketName.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[32.776], HttpRequestTime=[13.17], HttpClientReceiveResponseTime=[10.961], RequestSigningTime=[0.28], ResponseProcessingTime=[19.042], HttpClientSendRequestTime=[0.295],
2015-09-13 07:33:29,421 INFO  [main] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1159)) - Opening 's3n://S3bucketName/S3serviceCode/yymmdd=20150801/country=eu/75e91844-2a87-4d8f-af9f-9268e34daef6-00' for reading
2015-09-13 07:33:29,477 INFO  [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[206], ServiceName=[Amazon S3], AWSRequestID=[F698A6A43297754E], ServiceEndpoint=[https://S3bucketName.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[53.698], HttpRequestTime=[50.815], HttpClientReceiveResponseTime=[48.774], RequestSigningTime=[0.372], ResponseProcessingTime=[0.861], HttpClientSendRequestTime=[0.362],
2015-09-13 07:33:29,478 INFO  [main] metrics.MetricsSaver (MetricsSaver.java:(915)) - Thread 1 created MetricsLockFreeSaver 1
2015-09-13 07:33:29,479 INFO  [main] s3n.S3NativeFileSystem (S3NativeFileSystem.java:retrievePair(292)) - Stream for key 'S3serviceCode/yymmdd=20150801/country=eu/75e91844-2a87-4d8f-af9f-9268e34daef6-00' seeking to position '217260502'
2015-09-13 07:33:29,590 INFO  [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[206], ServiceName=[Amazon S3], AWSRequestID=[AD631A8AE229AFE7], ServiceEndpoint=[https://S3bucketName.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=0, ClientExecuteTime=[109.859], HttpRequestTime=[109.204], HttpClientReceiveResponseTime=[58.468], RequestSigningTime=[0.286], ResponseProcessingTime=[0.133], HttpClientSendRequestTime=[0.327],
2015-09-13 07:33:29,753 INFO  [main] s3n.S3NativeFileSystem (S3NativeFileSystem.java:listStatus(896)) - listStatus s3n://S3bucketName/S3serviceCode/yymmdd=20150801/country=eu/75e91844-2a87-4d8f-af9f-9268e34daef6-00 with recursive false
2015-09-13 07:33:29,877 INFO  [main] hive.HiveContext (Logging.scala:logInfo(59)) - Initializing HiveMetastoreConnection version 0.13.1 using Spark classes.
2015-09-13 07:33:30,593 WARN  [main] util.NativeCodeLoader (NativeCodeLoader.java:(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-09-13 07:33:30,622 INFO  [main] metastore.HiveMetaStore (HiveMetaStore.java:newRawStore(493)) - 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
2015-09-13 07:33:30,641 INFO  [main] metastore.ObjectStore (ObjectStore.java:initialize(246)) - ObjectStore, initialize called
2015-09-13 07:33:30,782 INFO  [main] DataNucleus.Persistence (Log4JLogger.java:info(77)) - Property datanucleus.cache.level2 unknown - will be ignored
2015-09-13 07:33:30,782 INFO  [main] DataNucleus.Persistence (Log4JLogger.java:info(77)) - Property hive.metastore.integral.jdo.pushdown unknown - will be ignored

Stopping SparkContext and HiveContext

2015-09-13 Thread Ophir Cohen
Hi,
I'm working on my company's system, which is built out of Spark, Zeppelin,
Hive and some other technologies, and I have a question about the ability to
stop contexts.

While working on the test framework for the system, I would like to create a
new SparkContext when running tests, in order to run the tests on a 'clean'
context.
I found this hard to do. First of all, I couldn't find any way to tell
whether a SparkContext has already been stopped. It has a flag for that, but
it's private.
Another problem is that creating a local HiveContext initializes a Derby
instance; trying to create a new HiveContext then fails because the DB
already exists.
Apparently, there isn't any way to tell HiveContext to stop and close its
connection to the DB.

Essentially I'm looking for two things:
1. A way to tell whether a SparkContext has already been stopped.
2. A way to stop/close a HiveContext so that it closes the relevant
files/connections and releases the resources.

Thanks,
Ophir


Re: [Question] ORC - EMRFS Problem

2015-09-13 Thread Owen O'Malley
Do you have a stack trace of the array out of bounds exception? I don't
remember an array out of bounds problem off the top of my head. A stack
trace will tell me a lot, obviously.

If you are using Spark 1.4 that implies Hive 0.13, which is pretty old. It
may be a problem that we fixed a while ago.

Thanks,
   Owen



On Sat, Sep 12, 2015 at 8:15 AM, Cazen Lee  wrote:

> Good Day!
>
> I think there are some problems between ORC and AWS EMRFS.
>
> When I was trying to read ORC files over 150 MB ("upper 150M") from S3, an
> ArrayIndexOutOfBounds exception occurred.
>
> I'm fairly sure it's an AWS-side issue, because there was no exception when
> reading from HDFS or S3NativeFileSystem.
>
> Parquet works fine, but that is inconvenient (almost all of our system runs
> on ORC).
>
> Does anybody know about this issue?
>
> I've tried Spark 1.4.1 (EMR 4.0.0), and there is no 1.5 patch note about
> this.
>
> Thank You
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: [Question] ORC - EMRFS Problem

2015-09-13 Thread Cazen
The stack trace is below.

But someone told me that it's a known issue and will be patched in a couple
of weeks (EMR 4.1).

So don't worry about it; I'll wait until it's patched.

scala> val ORCFile =
sqlContext.read.format("orc").load("s3n://S3bucketName/S3serviceCode/yymmdd=20150801/country=eu/75e91844-2a87-4d8f-af9f-9268e34daef6-00")



2015-09-13 07:33:29,228 INFO  [main] fs.EmrFileSystem
(EmrFileSystem.java:initialize(107)) - Consistency disabled, using
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem
implementation
2015-09-13 07:33:29,314 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
ServiceName=[Amazon S3], AWSRequestID=[CF49E1372BEF2E81],
ServiceEndpoint=[https://S3bucketName.s3.amazonaws.com],
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=0, ClientExecuteTime=[85.608],
HttpRequestTime=[85.101], HttpClientReceiveResponseTime=[13.891],
RequestSigningTime=[0.259], ResponseProcessingTime=[0.007],
HttpClientSendRequestTime=[0.305],
2015-09-13 07:33:29,351 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
ServiceName=[Amazon S3], AWSRequestID=[55B8C5E6009F0246],
ServiceEndpoint=[https://S3bucketName.s3.amazonaws.com],
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=1, ClientExecuteTime=[32.776],
HttpRequestTime=[13.17], HttpClientReceiveResponseTime=[10.961],
RequestSigningTime=[0.28], ResponseProcessingTime=[19.042],
HttpClientSendRequestTime=[0.295],
2015-09-13 07:33:29,421 INFO  [main] s3n.S3NativeFileSystem
(S3NativeFileSystem.java:open(1159)) - Opening
's3n://S3bucketName/S3serviceCode/yymmdd=20150801/country=eu/75e91844-2a87-4d8f-af9f-9268e34daef6-00'
for reading
2015-09-13 07:33:29,477 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[206],
ServiceName=[Amazon S3], AWSRequestID=[F698A6A43297754E],
ServiceEndpoint=[https://S3bucketName.s3.amazonaws.com],
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=1, ClientExecuteTime=[53.698],
HttpRequestTime=[50.815], HttpClientReceiveResponseTime=[48.774],
RequestSigningTime=[0.372], ResponseProcessingTime=[0.861],
HttpClientSendRequestTime=[0.362],
2015-09-13 07:33:29,478 INFO  [main] metrics.MetricsSaver
(MetricsSaver.java:(915)) - Thread 1 created MetricsLockFreeSaver 1
2015-09-13 07:33:29,479 INFO  [main] s3n.S3NativeFileSystem
(S3NativeFileSystem.java:retrievePair(292)) - Stream for key
'S3serviceCode/yymmdd=20150801/country=eu/75e91844-2a87-4d8f-af9f-9268e34daef6-00'
seeking to position '217260502'
2015-09-13 07:33:29,590 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[206],
ServiceName=[Amazon S3], AWSRequestID=[AD631A8AE229AFE7],
ServiceEndpoint=[https://S3bucketName.s3.amazonaws.com],
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=0, ClientExecuteTime=[109.859],
HttpRequestTime=[109.204], HttpClientReceiveResponseTime=[58.468],
RequestSigningTime=[0.286], ResponseProcessingTime=[0.133],
HttpClientSendRequestTime=[0.327],
2015-09-13 07:33:29,753 INFO  [main] s3n.S3NativeFileSystem
(S3NativeFileSystem.java:listStatus(896)) - listStatus
s3n://S3bucketName/S3serviceCode/yymmdd=20150801/country=eu/75e91844-2a87-4d8f-af9f-9268e34daef6-00
with recursive false
2015-09-13 07:33:29,877 INFO  [main] hive.HiveContext
(Logging.scala:logInfo(59)) - Initializing HiveMetastoreConnection version
0.13.1 using Spark classes.
2015-09-13 07:33:30,593 WARN  [main] util.NativeCodeLoader
(NativeCodeLoader.java:(62)) - Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
2015-09-13 07:33:30,622 INFO  [main] metastore.HiveMetaStore
(HiveMetaStore.java:newRawStore(493)) - 0: Opening raw store with
implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
2015-09-13 07:33:30,641 INFO  [main] metastore.ObjectStore
(ObjectStore.java:initialize(246)) - ObjectStore, initialize called
2015-09-13 07:33:30,782 INFO  [main] DataNucleus.Persistence
(Log4JLogger.java:info(77)) - Property datanucleus.cache.level2 unknown -
will be ignored
2015-09-13 07:33:30,782 INFO  [main] DataNucleus.Persistence
(Log4JLogger.java:info(77)) - Property hive.metastore.integral.jdo.pushdown
unknown - will be ignored
2015-09-13 07:33:31,208 INFO  [main] metastore.ObjectStore
(ObjectStore.java:getPMF(315)) - Setting MetaStore object pin classes with
hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
2015-09-13 07:33:32,375 INFO  [main] DataNucleus.Datastore
(Log4JLogger.java:info(77)) - The class
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
"embedded-only" so does not have its own datastore table.
2015-09-13 07:33:32,376 INFO  [main] 

Re: What happens when cache is full?

2015-09-13 Thread Mailinglisten
Hello Jeff,

I'm quite new to Spark, but as far as I understand caching: Spark uses the
available memory, and if more memory is requested, cached RDD partitions are
evicted in an LRU manner and recomputed when needed. Please correct me if
I'm wrong.

Regards Lars 

> On 13.09.2015, at 05:14, Hemminger Jeff  wrote:
> 
> I am trying to understand the process of caching and specifically what the 
> behavior is when the cache is full. Please excuse me if this question is a 
> little vague, I am trying to build my understanding of this process.
> 
> I have an RDD that I perform several computations with; I persist it with 
> MEMORY_ONLY_SER before performing the computations.
> 
> I believe that, due to insufficient memory, it is recomputing (at least part 
> of) the RDD each time.
> 
> Logging shows that the RDD was not cached previously, and therefore needs to 
> be computed.
> 
> I looked at the BlockManager Spark code, and see that getOrCompute attempts 
> to retrieve memory from cache. If it is not available, it computes it.
> 
> Can I assume that when Spark attempts to cache an RDD but runs out of memory, 
> it recomputes a part of the RDD each time it is read?
> 
> I think I might be incorrect in this assumption, because I would have 
> expected a warning message if the cache was out of memory.
> 
> Thanks,
> Jeff

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: UDAF and UDT with SparkSQL 1.5.0

2015-09-13 Thread jussipekkap
A small update: I was able to find a solution with good performance - using
brickhouse collect (a Hive UDAF). It also accepts structs as input, which is
an acceptable workaround, though still not perfect (support for UDTs would be
better). The built-in Hive 'collect_list' seems to have a check on its input
parameters, ensuring they are of primitive types, which caused the issue I
reported earlier. UDTs as values still throw scala.MatchError (see the
stack trace below).

Anyway, it would seem that adding support to Spark SQL UDAFs for aggregating
all values into collections - without compromising performance - would be a
great improvement.

-JP

createpoint is a UDF creating a UDT out of a coordinate pair. The collect
function should collect the points into an array, ready for further
processing with different geographical functions.

15/09/13 12:38:13 INFO parse.ParseDriver: Parsing command: select
collect(pt) from (SELECT createpoint(lat,lon) as pt from gpx) t2
15/09/13 12:38:13 INFO parse.ParseDriver: Parse Completed
Exception in thread "main" scala.MatchError:
com.eaglepeaks.engine.GeomUDT@7a8e35d1 (of class
com.eaglepeaks.engine.GeomUDT)
at
org.apache.spark.sql.hive.HiveInspectors$class.toInspector(HiveInspectors.scala:618)
at
org.apache.spark.sql.hive.HiveGenericUDAF.toInspector(hiveUDFs.scala:445)
at
org.apache.spark.sql.hive.HiveInspectors$class.toInspector(HiveInspectors.scala:710)
at
org.apache.spark.sql.hive.HiveGenericUDAF.toInspector(hiveUDFs.scala:445)
at
org.apache.spark.sql.hive.HiveGenericUDAF$$anonfun$inspectors$1.apply(hiveUDFs.scala:463)
at
org.apache.spark.sql.hive.HiveGenericUDAF$$anonfun$inspectors$1.apply(hiveUDFs.scala:463)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.hive.HiveGenericUDAF.inspectors$lzycompute(hiveUDFs.scala:463)
at 
org.apache.spark.sql.hive.HiveGenericUDAF.inspectors(hiveUDFs.scala:463)
at
org.apache.spark.sql.hive.HiveGenericUDAF.objectInspector$lzycompute(hiveUDFs.scala:457)
at
org.apache.spark.sql.hive.HiveGenericUDAF.objectInspector(hiveUDFs.scala:456)
at 
org.apache.spark.sql.hive.HiveGenericUDAF.dataType(hiveUDFs.scala:465)
at
org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:140)
at
org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$6.apply(basicOperators.scala:223)
at
org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$6.apply(basicOperators.scala:223)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.catalyst.plans.logical.Aggregate.output(basicOperators.scala:223)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$PullOutNondeterministic$$anonfun$apply$17.applyOrElse(Analyzer.scala:962)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$PullOutNondeterministic$$anonfun$apply$17.applyOrElse(Analyzer.scala:954)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$PullOutNondeterministic$.apply(Analyzer.scala:954)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$PullOutNondeterministic$.apply(Analyzer.scala:953)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
at

Re: Data lost in spark streaming

2015-09-13 Thread Bin Wang
There are some error logs in the executor, and I don't know if they are related:

15/09/11 10:54:05 WARN ipc.Client: Exception encountered while connecting
to the server :
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$Inv
alidToken): Invalid AMRMToken from appattempt_1440495451668_0258_01
15/09/11 10:54:05 WARN yarn.ApplicationMaster: Reporter thread fails 4
time(s) in a row.
org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
AMRMToken from appattempt_1440495451668_0258_01
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
at
org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
at
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy22.allocate(Unknown Source)
at
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:278)
at
org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:174)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:323)
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
Invalid AMRMToken from appattempt_1440495451668_0258_01
at org.apache.hadoop.ipc.Client.call(Client.java:1468)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy21.allocate(Unknown Source)
at
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
... 9 more

...

15/09/11 10:54:10 WARN ipc.Client: Exception encountered while connecting
to the server :
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$Inv
alidToken): Invalid AMRMToken from appattempt_1440495451668_0258_01
15/09/11 10:54:10 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 12, (reason: Exception was thrown 5 time(s) from Reporter thread.)
15/09/11 10:54:10 INFO streaming.StreamingContext: Invoking
stop(stopGracefully=false) from shutdown hook
15/09/11 10:54:10 INFO scheduler.ReceiverTracker: Sent stop signal to all 1
receivers
15/09/11 10:54:12 ERROR scheduler.ReceiverTracker: Deregistered receiver
for stream 0: Stopped by driver

Tathagata Das wrote on Sun, Sep 13, 2015 at 4:05 PM:

> Maybe the driver got restarted. See the log4j logs of the driver before it
> restarted.
>
> On Thu, Sep 10, 2015 at 11:32 PM, Bin Wang  wrote:
>
>> I'm using Spark Streaming 1.4.0 and have a DStream that holds all the data
>> it has received. But today the historical data in the DStream seems to have
>> been lost suddenly. The application UI also lost the streaming process time
>> and all the related data. Could anyone give some hints to debug this? Thanks.
>>
>>
>>
>


Re: Stopping SparkContext and HiveContext

2015-09-13 Thread Ted Yu
For #1, there is the following method:

  @DeveloperApi
  def getExecutorStorageStatus: Array[StorageStatus] = {
    assertNotStopped()

You can wrap the call in try block catching IllegalStateException.
Of course, this is just a workaround.

FYI

On Sun, Sep 13, 2015 at 1:48 AM, Ophir Cohen  wrote:

> Hi,
> I'm working on my company's system, which is built out of Spark,
> Zeppelin, Hive and some other technologies, and I have a question about the
> ability to stop contexts.
>
> While working on the test framework for the system, I would like to create
> a new SparkContext when running tests, in order to run the tests on a
> 'clean' context.
> I found this hard to do. First of all, I couldn't find any way to tell
> whether a SparkContext has already been stopped. It has a flag for that,
> but it's private.
> Another problem is that creating a local HiveContext initializes a Derby
> instance; trying to create a new HiveContext then fails because the DB
> already exists.
> Apparently, there isn't any way to tell HiveContext to stop and close its
> connection to the DB.
>
> Essentially I'm looking for two things:
> 1. A way to tell whether a SparkContext has already been stopped.
> 2. A way to stop/close a HiveContext so that it closes the relevant
> files/connections and releases the resources.
>
> Thanks,
> Ophir
>


Re: Data lost in spark streaming

2015-09-13 Thread Ted Yu
Can you retrieve log for appattempt_1440495451668_0258_01 and see if
there is some clue there ?

Cheers

On Sun, Sep 13, 2015 at 3:28 AM, Bin Wang  wrote:

> There are some error logs in the executor, and I don't know if they are related:
>
> 15/09/11 10:54:05 WARN ipc.Client: Exception encountered while connecting
> to the server :
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$Inv
> alidToken): Invalid AMRMToken from appattempt_1440495451668_0258_01
> 15/09/11 10:54:05 WARN yarn.ApplicationMaster: Reporter thread fails 4
> time(s) in a row.
> org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
> AMRMToken from appattempt_1440495451668_0258_01
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at
> org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
> at
> org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
> at
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
> at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy22.allocate(Unknown Source)
> at
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:278)
> at
> org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:174)
> at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:323)
> Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Invalid AMRMToken from appattempt_1440495451668_0258_01
> at org.apache.hadoop.ipc.Client.call(Client.java:1468)
> at org.apache.hadoop.ipc.Client.call(Client.java:1399)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> at com.sun.proxy.$Proxy21.allocate(Unknown Source)
> at
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
> ... 9 more
>
> ...
>
> 15/09/11 10:54:10 WARN ipc.Client: Exception encountered while connecting
> to the server :
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$Inv
> alidToken): Invalid AMRMToken from appattempt_1440495451668_0258_01
> 15/09/11 10:54:10 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 12, (reason: Exception was thrown 5 time(s) from Reporter thread.)
> 15/09/11 10:54:10 INFO streaming.StreamingContext: Invoking
> stop(stopGracefully=false) from shutdown hook
> 15/09/11 10:54:10 INFO scheduler.ReceiverTracker: Sent stop signal to all
> 1 receivers
> 15/09/11 10:54:12 ERROR scheduler.ReceiverTracker: Deregistered receiver
> for stream 0: Stopped by driver
>
> Tathagata Das wrote on Sun, Sep 13, 2015 at 4:05 PM:
>
>> Maybe the driver got restarted. See the log4j logs of the driver before
>> it restarted.
>>
>> On Thu, Sep 10, 2015 at 11:32 PM, Bin Wang  wrote:
>>
>>> I'm using Spark Streaming 1.4.0 and have a DStream that holds all the
>>> data it has received. But today the historical data in the DStream seems to
>>> have been lost suddenly. The application UI also lost the streaming process
>>> time and all the related data. Could anyone give some hints to debug this? Thanks.
>>>
>>>
>>>
>>


Re: change the spark version

2015-09-13 Thread Steve Loughran

> On 12 Sep 2015, at 09:14, Sean Owen  wrote:
> 
> This is a question for the CDH list. CDH 5.4 has Spark 1.3, and 5.5
> has 1.5. The best thing is to update CDH as a whole if you can.
> 
> However it's pretty simple to just run a newer Spark assembly as a
> YARN app. Don't remove anything in the CDH installation. Try
> downloading the assembly and prefixing your command with
> SPARK_JAR=newassembly.jar ...
> 

...though you should try to make sure the Hadoop JARs in the Spark assembly 
are at a version equal to or lower than the version of Hadoop you run on, 
i.e. don't try running a Hadoop 2.6-built artifact against a 2.4-based cluster.

One way to guarantee that is to rebuild the Spark binaries with the Cloudera 
versions & repo in the build.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Parquet partitioning performance issue

2015-09-13 Thread sonal sharma
Hi Team,

We have scheduled jobs that read new records from a MySQL database every hour
and write (append) them to Parquet. For each append operation, Spark creates
10 new partitions in the Parquet file.

Some of these partitions are fairly small in size (20-40 KB), leading to a
high number of small partitions and hurting the overall read performance.

Is there any way in which we can configure Spark to merge smaller partitions
into bigger ones to avoid too many partitions? Or can we define a
configuration in Parquet to set a minimum partition size, say 64 MB?

Coalesce/repartition will not work for us as we have highly variable
activity on the database during peak and non-peak hours.

Regards,
Sonal


Re: Parquet partitioning performance issue

2015-09-13 Thread Dean Wampler
One general technique is to perform a second pass later over the files, for
example the next day or once a week, to concatenate smaller files into larger
ones. This can be done for all file types and allows you to make recent data
available to analysis tools while avoiding a large build-up of small files
overall.
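A minimal sketch of such a compaction pass with the Spark 1.4+ DataFrame API. The paths and the target file count of 4 are placeholders, not values from this thread:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical second-pass compaction job: read yesterday's small part
// files and rewrite them as a few larger ones.
object CompactParquet {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CompactParquet"))
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.read.parquet("hdfs:///data/events/2015-09-13")

    // coalesce avoids a full shuffle; use repartition instead if the
    // data is badly skewed across the remaining partitions.
    df.coalesce(4).write.parquet("hdfs:///data/events/2015-09-13_compacted")

    sc.stop()
  }
}
```

The number of output files roughly equals the coalesce argument, so a reasonable choice is the total input size divided by a target file size of 64-128 MB.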

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Sun, Sep 13, 2015 at 12:54 PM, sonal sharma 
wrote:

> Hi Team,
>
> We have scheduled jobs that read new records from MySQL database every
> hour and write (append) them to parquet. For each append operation, spark
> creates 10 new partitions in parquet file.
>
> Some of these partitions are fairly small in size (20-40 KB) leading to
> high number of smaller partitions and affecting the overall read
> performance.
>
> Is there any way in which we can configure spark to merge smaller
> partitions into a bigger one to avoid too many partitions? Or can we define
> a configuration in Parquet to set a minimum partition size, say 64 MB?
>
> Coalesce/repartition will not work for us as we have highly variable
> activity on the database during peak and non-peak hours.
>
> Regards,
> Sonal
>


Re: RDD transformation and action running out of memory

2015-09-13 Thread Utkarsh Sengar
Yup, that was the problem.
Changing the default "mongo.input.split_size" from 8 MB to 100 MB did the
trick.

Config reference:
https://github.com/mongodb/mongo-hadoop/wiki/Configuration-Reference

Thanks!
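In Scala, the change amounts to one extra line on the Hadoop Configuration. This is a sketch only: the URI is a placeholder, and per the mongo-hadoop configuration reference the split size is given in megabytes:

```scala
import org.apache.hadoop.conf.Configuration

val mongodbConfig = new Configuration()
mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat")
mongodbConfig.set("mongo.input.uri", "mongodb://localhost:27017/A.MyColl")
// Default is 8 (MB); larger splits mean far fewer split objects for the
// driver to materialize, which avoids the OOM during split calculation.
mongodbConfig.set("mongo.input.split_size", "100")
```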

On Sat, Sep 12, 2015 at 3:15 PM, Richard Eggert 
wrote:

> Hmm... The count() method invokes this:
>
> def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
> = {
>runJob(rdd, func, 0 until rdd.partitions.length)
> }
>
> It appears that you're running out of memory while trying to compute
> (within the driver) the number of partitions that will be in the final
> result. It seems as if Mongo is computing so many splits that you're
> running out of memory.
>
> Looking at your log messages, I see this:
> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id"
> : "54e64d626d0bfe0a24ba79b3"}, max= { "_id" : "54e64d646d0bfe0a24ba79e1"}
>
> 0x54e64d646d0bfe0a24ba79e1 - 0x54e64d626d0bfe0a24ba79b3 =
> 0x2000000000000002e = 36893488147419103278
>
> The last split reported in the log has max 55adf841b4d2970fb07d7288.
>
> 0x55adf841b4d2970fb07d7288 - 0x54e64d646d0bfe0a24ba79e1 =
> 0xc7aadd47c699058bc2f8a7 = 241383122307828806444054695
>
> 241383122307828806444054695/36893488147419103278 = 6,542,702 potential
> splits, assuming they are evenly distributed. I'm not sure how big each
> split object is, but it's plausible that the process of creating an array
> of 6.5 million of them is causing you to run out of memory.
>
> I think the reason you don't see anything in the executor logs is that the
> exception is occurring before the work is tasked to the executors.
>
>
> Rich
>
>
>
> On Sat, Sep 12, 2015 at 5:18 PM, Utkarsh Sengar 
> wrote:
>
>> I am trying to run this, a basic mapToPair and then count() to trigger an
>> action.
>> 4 executors are launched but I don't see any relevant logs on those
>> executors.
>>
>> It looks like the driver is pulling all the data and it runs out of
>> memory; the dataset is big, so it won't fit on 1 machine.
>>
>> So what is the issue here? Am I using Spark in a wrong way in this
>> example?
>>
>> Configuration mongodbConfigInventoryDay = new Configuration();
>> mongodbConfigInventoryDay.set("mongo.job.input.format",
>> "com.mongodb.hadoop.MongoInputFormat");
>> mongodbConfigInventoryDay.set("mongo.input.uri", "mongodb://" +
>> props.getProperty("mongo") + ":27017/A.MyColl");
>> JavaPairRDD<Object, BSONObject> myColl = sc.newAPIHadoopRDD(
>> mongodbConfigInventoryDay,
>> MongoInputFormat.class,
>> Object.class,
>> BSONObject.class
>> );
>> JavaPairRDD<Long, MyColl> myCollRdd = myColl.mapToPair(tuple2 -> {
>> ObjectMapper mapper = new ObjectMapper();
>> tuple2._2().removeField("_id");
>> MyColl day = mapper.readValue(tuple2._2().toMap().toString(),
>> MyColl.class);
>> return new Tuple2<>(Long.valueOf((String)
>> tuple2._2().get("MyCollId")), day);
>> });
>>
>> myCollRdd.count();
>>
>>
>> Logs on the driver:
>> 15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(120664) called with
>> curMem=253374, maxMem=278019440
>> 15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1 stored as values in
>> memory (estimated size 117.8 KB, free 264.8 MB)
>> 15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(12812) called with
>> curMem=374038, maxMem=278019440
>> 15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1_piece0 stored as
>> bytes in memory (estimated size 12.5 KB, free 264.8 MB)
>> 15/09/12 21:07:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in
>> memory on 10.70.7.135:58291 (size: 12.5 KB, free: 265.1 MB)
>> 15/09/12 21:07:45 INFO SparkContext: Created broadcast 1 from
>> newAPIHadoopRDD at SparkRunner.java:192
>> 15/09/12 21:07:45 INFO StandaloneMongoSplitter: Running splitvector to
>> check splits against mongodb://
>> dsc-dbs-001.qasql.opentable.com:27017/A.MyColl
>> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min=null,
>> max= { "_id" : "54e64d626d0bfe0a24ba79b3"}
>> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={
>> "_id" : "54e64d626d0bfe0a24ba79b3"}, max= { "_id" :
>> "54e64d646d0bfe0a24ba79e1"}
>> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={
>> "_id" : "54e64d646d0bfe0a24ba79e1"}, max= { "_id" :
>> "5581d1c3d52db40bc8558c6b"}
>> ..
>> ..
>> 15/09/12 21:08:22 INFO MongoCollectionSplitter: Created split: min={
>> "_id" : "55adf840d3b5be0724224807"}, max= { "_id" :
>> "55adf841b4d2970fb07d7288"}
>> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>> at org.bson.io.PoolOutputBuffer.<init>(PoolOutputBuffer.java:224)
>> at org.bson.BasicBSONDecoder.<init>(BasicBSONDecoder.java:499)
>> at
>> com.mongodb.hadoop.input.MongoInputSplit.<init>(MongoInputSplit.java:59)
>> at
>> 

Re: CREATE TABLE ignores database when using PARQUET option

2015-09-13 Thread hbogert
I'm having the same problem, did you solve this?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CREATE-TABLE-ignores-database-when-using-PARQUET-option-tp22824p24679.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Best way to merge final output part files created by Spark job

2015-09-13 Thread unk1102
Hi, I have a Spark job which creates around 500 part files inside each
directory I process, and I have thousands of such directories, so I need to
merge these 500 small part files. I am using spark.sql.shuffle.partitions set
to 500, and my final small files are ORC files. Is there a way to merge ORC
files in Spark? If not, please suggest the best way to merge files created by
a Spark job in HDFS. Thanks much.
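As far as I know Spark has no built-in ORC file merge, so one common workaround is a second pass that reads each directory back and rewrites it coalesced. A sketch under the assumption of Spark 1.4+/1.5, where ORC support lives in HiveContext; the paths and the target count of 10 are placeholders:

```scala
import org.apache.spark.sql.hive.HiveContext

// sc is an existing SparkContext
val hiveContext = new HiveContext(sc)
val df = hiveContext.read.format("orc").load("hdfs:///output/some-dir")
// Rewrite the ~500 small part files as ~10 larger ones.
df.coalesce(10).write.format("orc").save("hdfs:///output/some-dir_merged")
```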



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-merge-final-output-part-files-created-by-Spark-job-tp24681.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re:Re: RE: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

2015-09-13 Thread Todd


Thanks Davies for the explanation.
When I turn off the following options, I still see that Spark 1.5 is much 
slower than 1.4.1. I am wondering how I can configure things so that Spark 1.5 
has performance similar to Spark 1.4.1 for this particular query.

--conf spark.sql.planner.sortMergeJoin=false 
--conf spark.sql.tungsten.enabled=false
--conf spark.shuffle.reduceLocality.enabled=false
--conf spark.sql.planner.externalSort=false
--conf spark.sql.parquet.filterPushdown=false
--conf spark.sql.codegen=false









At 2015-09-12 01:32:15, "Davies Liu"  wrote:
>I had run a similar benchmark for 1.5: a self join on a fact table with a
>join key that has many duplicated rows (say N rows for the same join key);
>after the join there will be N*N rows for each join key. Generating the
>joined row is slower in 1.5 than in 1.4 (it needs to copy the left and
>right rows together, which 1.4 does not). If the generated row is accessed
>after the join, there will not be much difference between 1.5 and 1.4,
>because accessing the joined row is slower in 1.4 than in 1.5.
>
>So, for this particular query, 1.5 is slower than 1.4, and will be even
>slower if you increase N. But for real workloads it will not be; 1.5 is
>usually faster than 1.4.
>
>On Fri, Sep 11, 2015 at 1:31 AM, prosp4300  wrote:
>>
>>
>> By the way, turning off code generation could be an option to try; 
>> sometimes code generation can introduce slowness.
>>
>>
>> On Sep 11, 2015 at 15:58, Cheng, Hao wrote:
>>
>> Can you confirm whether the query really runs in cluster mode, not local 
>> mode? Can you print the call stack of the executor while the query is running?
>>
>>
>>
>> BTW: spark.shuffle.reduceLocality.enabled is the configuration of Spark, not 
>> Spark SQL.
>>
>>
>>
>> From: Todd [mailto:bit1...@163.com]
>> Sent: Friday, September 11, 2015 3:39 PM
>> To: Todd
>> Cc: Cheng, Hao; Jesse F Chen; Michael Armbrust; user@spark.apache.org
>> Subject: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ 
>> compared with spark 1.4.1 SQL
>>
>>
>>
>> I add the following two options:
>> spark.sql.planner.sortMergeJoin=false
>> spark.shuffle.reduceLocality.enabled=false
>>
>> But it still performs the same as not setting them two.
>>
>> One thing I notice is that in the Spark UI, when I click the SQL tab, it 
>> shows an empty page with only the header title 'SQL'; there is no table 
>> showing queries and execution plan information.
>>
>>
>>
>>
>>
>> At 2015-09-11 14:39:06, "Todd"  wrote:
>>
>>
>> Thanks Hao.
>> Yes, it is still as slow as SMJ. Let me try the options you suggested.
>>
>>
>>
>>
>> At 2015-09-11 14:34:46, "Cheng, Hao"  wrote:
>>
>> You mean the performance is still as slow as with the SMJ in Spark 1.5?
>>
>>
>>
>> Can you set spark.shuffle.reduceLocality.enabled=false when you start the 
>> spark-shell/spark-sql? It's a new feature in Spark 1.5, and it's true by 
>> default, but we found it can reduce performance dramatically.
>>
>>
>>
>>
>>
>> From: Todd [mailto:bit1...@163.com]
>> Sent: Friday, September 11, 2015 2:17 PM
>> To: Cheng, Hao
>> Cc: Jesse F Chen; Michael Armbrust; user@spark.apache.org
>> Subject: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with 
>> spark 1.4.1 SQL
>>
>>
>>
>> Thanks Hao for the reply.
>> I turned the sort merge join off; the physical plan is below, but the 
>> performance is roughly the same as with it on...
>>
>> == Physical Plan ==
>> TungstenProject 
>> [ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0]
>>  ShuffledHashJoin [ss_item_sk#2], [ss_item_sk#25], BuildRight
>>   TungstenExchange hashpartitioning(ss_item_sk#2)
>>ConvertToUnsafe
>> Scan 
>> ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_promo_sk#8,ss_quantity#10,ss_cdemo_sk#4,ss_list_price#12,ss_coupon_amt#19,ss_item_sk#2,ss_sold_date_sk#0]
>>   TungstenExchange hashpartitioning(ss_item_sk#25)
>>ConvertToUnsafe
>> Scan 
>> ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_item_sk#25]
>>
>> Code Generation: true
>>
>>
>>
>>
>> At 2015-09-11 13:48:23, "Cheng, Hao"  wrote:
>>
>> It is not a big surprise that the SMJ is slower than the HashJoin, as we do 
>> not fully utilize the sorting yet; more details can be found at 
>> https://issues.apache.org/jira/browse/SPARK-2926 .
>>
>>
>>
>> Anyway, can you disable the sort merge join by 
>> "spark.sql.planner.sortMergeJoin=false" in Spark 1.5, and run the query 
>> again? In our previous testing, it was about 20% slower with sort merge 
>> join. I am not sure if there is anything else slowing down the performance.
>>
>>
>>
>> Hao
>>
>>
>>
>>
>>
>> From: Jesse F Chen [mailto:jfc...@us.ibm.com]
>> Sent: Friday, September 11, 2015 1:18 PM
>> To: Michael Armbrust
>> Cc: Todd; user@spark.apache.org
>> Subject: Re: spark 1.5 SQL slows down dramatically by 50%+ compared with 
>> 

Re: Stopping SparkContext and HiveContext

2015-09-13 Thread Ted Yu
Please also see this thread: http://search-hadoop.com/m/q3RTtGpLeLyv97B1
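A minimal sketch of the try/catch workaround from the quoted message below. It is unofficial: it leans on getExecutorStorageStatus calling assertNotStopped() internally, since the stopped flag itself is private:

```scala
import org.apache.spark.SparkContext

// Returns true if the context has been stopped. Relies on the
// IllegalStateException thrown by assertNotStopped(); not a public API
// guarantee, just a workaround.
def isStopped(sc: SparkContext): Boolean =
  try {
    sc.getExecutorStorageStatus
    false
  } catch {
    case _: IllegalStateException => true
  }
```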

On Sun, Sep 13, 2015 at 9:49 AM, Ted Yu  wrote:

> For #1, there is the following method:
>
>   @DeveloperApi
>   def getExecutorStorageStatus: Array[StorageStatus] = {
> assertNotStopped()
>
> You can wrap the call in a try block, catching IllegalStateException.
> Of course, this is just a workaround.
>
> FYI
>
> On Sun, Sep 13, 2015 at 1:48 AM, Ophir Cohen  wrote:
>
>> Hi,
>> I'm working on my company's system, which is built out of Spark, Zeppelin,
>> Hive and some other technologies, and I wonder about the ability to stop
>> contexts.
>>
>> While working on the test framework for the system, when running tests I
>> would sometimes like to create a new SparkContext in order to run the tests
>> on a 'clean' context.
>> I found this hard to do because, first of all, I couldn't find any way to
>> tell whether a SparkContext is already stopped. It has a flag for that, but
>> it's private.
>> Another problem is that when creating a local HiveContext, it initializes a
>> Derby instance; when trying to create a new HiveContext, it fails because
>> the DB already exists.
>> Apparently, there isn't any way to tell HiveContext to stop and clear its
>> connection to the DB.
>>
>> Essentially, I'm looking for two things:
>> 1. A way to tell whether a SparkContext has already been stopped or not.
>> 2. A way to stop/close a HiveContext so that it closes the relevant
>> files/connections and releases the resources.
>>
>> Thanks,
>> Ophir
>>
>
>


Re: Problem to persist Hibernate entity from Spark job

2015-09-13 Thread Zoran Jeremic
Hi guys,

I'm still trying to solve the issue with saving Hibernate entities from
Spark. After several attempts to redesign my own code, I ended up with the
Hello World example below, which clearly demonstrates that the problem is not
the complexity of my code or session mixing across threads.

The code creates one simple Hibernate entity and tries to save it. If the
number of Spark partitions is more than one, the line *println("Saved
tag:"+newTag)* is never executed. If I have only one partition, everything
works fine.

I would appreciate very much if somebody could explain what is the problem
in this code.


>  val sc = SparkContextLoader.getSC
> val scalaUsersIds = Seq[Long](2, 8, 15, 14, 6, 17, 21, 34, 75, 128,
> 304)
> val usersRDD: RDD[Long] = sc.parallelize(scalaUsersIds)
> usersRDD.foreachPartition {
>   val session: Session =
> HibernateUtil.getSessionFactory().openSession()
>   users => {
> users.foreach {
>   userid =>
> {
>   val newTag: Tag = new Tag
>   newTag.setTitle("title" + userid)
>   try {
> val isActive: Boolean = session.getTransaction().isActive()
> if (!isActive) {
>   session.beginTransaction()
> }
> println("Saving tag:"+newTag)
> session.save(newTag)
> println("Saved tag:"+newTag)
> session.getTransaction().commit()
>   } catch {
> case ex: Exception => {
>   if (session.getTransaction() != null) {
> session.getTransaction().rollback()
> ex.printStackTrace()
>   }
> }
>   }
> }
> }
>   }
>   session.close()
> }
>

Thanks,
Zoran
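For comparison, here is a sketch of the same loop with the Session opened inside the partition function itself. In the snippet above, the session is created before the `users => ...` literal, i.e. while the closure is being constructed on the driver, and a Hibernate Session is neither serializable nor thread-safe; opening it per partition (names HibernateUtil and Tag as in the snippet above) avoids that:

```scala
usersRDD.foreachPartition { users =>
  // Open the session on the executor, once per partition.
  val session = HibernateUtil.getSessionFactory().openSession()
  try {
    users.foreach { userid =>
      val newTag: Tag = new Tag
      newTag.setTitle("title" + userid)
      session.beginTransaction()
      session.save(newTag)
      session.getTransaction().commit()
    }
  } finally {
    session.close()
  }
}
```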


On Sun, Sep 6, 2015 at 1:42 PM, Zoran Jeremic 
wrote:

> I have a GenericDAO class which is initialized for each partition. This
> class uses SessionFactory.openSession() to open a new session in its
> constructor. As per my understanding, this means that each partition has a
> different session, but they use the same SessionFactory to open it.
>
> why not create the session at the start of the saveInBatch method and
>> close it at the end
>>
> This won't work for me, or at least I think it won't. At the beginning of
> the process I load some entities (e.g. User, UserPreference...) from
> Hibernate and then use them across the process, even after I perform
> saveInBatch. They need to stay in the session so that I can pull the data I
> need and update it later, so I can't open another session inside the
> existing one.
>
> On Sun, Sep 6, 2015 at 1:40 AM, Matthew Johnson 
> wrote:
>
>> I agree with Igor - I would either make sure session is ThreadLocal or,
>> more simply, why not create the session at the start of the saveInBatch
>> method and close it at the end? Creating a SessionFactory is an expensive
>> operation but creating a Session is a relatively cheap one.
>> On 6 Sep 2015 07:27, "Igor Berman"  wrote:
>>
>>> How do you create your session? Do you reuse it across threads? How do
>>> you create/close the session manager?
>>> Look for the problem in session creation; probably something deadlocked.
>>> As far as I remember, a Hibernate session should be created per thread.
>>>
>>> On 6 September 2015 at 07:11, Zoran Jeremic 
>>> wrote:
>>>
 Hi,

 I'm developing a long-running process that should find the RSS feeds that
 all users in the system have registered to follow, parse these RSS feeds,
 extract new entries, and store them back to the database as Hibernate
 entities so users can retrieve them. I want to use Apache Spark to enable
 parallel processing, since this process might take several hours depending
 on the number of users.

 The approach I thought should work was to use
 *useridsRDD.foreachPartition*, so I can have a separate Hibernate session
 for each partition. I created a database session manager that is initialized
 for each partition and keeps the Hibernate session alive until the process
 is over.

 Once all RSS feeds from one source are parsed and Feed entities are
 created, I send the whole list to a database manager method that saves the
 whole list in a batch:

> public   void saveInBatch(List entities) {
> try{
>   boolean isActive = session.getTransaction().isActive();
> if ( !isActive) {
> session.beginTransaction();
> }
>for(Object entity:entities){
>  session.save(entity);
> }
>session.getTransaction().commit();
>  }catch(Exception ex){
> if(session.getTransaction()!=null) {
> session.getTransaction().rollback();
> ex.printStackTrace();
>}
>   }
>
> However, this 

Replacing Esper with Spark Streaming?

2015-09-13 Thread Otis Gospodnetić
Hi,

I'm wondering if anyone has attempted to replace Esper with Spark Streaming
or if anyone thinks Spark Streaming is/isn't a good tool for the (CEP) job?

We are considering Akka or Spark Streaming as possible Esper replacements
and would appreciate any input from people who tried to do that with either
of them.

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


Re: Training the MultilayerPerceptronClassifier

2015-09-13 Thread Feynman Liang
AFAIK no, we have a TODO item to implement more rigorous correctness tests
(e.g. referenced against Weka). If you're interested, go ahead and comment on
the JIRA to let others know you're working on it.

On Sat, Sep 12, 2015 at 4:58 AM, Rory Waite  wrote:

> Thanks Feynman, that is useful.
>
> I am interested in comparing the Spark MLP with Caffe. If I understand it
> correctly, the changes to the Spark MLP API now restrict the functionality
> such that
>
> -Spark restricts the top layer to be a softmax
> -Can only use LBFGS to train the network
>
> I think this benchmark originally used a sigmoid top layer and SGD to
> optimise the network for spark. So the Caffe config used in the benchmark
> and the Spark setup are now not equivalent.
>
> Also this benchmark is designed for speed testing. I just want to do a
> quick sanity test and make sure that Caffe and Spark yield similar
> accuracies for MNIST before I try to test Spark on our own task. I am
> possibly reproducing existing efforts. Is there an example of this kind of
> sanity test I could reproduce?
>
>
>   
> www.sdl.com
>
>
> SDL PLC confidential, all rights reserved. If you are not the intended
> recipient of this mail SDL requests and requires that you delete it without
> acting upon or copying any of its contents, and we further request that you
> advise us.
>
> SDL PLC is a public limited company registered in England and Wales.
> Registered number: 02675207.
> Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6
> 7DY, UK.
> --
> *From:* Feynman Liang [fli...@databricks.com]
> *Sent:* 11 September 2015 20:34
> *To:* Rory Waite
> *Cc:* user@spark.apache.org
> *Subject:* Re: Training the MultilayerPerceptronClassifier
>
> Rory,
>
> I just sent a PR (https://github.com/avulanov/ann-benchmark/pull/1) to
> bring that benchmark up to date. Hope it helps.
>
> On Fri, Sep 11, 2015 at 6:39 AM, Rory Waite  wrote:
>
>> Hi,
>>
>> I’ve been trying to train the new MultilayerPerceptronClassifier in spark
>> 1.5 for the MNIST digit recognition task. I’m trying to reproduce the work
>> here:
>>
>> https://github.com/avulanov/ann-benchmark
>>
>> The API has changed since this work, so I’m not sure that I’m setting up
>> the task correctly.
>>
>> After I've trained the classifier, it classifies everything as a 1. It
>> even does this for the training set. Am I doing something wrong with the
>> setup? I'm not looking for state-of-the-art performance, just something
>> that looks reasonable. This experiment is meant to be a quick sanity test.
>>
>> Here is the job:
>>
>> import org.apache.log4j._
>> //Logger.getRootLogger.setLevel(Level.OFF)
>> import org.apache.spark.mllib.linalg.Vectors
>> import org.apache.spark.mllib.regression.LabeledPoint
>> import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
>> import org.apache.spark.ml.Pipeline
>> import org.apache.spark.ml.PipelineStage
>> import org.apache.spark.mllib.util.MLUtils
>> import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
>> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkContext._
>> import org.apache.spark.SparkConf
>> import org.apache.spark.sql.SQLContext
>> import java.io.FileOutputStream
>> import java.io.ObjectOutputStream
>>
>> object MNIST {
>>  def main(args: Array[String]) {
>>val conf = new SparkConf().setAppName("MNIST")
>>conf.set("spark.driver.extraJavaOptions", "-XX:MaxPermSize=512M")
>>val sc = new SparkContext(conf)
>>val batchSize = 100
>>val numIterations = 5
>>val mlp = new MultilayerPerceptronClassifier
>>mlp.setLayers(Array[Int](780, 2500, 2000, 1500, 1000, 500, 10))
>>mlp.setMaxIter(numIterations)
>>mlp.setBlockSize(batchSize)
>>val train = MLUtils.loadLibSVMFile(sc,
>> "file:///misc/home/rwaite/mt-work/ann-benchmark/mnist.scale")
>>train.repartition(200)
>>val sqlContext = new SQLContext(sc)
>>import sqlContext.implicits._
>>val df = train.toDF
>>val model = mlp.fit(df)
>>val trainPredictions = model.transform(df)
>>trainPredictions.show(100)
>>val test = MLUtils.loadLibSVMFile(sc,
>> "file:///misc/home/rwaite/mt-work/ann-benchmark/mnist.scale.t", 780).toDF
>>val result = model.transform(test)
>>result.show(100)
>>val predictionAndLabels = result.select("prediction", "label")
>>val evaluator = new MulticlassClassificationEvaluator()
>>  .setMetricName("precision")
>>println("Precision:" + evaluator.evaluate(predictionAndLabels))
>>val fos = new
>> FileOutputStream("/home/rwaite/mt-work/ann-benchmark/spark_out/spark_model.obj");
>>val oos = new ObjectOutputStream(fos);
>>oos.writeObject(model);
>>oos.close
>>  }
>> }
>>