Spark Streaming: Doing operation in Receiver vs RDD

2015-10-07 Thread emiretsk
Hi,

I have a Spark Streaming program that consumes messages from Kafka and
has to decrypt and deserialize each message. I can implement it either as a
Kafka deserializer (that will run in a receiver or in the new receiver-less
Kafka consumer) or as RDD operations. What are the pros/cons of each?

As I see it, doing the operations on RDDs has the following implications:
- Better load balancing and fault tolerance (though I'm not quite sure what
happens when a receiver fails). I'm also not sure whether this still holds with
the new receiver-less Kafka consumer, since it creates an RDD partition for each
Kafka partition.
- All functions that are applied to RDDs need to be either static or part of
serializable objects. This makes using standard/3rd-party Java libraries
harder.
Cheers,
Eugene
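
For illustration, a minimal sketch of the "RDD operations" option with the
receiver-less (direct) Kafka stream, in Scala. Here ssc, kafkaParams and topics
are assumed to exist, and decrypt()/deserialize() are placeholders for your own
message handling:

import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct stream of raw byte payloads; one RDD partition per Kafka partition.
val raw = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
  DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topics)

val messages = raw.mapPartitions { iter =>
  // Non-serializable helpers (a Cipher, a schema registry client, ...) can be
  // created here, once per partition, on the executor.
  iter.map { case (_, payload) => deserialize(decrypt(payload)) }
}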



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Doing-operation-in-Receiver-vs-RDD-tp24973.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Parquet file size

2015-10-07 Thread Younes Naguib
The original TSV files total 600GB and generated 40k files of 15-25MB.

y

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: October-07-15 3:18 PM
To: Younes Naguib; 'user@spark.apache.org'
Subject: Re: Parquet file size

Why do you want larger files? Doesn't the result Parquet file contain all the 
data in the original TSV file?

Cheng
On 10/7/15 11:07 AM, Younes Naguib wrote:
Hi,

I'm reading a large tsv file, and creating parquet files using sparksql:
insert overwrite
table tbl partition(year, month, day)
Select  from tbl_tsv;

This works nicely, but generates small parquet files (15MB).
I wanted to generate larger files, any idea how to address this?

Thanks,
Younes Naguib
Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G 1R8
Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | 
younes.nag...@tritondigital.com 




Re: Parquet file size

2015-10-07 Thread Cheng Lian
The reason why so many small files are generated should probably be the 
fact that you are inserting into a partitioned table with three 
partition columns.


If you want large Parquet files, you may try to either avoid using a 
partitioned table, or use fewer partition columns (e.g., only year, 
without month and day).


Cheng

So you want to dump all data into a single large Parquet file?

On 10/7/15 1:55 PM, Younes Naguib wrote:


The original TSV files total 600GB and generated 40k files of 15-25MB.

y

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* October-07-15 3:18 PM
*To:* Younes Naguib; 'user@spark.apache.org'
*Subject:* Re: Parquet file size

Why do you want larger files? Doesn't the result Parquet file contain 
all the data in the original TSV file?


Cheng

On 10/7/15 11:07 AM, Younes Naguib wrote:

Hi,

I’m reading a large tsv file, and creating parquet files using
sparksql:

insert overwrite

table tbl partition(year, month, day)

Select  from tbl_tsv;

This works nicely, but generates small parquet files (15MB).

I wanted to generate larger files, any idea how to address this?

*Thanks,*

*Younes Naguib*

Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC 
H3G 1R8


Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 |
younes.nag...@tritondigital.com





What is the difference between ml.classification.LogisticRegression and mllib.classification.LogisticRegressionWithLBFGS

2015-10-07 Thread YiZhi Liu
Hi everyone,

I'm curious about the difference between
ml.classification.LogisticRegression and
mllib.classification.LogisticRegressionWithLBFGS. Both of them are
optimized using LBFGS; the only difference I see is that LogisticRegression
takes a DataFrame while LogisticRegressionWithLBFGS takes an RDD.

So I wonder,
1. Why not simply add a DataFrame training interface to
LogisticRegressionWithLBFGS?
2. What's the difference between ml.classification and
mllib.classification package?
3. Why doesn't ml.classification.LogisticRegression call
mllib.optimization.LBFGS / mllib.optimization.OWLQN directly? Instead,
it uses breeze.optimize.LBFGS and re-implements most of the procedures
in mllib.optimization.{LBFGS,OWLQN}.

Thank you.

Best,

-- 
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China
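
To make the input-type difference concrete, a small sketch (trainingDF and
trainingRDD are assumed to exist; the DataFrame needs "label" and "features"
columns, and the RDD holds LabeledPoint):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

// spark.ml: trained on a DataFrame, fits into the Pipeline API
val mlModel = new LogisticRegression().setMaxIter(100).fit(trainingDF)

// spark.mllib: trained on an RDD[LabeledPoint]
val mllibModel = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingRDD)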

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Model exports PMML (Random Forest)

2015-10-07 Thread Yasemin Kaya
Hi,

I want to export my model to PMML, but PMML export for random forests has not
been implemented yet; it is planned for version 1.6. Is it possible to produce
the PMML XML for my (random forest) model manually? Thanks.

Best,
yasemin
-- 
hiç ender hiç
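
Until built-in PMML export for forests exists, a rough sketch of walking a
trained mllib RandomForestModel (assumed to be called model) to drive a
hand-rolled exporter; the actual PMML XML writing is left out, and only
continuous splits are handled for brevity:

import org.apache.spark.mllib.tree.model.Node

def describe(node: Node): String =
  if (node.isLeaf) s"leaf predict=${node.predict.predict}"
  else {
    val split = node.split.get
    s"feature=${split.feature} threshold=${split.threshold} " +
      s"left=[${describe(node.leftNode.get)}] right=[${describe(node.rightNode.get)}]"
  }

// Each tree exposes its root via topNode; a manual PMML export would translate
// this structure into the corresponding TreeModel/Node elements per tree.
model.trees.zipWithIndex.foreach { case (tree, i) =>
  println(s"tree $i: ${describe(tree.topNode)}")
}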


Re: does KafkaCluster can be public ?

2015-10-07 Thread Erwan ALLAIN
Thanks guys !

On Wed, Oct 7, 2015 at 1:41 AM, Cody Koeninger  wrote:

> Sure no prob.
>
> On Tue, Oct 6, 2015 at 6:35 PM, Tathagata Das  wrote:
>
>> Given the interest, I am also inclining towards making it a public
>> developer API. Maybe even experimental. Cody, mind submitting a patch?
>>
>>
>> On Tue, Oct 6, 2015 at 7:45 AM, Sean Owen  wrote:
>>
>>> For what it's worth, I also use this class in an app, but it happens
>>> to be from Java code where it acts as if it's public. So no problem
>>> for my use case, but I suppose, another small vote for the usefulness
>>> of this class to the caller. I end up using getLatestLeaderOffsets to
>>> figure out how to initialize initial offsets.
>>>
>>> On Tue, Oct 6, 2015 at 3:24 PM, Cody Koeninger 
>>> wrote:
>>> > I personally think KafkaCluster (or the equivalent) should be made
>>> public.
>>> > When I'm deploying spark I just sed out the private[spark] and rebuild.
>>> >
>>> > There's a general reluctance to make things public due to backwards
>>> > compatibility, but if enough people ask for it... ?
>>> >
>>> > On Tue, Oct 6, 2015 at 6:51 AM, Jonathan Coveney 
>>> wrote:
>>> >>
>>> >> You can put a class in the org.apache.spark namespace to access
>>> anything
>>> >> that is private[spark]. You can then make enrichments there to access
>>> >> whatever you need. Just beware upgrade pain :)
>>> >>
>>> >>
>>> >> On Tuesday, 6 October 2015, Erwan ALLAIN <
>>> eallain.po...@gmail.com>
>>> >> wrote:
>>> >>>
>>> >>> Hello,
>>> >>>
>>> >>> I'm currently testing spark streaming with kafka.
>>> >>> I'm creating DirectStream with KafkaUtils and everything's fine.
>>> However
>>> >>> I would like to use the signature where I can specify my own message
>>> handler
>>> >>> (to play with partition and offset). In this case, I need to manage
>>> >>> offset/partition by myself to fill fromOffsets argument.
>>> >>> I have found a Jira on this usecase
>>> >>> https://issues.apache.org/jira/browse/SPARK-6714 but it has been
>>> closed
>>> >>> telling that it's too specific.
>>> >>> I'm aware that it can be done using kafka api (TopicMetaDataRequest
>>> and
>>> >>> OffsetRequest) but what I have to do is almost the same as the
>>> KafkaCluster
>>> >>> which is private.
>>> >>>
>>> >>> is it possible to :
>>> >>>  - add another signature in KafkaUtils ?
>>> >>>  - make KafkaCluster public ?
>>> >>>
>>> >>> or do you have any other smart solution where I don't need to
>>> copy/paste
>>> >>> KafkaCluster ?
>>> >>>
>>> >>> Thanks.
>>> >>>
>>> >>> Regards,
>>> >>> Erwan ALLAIN
>>> >
>>> >
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
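
For reference, a minimal sketch of the workaround Jonathan describes: a small
object compiled into your own jar but declared in the Spark Kafka package, so it
can see the private[spark] class (package and constructor assumed from Spark 1.x;
beware upgrade pain as noted above):

// In your application's source tree, not in Spark itself.
package org.apache.spark.streaming.kafka

// Living in this package grants access to the private[spark] KafkaCluster.
object KafkaClusterAccess {
  def apply(kafkaParams: Map[String, String]): KafkaCluster =
    new KafkaCluster(kafkaParams)
}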


Re: Does feature parity exist between Spark and PySpark

2015-10-07 Thread Sean Owen
These are true, but it's not because Spark is written in Scala; it's
because it executes in the JVM. So, Scala/Java-based apps have an
advantage in that they don't have to serialize data back and forth to
a Python process, which also brings a new set of things that can go
wrong. Python is also inherently slower to execute. There is a real
runtime performance hit.

Python APIs lag a bit, especially in areas where you need to integrate
with third-party (JVM-based) components, like Kafka or something.

I would certainly choose Scala all else equal. But then again, I don't
like Python, so I'd say that. Pyspark is certainly usable but does
have its cost.

On Tue, Oct 6, 2015 at 11:15 PM, dant  wrote:
> Hi
>
> I'm hearing a common theme that I should only do serious programming
> in Scala on Spark (1.5.1). Real power users use Scala. It is said that
> Python is great for analytics but in the end the code should be rewritten in
> Scala to finalise. There are a number of reasons I'm hearing:
>
> 1. Spark is written in Scala so will always be faster than any other
> language implementation on top of it.
> 2. Spark releases always favour more features being visible and enabled for
> Scala API than Python API.
>
> Is there any truth to the above? I'm a little sceptical.
>
> Apologies for the duplication, my previous message was held up due to
> subscription issue. Reposting now.
>
> Thanks
> Dan
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Does-feature-parity-exist-between-Spark-and-PySpark-tp24963.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-07 Thread Sun, Rui
Not sure "/C/DevTools/spark-1.5.1/bin/spark-submit.cmd" is a valid path?

From: Hossein [mailto:fal...@gmail.com]
Sent: Wednesday, October 7, 2015 12:46 AM
To: Khandeshi, Ami
Cc: Sun, Rui; akhandeshi; user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

Have you built the Spark jars? Can you run the Spark Scala shell?

--Hossein

On Tuesday, October 6, 2015, Khandeshi, Ami 
> wrote:
> Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
> Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)
>
> sc <- sparkR.init(master="local")
Launching java with spark-submit command 
/C/DevTools/spark-1.5.1/bin/spark-submit.cmd   --verbose sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds
In addition: Warning message:
running command '"/C/DevTools/spark-1.5.1/bin/spark-submit.cmd"   --verbose 
sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391' had 
status 127

-Original Message-
From: Sun, Rui [mailto:rui@intel.com]
Sent: Tuesday, October 06, 2015 9:39 AM
To: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

What you have done is supposed to work.  Need more debugging information to 
find the cause.

Could you add the following lines before calling sparkR.init()?

Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)

Then to see if you can find any hint in the console output

-Original Message-
From: akhandeshi [mailto:ami.khande...@gmail.com]
Sent: Tuesday, October 6, 2015 8:21 PM
To: user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

I couldn't get this working...

I have have JAVA_HOME set.
I have defined SPARK_HOME
Sys.setenv(SPARK_HOME="c:\DevTools\spark-1.5.1")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library("SparkR", lib.loc="c:\\DevTools\\spark-1.5.1\\lib")
library(SparkR)
sc<-sparkR.init(master="local")

I get
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds

What am I missing??






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Error-in-sparkR-init-master-local-in-RStudio-tp23768p24949.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


--
--Hossein


Re: Graceful shutdown drops processing in Spark Streaming

2015-10-07 Thread Michal Čizmazia
Thanks! Done.

https://issues.apache.org/jira/browse/SPARK-10995

On 7 October 2015 at 21:24, Tathagata Das  wrote:

> Aaah, interesting, you are doing 15 minute slide duration. Yeah,
> internally the streaming scheduler waits for the last "batch" interval
> which has data to be processed, but if there is a sliding interval (i.e. 15
> mins) that is higher than batch interval, then that might not be run. This
> is indeed a bug and should be fixed. Mind setting up a JIRA and assigning
> it to me.
>
> On Wed, Oct 7, 2015 at 8:33 AM, Michal Čizmazia  wrote:
>
>> After triggering the graceful shutdown on the following application, the
>> application stops before the windowed stream reaches its slide duration. As
>> a result, the data is not completely processed (i.e. saveToMyStorage is not
>> called) before shutdown.
>>
>> According to the documentation, graceful shutdown should ensure that the
>> data, which has been received, is completely processed before shutdown.
>>
>> https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code
>>
>> Spark version: 1.4.1
>>
>> Code snippet:
>>
>> Function0<JavaStreamingContext> factory = () -> {
>> JavaStreamingContext context = new JavaStreamingContext(sparkConf,
>> Durations.minutes(1));
>> context.checkpoint("/test");
>> JavaDStream records =
>> context.receiverStream(myReliableReceiver).flatMap(...);
>> records.persist(StorageLevel.MEMORY_AND_DISK());
>> records.foreachRDD(rdd -> { rdd.count(); return null; });
>> records
>> .window(Durations.minutes(15), Durations.minutes(15))
>> .foreachRDD(rdd -> saveToMyStorage(rdd));
>> return context;
>> };
>>
>> try (JavaStreamingContext context =
>> JavaStreamingContext.getOrCreate("/test", factory)) {
>> context.start();
>> waitForShutdownSignal();
>> Boolean stopSparkContext = true;
>> Boolean stopGracefully = true;
>> context.stop(stopSparkContext, stopGracefully);
>> }
>>
>>
>


Re: SparkSQL: First query execution is always slower than subsequent queries

2015-10-07 Thread Michael Armbrust
-dev +user

1). Is that the reason why it's always slow in the first run? Or are there
> any other reasons? Apparently it loads data to memory every time so it
> shouldn't be something to do with disk read should it?
>

You are probably seeing the effect of the JVM's JIT.  The first run is
executing in interpreted mode.  Once the JVM sees it's a hot piece of code
it will compile it to native code.  This applies both to Spark / Spark SQL
itself and (as of Spark 1.5) the code that we dynamically generate for
doing expression evaluation.  Multiple runs with the same expressions will
use cached code that might have been JITed.
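
A quick way to see the warm-up is simply to time repeated runs of the same
query; a sketch, with sqlContext assumed and the query string a placeholder:

def timeMs[T](body: => T): Long = {
  val t0 = System.nanoTime()
  body
  (System.nanoTime() - t0) / 1000000
}

// Later runs benefit from JIT-compiled (and, in 1.5, cached generated) code.
(1 to 3).foreach { i =>
  val ms = timeMs(sqlContext.sql("SELECT ...").collect())
  println(s"run $i: $ms ms")
}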


> 2). Does Spark use the Hadoop's Map Reduce engine under the hood? If so
> can we configure it to use MR2 instead of MR1.
>

No, we do not use the MapReduce engine for execution.  You can however
compile Spark to work with either version of Hadoop so you can access
HDFS, etc.


Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-07 Thread Hemant Bhanawat
Please find attached.



On Wed, Oct 7, 2015 at 7:36 PM, Ted Yu  wrote:

> Hemant:
> Can you post the code snippet to the mailing list - other people would be
> interested.
>
> On Wed, Oct 7, 2015 at 5:50 AM, Hemant Bhanawat 
> wrote:
>
>> Will send you the code on your email id.
>>
>> On Wed, Oct 7, 2015 at 4:37 PM, Ophir Cohen  wrote:
>>
>>> Thanks!
>>> Can you check if you can provide example of the conversion?
>>>
>>>
>>> On Wed, Oct 7, 2015 at 2:05 PM, Hemant Bhanawat 
>>> wrote:
>>>
 Oh, this is an internal class of our project and I had used it without
 realizing the source.

 Anyway, the idea is to  wrap the InternalRow in a class that derives
 from Row. When you implement the functions of the trait 'Row ', the type
 conversions from Row types to InternalRow types has to be done for each of
 the types. But, as I can see, the primitive types (apart from String) don't
 need conversions. Map and Array would need some handling.

 I will check with the author of this code, I think this code can be
 contributed to Spark.

 Hemant
 www.snappydata.io
 linkedin.com/company/snappydata

 On Wed, Oct 7, 2015 at 3:30 PM, Ophir Cohen  wrote:

> From which jar WrappedInternalRow comes from?
> It seems that I can't find it.
>
> BTW
> What I'm trying to do now is to create scala array from the fields and
> than create Row out of that array.
> The problem is that I get types mismatches...
>
> On Wed, Oct 7, 2015 at 8:03 AM, Hemant Bhanawat 
> wrote:
>
>> An approach can be to wrap your MutableRow in WrappedInternalRow
>> which is a child class of Row.
>>
>> Hemant
>> www.snappydata.io
>> linkedin.com/company/snappydata
>>
>>
>> On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen  wrote:
>>
>>> Hi Guys,
>>> I'm upgrading to Spark 1.5.
>>>
>>> In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
>>> created GenericMutableRow
>>> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and 
>>> return it
>>> as org.apache.spark.sql.Row
>>>
>>> Starting from Spark 1.5 GenericMutableRow isn't extends Row.
>>>
>>> What do you suggest to do?
>>> How can I convert GenericMutableRow to Row?
>>>
>>> Prompt answer will be highly appreciated!
>>> Thanks,
>>> Ophir
>>>
>>
>>
>

>>>
>>
>
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.collection

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

/**
 * Wraps an `InternalRow` to expose a `Row`
 */
final class WrappedInternalRow(override val schema: StructType,
val converters: Array[(InternalRow, Int) => Any]) extends Row {

  private var _internalRow: InternalRow = _
  private val cache = new Array[Any](schema.length)

  private[sql] def internalRow = _internalRow

  private[sql] def internalRow_=(row: InternalRow): Unit = {
_internalRow = row
val len = cache.length
var i = 0
while (i < len) {
  if (cache(i) != null) {
cache(i) = null
  }
  i += 1
}
  }

  override def length: Int = schema.length

  override def isNullAt(ordinal: Int): Boolean = _internalRow.isNullAt(ordinal)

  override def getBoolean(ordinal: Int) = _internalRow.getBoolean(ordinal)

  override def getByte(ordinal: Int) = _internalRow.getByte(ordinal)

  override def getShort(ordinal: Int) = _internalRow.getShort(ordinal)

  override def getInt(ordinal: Int) = _internalRow.getInt(ordinal)

  override def getLong(ordinal: Int) = _internalRow.getLong(ordinal)

  override def getFloat(ordinal: Int) = _internalRow.getFloat(ordinal)

  override def getDouble(ordinal: Int) = _internalRow.getDouble(ordinal)

  override def getString(ordinal: Int) = {
val v = cache(ordinal)
if (v == null) {
  val s = _internalRow.getUTF8String(ordinal).toString
  

Default size of a datatype in SparkSQL

2015-10-07 Thread vivek bhaskar
I want to understand what the default size for a given datatype is used for.

The following link mentions that it's for internal size estimation.
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/DataType.html

This behavior is also reflected in the code, where the default value seems to be
used for stats purposes only.

But then the default size of the String datatype is 4096; why was this
seemingly arbitrary number chosen? Will it also restrict the size of the data?
Any further elaboration on how the String datatype works would also help.

Regards,
Vivek
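
For what it's worth, defaultSize is only a per-value planning heuristic feeding
the size estimates mentioned above; it does not cap the actual data. A quick
spark-shell check (values as of Spark 1.5):

import org.apache.spark.sql.types._

IntegerType.defaultSize  // 4
StringType.defaultSize   // 4096 -- a rough estimate per value, not a limit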


Re: Parquet file size

2015-10-07 Thread Deng Ching-Mallete
Hi,

In our case, we're using
the org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE to
increase the size of the RDD partitions when loading text files, so it
would generate larger parquet files. We just set it in the Hadoop conf of
the SparkContext. You need to be careful though about setting it to a large
value, as you might encounter issues related to this:

https://issues.apache.org/jira/browse/SPARK-6235

For our jobs, we're setting the split size to 512MB which generates between
110-200MB parquet files using the default compression. We're using
Spark-1.3.1, btw, and we also have the same partitioning of year/month/day
for our parquet files.

HTH,
Deng
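
A sketch of what that looks like, assuming a SparkContext named sc and the
512MB split size mentioned above:

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

// Larger input splits -> fewer, larger RDD partitions -> fewer, larger Parquet files.
sc.hadoopConfiguration.setLong(FileInputFormat.SPLIT_MINSIZE, 512L * 1024 * 1024)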

On Thu, Oct 8, 2015 at 8:25 AM, Younes Naguib <
younes.nag...@tritondigital.com> wrote:

> Well, I only have data for 2015-08. So, in the end, only 31 partitions.
> What I'm looking for is some reasonably sized partitions.
> In any case, just the idea of controlling the output parquet file size or
> number would be nice.
>
> *Younes Naguib* *Streaming Division*
>
> Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G 1R8
>
> Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | younes.naguib
> @tritondigital.com 
> --
> *From:* Cheng Lian [lian.cs@gmail.com]
> *Sent:* Wednesday, October 07, 2015 7:01 PM
>
> *To:* Younes Naguib; 'user@spark.apache.org'
> *Subject:* Re: Parquet file size
>
> The reason why so many small files are generated should probably be the
> fact that you are inserting into a partitioned table with three partition
> columns.
>
> If you want large Parquet files, you may try to either avoid using a
> partitioned table, or use fewer partition columns (e.g., only year,
> without month and day).
>
> Cheng
>
> So you want to dump all data into a single large Parquet file?
>
> On 10/7/15 1:55 PM, Younes Naguib wrote:
>
> The original TSV files total 600GB and generated 40k files of 15-25MB.
>
>
>
> y
>
>
>
> *From:* Cheng Lian [mailto:lian.cs@gmail.com ]
> *Sent:* October-07-15 3:18 PM
> *To:* Younes Naguib; 'user@spark.apache.org'
> *Subject:* Re: Parquet file size
>
>
>
> Why do you want larger files? Doesn't the result Parquet file contain all
> the data in the original TSV file?
>
> Cheng
>
> On 10/7/15 11:07 AM, Younes Naguib wrote:
>
> Hi,
>
>
>
> I’m reading a large tsv file, and creating parquet files using sparksql:
>
> insert overwrite
>
> table tbl partition(year, month, day)
>
> Select  from tbl_tsv;
>
>
>
> This works nicely, but generates small parquet files (15MB).
>
> I wanted to generate larger files, any idea how to address this?
>
>
>
> *Thanks,*
>
> *Younes Naguib*
>
> Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G 1R8
>
> Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | younes.naguib
> @tritondigital.com 
>
>
>


Re: Graceful shutdown drops processing in Spark Streaming

2015-10-07 Thread Tathagata Das
Aaah, interesting, you are doing a 15-minute slide duration. Yeah, internally
the streaming scheduler waits for the last "batch" interval which has data
to be processed, but if there is a sliding interval (i.e. 15 mins) that is
longer than the batch interval, then that might not be run. This is indeed a
bug and should be fixed. Mind setting up a JIRA and assigning it to me?

On Wed, Oct 7, 2015 at 8:33 AM, Michal Čizmazia  wrote:

> After triggering the graceful shutdown on the following application, the
> application stops before the windowed stream reaches its slide duration. As
> a result, the data is not completely processed (i.e. saveToMyStorage is not
> called) before shutdown.
>
> According to the documentation, graceful shutdown should ensure that the
> data, which has been received, is completely processed before shutdown.
>
> https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code
>
> Spark version: 1.4.1
>
> Code snippet:
>
> Function0<JavaStreamingContext> factory = () -> {
> JavaStreamingContext context = new JavaStreamingContext(sparkConf,
> Durations.minutes(1));
> context.checkpoint("/test");
> JavaDStream records =
> context.receiverStream(myReliableReceiver).flatMap(...);
> records.persist(StorageLevel.MEMORY_AND_DISK());
> records.foreachRDD(rdd -> { rdd.count(); return null; });
> records
> .window(Durations.minutes(15), Durations.minutes(15))
> .foreachRDD(rdd -> saveToMyStorage(rdd));
> return context;
> };
>
> try (JavaStreamingContext context =
> JavaStreamingContext.getOrCreate("/test", factory)) {
> context.start();
> waitForShutdownSignal();
> Boolean stopSparkContext = true;
> Boolean stopGracefully = true;
> context.stop(stopSparkContext, stopGracefully);
> }
>
>


Re: Asking about the trend of increasing latency, hbase spikes.

2015-10-07 Thread Ted Yu
This question should be directed to user@

Can you use third party site for the images - they didn't go through.

On Wed, Oct 7, 2015 at 5:35 PM, UI-JIN LIM  wrote:

> Hi. This is Ui Jin, Lim in Korea, LG CNS
>
>
>
> We had setup and are operating hbase 0.98.13 on our customer, iptv
> Company.
>
> Our customer claim about this spike. Following is our status.
>
>
>
> - We had been operating with BlockCache (on-heap) for 3 months. The ratio of
> responses above 100ms has reached 6.69% of total transactions.
>
>   The following graph shows the trend of executions above 100ms per
> minute on June 17th. (This is the response time of 2~3 HBase
> transactions.)
>
> [image: cid:image001.jpg@01D1011A.5E93FF70]
>
>
>
> - After restarting HBase on June 18th, the spikes were reduced
> dramatically, but their ratio has been increasing continuously.
>
> [image: cid:image002.png@01D1011A.5E93FF70]
>
>
>
> - We adopted the off-heap bucket cache on August 19th, but the increase
> did not disappear.
>
> [image: cid:image002.png@01D10103.82E488C0]
>
> [image: cid:image002.png@01D10103.82E488C0]
>
>
>
> - We set HBase to log slow queries with execution time above 10ms. All
> RegionServer logs show their counts increasing.
>
>
>
> [image: cid:image005.png@01D1011A.5E93FF70]
>
>
>
>
>
> What concerns us is not the spikes themselves but their increasing trend.
>
> There has been no increase in transactions or dataset size.
>
> I would like your thoughts on this issue.
>
>
>
>
>
>
>
>
>
> -
>
> *UI JIN, LIM*
>
> *DNA 3.0** - **LG CNS*
>
> Data Architecture Advisory
>
> Big Data Technical Support Team
>
> Mobile : +82) 010-8307-1210
>
>
>


Running Spark in Yarn-client mode

2015-10-07 Thread Sushrut Ikhar
Hi,
I am new to Spark and I have been trying to run Spark in yarn-client mode.

I get this error in yarn logs :
Error: Could not find or load main class
org.apache.spark.executor.CoarseGrainedExecutorBackend

Also, I keep getting these warnings:

WARN YarnScheduler: Initial job has not accepted any resources; check your
cluster UI to ensure that workers are registered and have sufficient
resources

WARN YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has
disassociated

WARN ReliableDeliverySupervisor: Association with remote system
[akka.tcp://sparkYarnAM@] has failed, address is now gated
for [5000] ms. Reason is: [Disassociated].

I believe that executors are starting but are unable to connect back to the
driver.
How do I resolve this?
Also, I need help in locating the driver and executor node logs.

Thanks.

Regards,

Sushrut Ikhar
[image: https://]about.me/sushrutikhar



Is coalesce smart while merging partitions?

2015-10-07 Thread Cesar Flores
It is my understanding that the default behavior of the coalesce function,
when the user reduces the number of partitions, is to only merge them without
executing a shuffle.

My question is: is this merging smart? For example, does Spark try to merge
the small partitions first, or is the choice of partitions to merge random?


Thanks
-- 
Cesar Flores
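
For context, the two forms side by side (sketch; rdd is any existing RDD). As
far as I know, the default no-shuffle coalescer groups parent partitions by
locality and by count, not by their data size, so the merge is not size-aware:

// Narrow dependency: partitions are merged within the same stage, no data movement.
val merged = rdd.coalesce(10)                  // shuffle = false by default

// Full shuffle: redistributes data, evening out sizes at the cost of a shuffle.
val reshuffled = rdd.coalesce(10, shuffle = true)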


RE: Parquet file size

2015-10-07 Thread Younes Naguib
Well, I only have data for 2015-08. So, in the end, only 31 partitions.
What I'm looking for is some reasonably sized partitions.
In any case, just the idea of controlling the output parquet file size or 
number would be nice.

Younes Naguib Streaming Division
Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G 1R8
Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | 
younes.nag...@tritondigital.com 

From: Cheng Lian [lian.cs@gmail.com]
Sent: Wednesday, October 07, 2015 7:01 PM
To: Younes Naguib; 'user@spark.apache.org'
Subject: Re: Parquet file size

The reason why so many small files are generated should probably be the fact 
that you are inserting into a partitioned table with three partition columns.

If you want large Parquet files, you may try to either avoid using a 
partitioned table, or use fewer partition columns (e.g., only year, without 
month and day).

Cheng

So you want to dump all data into a single large Parquet file?

On 10/7/15 1:55 PM, Younes Naguib wrote:
The original TSV files total 600GB and generated 40k files of 15-25MB.

y

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: October-07-15 3:18 PM
To: Younes Naguib; 'user@spark.apache.org'
Subject: Re: Parquet file size

Why do you want larger files? Doesn't the result Parquet file contain all the 
data in the original TSV file?

Cheng
On 10/7/15 11:07 AM, Younes Naguib wrote:
Hi,

I’m reading a large tsv file, and creating parquet files using sparksql:
insert overwrite
table tbl partition(year, month, day)
Select  from tbl_tsv;

This works nicely, but generates small parquet files (15MB).
I wanted to generate larger files, any idea how to address this?

Thanks,
Younes Naguib
Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G 1R8
Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | 
younes.nag...@tritondigital.com 





Re: Running Spark in Yarn-client mode

2015-10-07 Thread Jean-Baptiste Onofré

Hi Sushrut,

which packaging of Spark do you use ?
Do you have a working Yarn cluster (with at least one worker) ?

spark-hadoop-x ?

Regards
JB

On 10/08/2015 07:23 AM, Sushrut Ikhar wrote:

Hi,
I am new to Spark and I have been trying to run Spark in yarn-client mode.

I get this error in yarn logs :
Error: Could not find or load main class
org.apache.spark.executor.CoarseGrainedExecutorBackend

Also, I keep getting these warnings:

WARN YarnScheduler: Initial job has not accepted any resources; check
your cluster UI to ensure that workers are registered and have
sufficient resources

WARN YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has
disassociated

WARN ReliableDeliverySupervisor: Association with remote system
[akka.tcp://sparkYarnAM@] has failed, address is now
gated for [5000] ms. Reason is: [Disassociated].

I believe that executors are starting but are unable to connect back to
the driver.
How do I resolve this?
Also, I need help in locating the driver and executor node logs.

Thanks.

Regards,

Sushrut Ikhar
https://about.me/sushrutikhar





--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Parquet file size

2015-10-07 Thread Cheng Lian
Why do you want larger files? Doesn't the result Parquet file contain 
all the data in the original TSV file?


Cheng

On 10/7/15 11:07 AM, Younes Naguib wrote:


Hi,

I’m reading a large tsv file, and creating parquet files using sparksql:

insert overwrite

table tbl partition(year, month, day)

Select  from tbl_tsv;

This works nicely, but generates small parquet files (15MB).

I wanted to generate larger files, any idea how to address this?

*Thanks,*

*Younes Naguib***

Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G 1R8

Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | 
younes.nag...@tritondigital.com






Re: RDD of ImmutableList

2015-10-07 Thread Jakub Dubovsky
I did not realize that Scala's and Java's immutable collections use 
different APIs, which causes this. Thank you for the reminder. This makes some 
sense now...


-- Original message --
From: Jonathan Coveney 
To: Jakub Dubovsky 
Date: 7. 10. 2015 1:29:34
Subject: Re: RDD of ImmutableList

Nobody is saying not to use immutable data structures, only that guava's 
aren't natively supported.



Scala's default collections library is all immutable: List, Vector, Map. 
This is what people generally use, especially in Scala code!

On Tuesday, 6 October 2015, Jakub Dubovsky wrote:

Thank you for quick reaction.




I have to say this is very surprising to me. I have never received advice to 
stop using an immutable approach. The whole RDD is designed to be immutable 
(which is sort of sabotaged by not being able to (de)serialize immutable 
classes properly). I will ask on the dev list whether this is to be changed or not.




OK, I have let go of my initial feelings, so let's be pragmatic. And this is 
still for everyone, not just Igor:




I use a class from a library which is immutable. Now I want to use this 
class to represent my data in an RDD because this saves me a huge amount of 
work. The class uses ImmutableList as one of its fields. That's why it 
fails. But isn't there a way to work around this? I ask because I have 
exactly zero knowledge about Kryo and the way it works. So, for example, 
would either of these two work?




1) Change the external class so that it implements writeObject/readObject 
methods (it's Java). Will these methods be used by Kryo? (I can ask the 
maintainers of the library to change the class if the change is reasonable. 
Adding these methods would be, while dropping immutability certainly wouldn't.)




2) Wrap the class in a Scala class which would translate the data during 
(de)serialization?




  Thanks!

  Jakub Dubovsky


-- Original message --
From: Igor Berman 
To: Jakub Dubovsky 
Date: 5. 10. 2015 20:11:35
Subject: Re: RDD of ImmutableList


Kryo doesn't support Guava's collections by default.
I remember encountering a project on GitHub that fixes this (not sure though). 
I've ended up not using Guava collections where Spark RDDs are 
concerned.




On 5 October 2015 at 21:04, Jakub Dubovsky  
wrote:
"
Hi all,



  I would like advice on how to use ImmutableList with an RDD. Here is a small
presentation of the essence of my problem in spark-shell, with the Guava jar 
added:




scala> import com.google.common.collect.ImmutableList

import com.google.common.collect.ImmutableList




scala> val arr = Array(ImmutableList.of(1,2), ImmutableList.of(2,4), 
ImmutableList.of(3,6))

arr: Array[com.google.common.collect.ImmutableList[Int]] = Array([1, 2], [2,
4], [3, 6])




scala> val rdd = sc.parallelize(arr)


rdd: org.apache.spark.rdd.RDD[com.google.common.collect.ImmutableList[Int]] 
= ParallelCollectionRDD[0] at parallelize at :24




scala> rdd.count





 This results in a Kryo exception saying that it cannot add a new element to 
the list instance during deserialization:




java.io.IOException: java.lang.UnsupportedOperationException



        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)

        at org.apache.spark.rdd.ParallelCollectionPartition.readObject
(ParallelCollectionRDD.scala:70)

        ...

        at java.lang.Thread.run(Thread.java:745)


Caused by: java.lang.UnsupportedOperationException

        at com.google.common.collect.ImmutableCollection.add
(ImmutableCollection.java:91)

        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read
(CollectionSerializer.java:109)

        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read
(CollectionSerializer.java:18)

        ...





  It somehow makes sense. But I cannot think of a workaround, and I do not 
believe that using ImmutableList with an RDD is impossible. How is this 
solved?




  Thank you in advance!




   Jakub Dubovsky





"



"
"

"


Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-07 Thread Hemant Bhanawat
Oh, this is an internal class of our project and I had used it without
realizing the source.

Anyway, the idea is to wrap the InternalRow in a class that derives from
Row. When you implement the functions of the trait 'Row', the type
conversions from Row types to InternalRow types have to be done for each of
the types. But, as far as I can see, the primitive types (apart from String) don't
need conversions. Map and Array would need some handling.

I will check with the author of this code, I think this code can be
contributed to Spark.

Hemant
www.snappydata.io
linkedin.com/company/snappydata

On Wed, Oct 7, 2015 at 3:30 PM, Ophir Cohen  wrote:

> From which jar WrappedInternalRow comes from?
> It seems that I can't find it.
>
> BTW
> What I'm trying to do now is to create scala array from the fields and
> than create Row out of that array.
> The problem is that I get types mismatches...
>
> On Wed, Oct 7, 2015 at 8:03 AM, Hemant Bhanawat 
> wrote:
>
>> An approach can be to wrap your MutableRow in WrappedInternalRow which is
>> a child class of Row.
>>
>> Hemant
>> www.snappydata.io
>> linkedin.com/company/snappydata
>>
>>
>> On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen  wrote:
>>
>>> Hi Guys,
>>> I'm upgrading to Spark 1.5.
>>>
>>> In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
>>> created GenericMutableRow
>>> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and return it
>>> as org.apache.spark.sql.Row
>>>
>>> Starting from Spark 1.5 GenericMutableRow isn't extends Row.
>>>
>>> What do you suggest to do?
>>> How can I convert GenericMutableRow to Row?
>>>
>>> Prompt answer will be highly appreciated!
>>> Thanks,
>>> Ophir
>>>
>>
>>
>


ClassCastException while reading data from HDFS through Spark

2015-10-07 Thread Vinoth Sankar
 I'm just reading data from HDFS through Spark. It throws
*java.lang.ClassCastException:
org.apache.hadoop.io.LongWritable cannot be cast to
org.apache.hadoop.io.BytesWritable* at line no 6. I never used LongWritable
in my code, no idea how the data was in that format.

Note : I'm not using MapReduce Concepts and also I'm not creating Jobs
explicitly. So i can't use job.setMapOutputKeyClass and
job.setMapOutputValueClass.

JavaPairRDD<IntWritable, BytesWritable> hdfsContent =
sparkContext.sequenceFile(hdfsPath, IntWritable.class, BytesWritable.class);
JavaRDD<FileData> lines = hdfsContent.map(new Function<Tuple2<IntWritable, BytesWritable>, FileData>()
{
public FileData call(Tuple2<IntWritable, BytesWritable> tuple2) throws
InvalidProtocolBufferException
{
byte[] bytes = tuple2._2().getBytes();
return FileData.parseFrom(bytes);
}
});


Re: spark multi tenancy

2015-10-07 Thread ayan guha
Can queues also be used to separate workloads?
On 7 Oct 2015 20:34, "Steve Loughran"  wrote:

>
> > On 7 Oct 2015, at 09:26, Dominik Fries 
> wrote:
> >
> > Hello Folks,
> >
> > We want to deploy several spark projects and want to use a unique project
> > user for each of them. Only the project user should start the spark
> > application and have the corresponding packages installed.
> >
> > Furthermore a personal user, which belongs to a specific project, should
> > start a spark application via the corresponding spark project user as
> proxy.
> > (Development)
> >
> > The Application is currently running with ipython / pyspark. (HDP 2.3 -
> > Spark 1.3.1)
> >
> > Is this possible or what is the best practice for a spark multi tenancy
> > environment ?
> >
> >
>
> Deploy on a kerberized YARN cluster and each application instance will be
> running as a different unix user in the cluster, with the appropriate
> access to HDFS —isolated.
>
> The issue then becomes "do workloads clash with each other?". If you want
> to isolate dev & production, using node labels to keep dev work off the
> production nodes is the standard technique.


Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-07 Thread Gerard Maas
Thanks for the feedback.

Cassandra does not seem to be the issue. The time for writing to Cassandra
is in the same order of magnitude (see below)

The code structure is roughly as follows:

dstream.filter(pred).foreachRDD{rdd =>
  val sparkT0 = currentTimeMs
  val metrics = rdd.mapPartitions{partition =>
 val partitionT0 = currentTimeMs
  partition.foreach{ transform andThen storeInCassandra _}
 val partitionT1 = currentTimeMs
 Seq(Metric( "local time", executor, partitionT1 - partitionT0,
records)).iterator
  }
  //materialize the rdd
  val allMetrics = metrics.collect()
  val sparkT1 = currentTimeMs
  val totalizedMetrics = // group by and reduce with sum
  val sparkT2 = currentTimeMs
  totalizedMetrics.foreach{ metric => gmetric.report(metric)}
}

Relating this code with the time table presented before (time in ms):

                          How measured?       Slow Task   Fast Task
executor local            totalizedMetrics    347.6       281.53
spark computation         sparkT1 - sparkT0   6930        263
metric collection         sparkT2 - sparkT1   70          138
wall clock process        sparkT2 - sparkT0   7000        401
total records processed   totalizedMetrics    4297        5002

What we observe is that the largest difference comes from the
materialization of the RDD. This pattern repeats cyclically one on, one off.

Any ideas where to further look?

kr, Gerard.


On Wed, Oct 7, 2015 at 1:33 AM, Tathagata Das  wrote:

> Good point!
>
> On Tue, Oct 6, 2015 at 4:23 PM, Cody Koeninger  wrote:
>
>> I agree getting cassandra out of the picture is a good first step.
>>
>> But if you just do foreachRDD { _.count } recent versions of direct
>> stream shouldn't do any work at all on the executor (since the number of
>> messages in the rdd is known already)
>>
>> do a foreachPartition and println or count the iterator manually.
>>
>> On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das 
>> wrote:
>>
>>> Are sure that this is not related to Cassandra inserts? Could you just
>>> do foreachRDD { _.count } instead  to keep Cassandra out of the picture and
>>> then test this agian.
>>>
>>> On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase 
>>> wrote:
>>>
 Also check if the Kafka cluster is still balanced. Maybe one of the
 brokers manages too many partitions, all the work will stay on that
 executor unless you repartition right after kakfka (and I'm not saying you
 should).

 Sent from my iPhone

 On 06 Oct 2015, at 22:17, Cody Koeninger  wrote:

 I'm not clear on what you're measuring.  Can you post relevant code
 snippets including the measurement code?

 As far as kafka metrics, nothing currently.  There is an info-level log
 message every time a kafka rdd iterator is instantiated,

 log.info(s"Computing topic ${part.topic}, partition
 ${part.partition} " +

   s"offsets ${part.fromOffset} -> ${part.untilOffset}")


 If you log once you're done with an iterator you should be able to see
 the delta.

 The other thing to try is reduce the number of parts involved in the
 job to isolate it ... first thing I'd do there is take cassandra out of the
 equation.



 On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas 
 wrote:

> Hi Cody,
>
> The job is doing ETL from Kafka records to Cassandra. After a
> single filtering stage on Spark, the 'TL' part is done using the
> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>
> We have metrics on the executor work which we collect and add
> together, indicated here by 'local computation'.  As you can see, we also
> measure how much it cost us to measure :-)
> See how 'local work'  times are comparable.  What's not visible is the
> task scheduling and consuming the data from Kafka which becomes part of 
> the
> 'spark computation' part.
>
> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>
> Are there metrics available somehow on the Kafka reading time?
>
>                           Slow Task   Fast Task
> local computation         347.6       281.53
> spark computation         6930        263
> metric collection         70          138
> wall clock process        7000        401
> total records processed   4297        5002
>
> (time in ms)
>
> kr, Gerard.
>
>
> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger 
> wrote:
>
>> Can you say anything more about what the job is doing?
>>
>> First thing I'd do is try to get some metrics on the time taken by
>> your code on the executors (e.g. when processing the iterator) to see if
>> it's consistent between the two situations.
>>
>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas 
>> wrote:
>>
>>> Hi,
>>>
>>> We recently migrated our streaming jobs to the direct kafka
>>> receiver. Our initial migration went quite fine but now we are seeing a
>>> weird zig-zag 

hiveContext sql number of tasks

2015-10-07 Thread patcharee

Hi,

I do a sql query on about 10,000 partitioned orc files. Because of the 
partition schema the files cannot be merged any longer (to reduce the 
total number).


From this command, hiveContext.sql(sqlText), 10K tasks were created, one 
per file. Is it possible to use fewer tasks? How can I force Spark SQL to 
use fewer tasks?


BR,
Patcharee
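
One option to try (sketch, Spark 1.4+): coalesce the DataFrame returned by
hiveContext.sql. Since coalesce is a narrow operation, the scan stage itself
should run with fewer tasks, each reading several of the small ORC files. The
target of 200 below is arbitrary:

val df = hiveContext.sql(sqlText).coalesce(200)
// run the usual actions/writes on df instead of the raw query result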

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Notification on Spark Streaming job failure

2015-10-07 Thread Steve Loughran

On 7 Oct 2015, at 06:28, Krzysztof Zarzycki 
> wrote:

Hi Vikram, So you give up using yarn-cluster mode of launching Spark jobs, is 
that right? AFAIK when using yarn-cluster mode, the launch process 
(spark-submit) monitors job running on YARN, but if it is killed/dies, it just 
stops printing the state (RUNNING usually), without influencing the monitored 
job. So you cannot use monit features on the launch process (like restart on 
fail,etc.)

One more thing: Monit depends on pidfiles and spark-submit (in yarn-client 
mode) does not create them. Do you create them on your own?

Thanks!
Krzysiek


you know, there's nothing to stop anyone adding a little monitoring tool -just 
poll the YARN RM for application reports and then fail if the application -> 
FAILED/KILLED states. If you do this, do test what happens during AM Restart 
-you probably want to send a notification, but it is not as serious as a full 
application failure
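
A rough sketch of that polling approach with the YARN client API, in Scala; the
application id and the alerting hook are assumptions:

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

val yarn = YarnClient.createYarnClient()
yarn.init(new YarnConfiguration())
yarn.start()

def isDead(appId: ApplicationId): Boolean = {
  val state = yarn.getApplicationReport(appId).getYarnApplicationState
  state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED
}

// Poll isDead(appId) periodically and fire your own notification (email,
// pager, ...) when it turns true; treat AM restarts separately as noted above.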



2015-10-07 6:37 GMT+02:00 Vikram Kone 
>:
We are using Monit to kick off spark streaming jobs n seems to work fine.


On Monday, September 28, 2015, Chen Song 
> wrote:
I am also interested specifically in monitoring and alerting on Spark streaming 
jobs. It will be helpful to get some general guidelines or advice on this, from 
people who implemented anything on this.

On Fri, Sep 18, 2015 at 2:35 AM, Krzysztof Zarzycki  
wrote:
Hi there Spark Community,
I would like to ask you for an advice: I'm running Spark Streaming jobs in 
production. Sometimes these jobs fail and I would like to get email 
notification about it. Do you know how I can set up Spark to notify me by email 
if my job fails? Or do I have to use external monitoring tool?
I'm thinking of the following options:
1. As I'm running those jobs on YARN, monitor somehow YARN jobs. Looked for it 
as well but couldn't find any YARN feature to do it.
2. Run Spark Streaming job in some scheduler, like Oozie, Azkaban, Luigi. Those 
are created rather for batch jobs, not streaming, but could work. Has anyone 
tried that?
3. Run job driver under "monit" tool and catch the failure and send an email 
about it. Currently I'm deploying with yarn-cluster mode and I would need to 
resign from it to run under monit
4. Implement monitoring tool (like Graphite, Ganglia, Prometheus) and use Spark 
metrics. And then implement alerting in those. Can I get information of failed 
jobs in Spark metrics?
5. As 4. but implement my own custom job metrics and monitor them.

What's your opinion about my options? How do you people solve this problem? 
Anything Spark specific?
I'll be grateful for any advice in this subject.
Thanks!
Krzysiek




--
Chen Song





Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-07 Thread Ophir Cohen
>From which jar WrappedInternalRow comes from?
It seems that I can't find it.

BTW
What I'm trying to do now is to create scala array from the fields and than
create Row out of that array.
The problem is that I get types mismatches...

On Wed, Oct 7, 2015 at 8:03 AM, Hemant Bhanawat 
wrote:

> An approach can be to wrap your MutableRow in WrappedInternalRow which is
> a child class of Row.
>
> Hemant
> www.snappydata.io
> linkedin.com/company/snappydata
>
>
> On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen  wrote:
>
>> Hi Guys,
>> I'm upgrading to Spark 1.5.
>>
>> In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
>> created GenericMutableRow
>> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and return it
>> as org.apache.spark.sql.Row
>>
>> Starting from Spark 1.5 GenericMutableRow isn't extends Row.
>>
>> What do you suggest to do?
>> How can I convert GenericMutableRow to Row?
>>
>> Prompt answer will be highly appreciated!
>> Thanks,
>> Ophir
>>
>
>


Re: ClassCastException while reading data from HDFS through Spark

2015-10-07 Thread UMESH CHAUDHARY
As per the exception, it looks like there is a mismatch between the actual
sequence file's value type and the one provided in your code. Change BytesWritable
to *LongWritable* and try running it again.

-Umesh

On Wed, Oct 7, 2015 at 2:41 PM, Vinoth Sankar  wrote:

> I'm just reading data from HDFS through Spark. It throws 
> *java.lang.ClassCastException:
> org.apache.hadoop.io.LongWritable cannot be cast to
> org.apache.hadoop.io.BytesWritable* at line no 6. I never used
> LongWritable in my code, no idea how the data was in that format.
>
> Note : I'm not using MapReduce Concepts and also I'm not creating Jobs
> explicitly. So i can't use job.setMapOutputKeyClass and
> job.setMapOutputValueClass.
>
> JavaPairRDD<IntWritable, BytesWritable> hdfsContent =
> sparkContext.sequenceFile(hdfsPath, IntWritable.class, BytesWritable.class);
> JavaRDD<FileData> lines = hdfsContent.map(new Function<Tuple2<IntWritable,
> BytesWritable>, FileData>()
> {
> public FileData call(Tuple2<IntWritable, BytesWritable> tuple2) throws
> InvalidProtocolBufferException
> {
> byte[] bytes = tuple2._2().getBytes();
> return FileData.parseFrom(bytes);
> }
> });
>
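
To see what the file really contains before choosing the Writable classes, it
may help to read the sequence file header directly; a sketch in Scala, with
hdfsPath as in the code above and a Hadoop Configuration (conf) assumed:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.SequenceFile

val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(hdfsPath)))
try {
  // These header classes are the ones sequenceFile() must be given.
  println(s"key: ${reader.getKeyClassName}, value: ${reader.getValueClassName}")
} finally {
  reader.close()
}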


Re: spark multi tenancy

2015-10-07 Thread Dominik Fries
Currently we try to execute pyspark from the user's CLI, but in the context of 
the project user, and get this error (the cluster is kerberized):

[@edgenode1 ~]$ pyspark --master yarn --num-executors 5 --proxy-user 

Python 2.7.5 (default, Jun 24 2015, 00:41:19) 
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
15/10/06 09:44:24 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
15/10/06 09:44:25 INFO SparkContext: Running Spark version 1.3.1
15/10/06 09:44:25 INFO SecurityManager: Changing view acls to: 
,
15/10/06 09:44:25 INFO SecurityManager: Changing modify acls to: 
,
15/10/06 09:44:25 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(, 
); users with modify permissions: Set(, )
15/10/06 09:44:25 INFO Slf4jLogger: Slf4jLogger started
15/10/06 09:44:25 INFO Remoting: Starting remoting
15/10/06 09:44:26 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkDriver@:40607]
15/10/06 09:44:26 INFO Utils: Successfully started service 'sparkDriver' on 
port 40607.
15/10/06 09:44:26 INFO SparkEnv: Registering MapOutputTracker
15/10/06 09:44:26 INFO SparkEnv: Registering BlockManagerMaster
15/10/06 09:44:26 INFO DiskBlockManager: Created local directory at 
/tmp/spark-10b70025-ca98-4940-91b8-6dbd0b7148aa/blockmgr-33e9fb6d-d5b2-4fa5-876f-0b91501be632
15/10/06 09:44:26 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/10/06 09:44:26 INFO HttpFileServer: HTTP File server directory is 
/tmp/spark-1a4b86f0-3e57-4f44-bded-6157f4f1933f/httpd-2cafcce9-71ec-44fb-8500-2c70756ea3b9
15/10/06 09:44:26 INFO HttpServer: Starting HTTP Server
15/10/06 09:44:26 INFO Server: jetty-8.y.z-SNAPSHOT
15/10/06 09:44:26 INFO AbstractConnector: Started SocketConnector@0.0.0.0:34903
15/10/06 09:44:26 INFO Utils: Successfully started service 'HTTP file server' 
on port 34903.
15/10/06 09:44:26 INFO SparkEnv: Registering OutputCommitCoordinator
15/10/06 09:44:26 INFO Server: jetty-8.y.z-SNAPSHOT
15/10/06 09:44:26 INFO AbstractConnector: Started 
SelectChannelConnector@0.0.0.0:4040
15/10/06 09:44:26 INFO Utils: Successfully started service 'SparkUI' on port 
4040.
15/10/06 09:44:26 INFO SparkUI: Started SparkUI at http://:4040
spark.yarn.driver.memoryOverhead is set but does not apply in client mode.
15/10/06 09:44:27 INFO TimelineClientImpl: Timeline service address: 
http://:8188/ws/v1/timeline/
15/10/06 09:44:27 INFO RMProxy: Connecting to ResourceManager at 
/10.49.20.5:8050
Traceback (most recent call last):
  File "/usr/hdp/2.3.0.0-2557/spark/python/pyspark/shell.py", line 50, in 

sc = SparkContext(appName="PySparkShell", pyFiles=add_files)
  File "/usr/hdp/2.3.0.0-2557/spark/python/pyspark/context.py", line 110, in 
__init__
conf, jsc, profiler_cls)
  File "/usr/hdp/2.3.0.0-2557/spark/python/pyspark/context.py", line 158, in 
_do_init
self._jsc = jsc or self._initialize_context(self._conf._jconf)
  File "/usr/hdp/2.3.0.0-2557/spark/python/pyspark/context.py", line 211, in 
_initialize_context
return self._jvm.JavaSparkContext(jconf)
  File 
"/home//.local/lib/python2.7/site-packages/py4j-0.9-py2.7.egg/py4j/java_gateway.py",
 line 1064, in __call__
answer, self._gateway_client, None, self._fqn)
  File 
"/home//.local/lib/python2.7/site-packages/py4j-0.9-py2.7.egg/py4j/protocol.py",
 line 308, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling 
None.org.apache.spark.api.java.JavaSparkContext.

@guha, yes, you can separate workloads via the YARN capacity scheduler.

Von: ayan guha [mailto:guha.a...@gmail.com] 
Gesendet: Mittwoch, 7. Oktober 2015 12:06
An: Steve Loughran 
Cc: user ; Dominik Fries 
Betreff: Re: spark multi tenancy

Can queues also be used to separate workloads?
On 7 Oct 2015 20:34, "Steve Loughran"  wrote:

> On 7 Oct 2015, at 09:26, Dominik Fries  wrote:
>
> Hello Folks,
>
> We want to deploy several spark projects and want to use a unique project
> user for each of them. Only the project user should start the spark
> application and have the corresponding packages installed.
>
> Furthermore a personal user, which belongs to a specific project, should
> start a spark application via the corresponding spark project user as proxy.
> (Development)
>
> The Application is currently running with ipython / pyspark. (HDP 2.3 -
> Spark 1.3.1)
>
> Is this possible or what is the best practice for a spark multi tenancy
> environment ?
>
>

Deploy on a kerberized YARN cluster and each application instance will be 
running as a different unix user in the cluster, with the appropriate access to 
HDFS —isolated.

The issue then becomes "do workloads clash with each other?". If you want to 

Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-07 Thread Ophir Cohen
Thanks!
Can you check if you can provide example of the conversion?


On Wed, Oct 7, 2015 at 2:05 PM, Hemant Bhanawat 
wrote:

> Oh, this is an internal class of our project and I had used it without
> realizing the source.
>
> Anyway, the idea is to  wrap the InternalRow in a class that derives from
> Row. When you implement the functions of the trait 'Row ', the type
> conversions from Row types to InternalRow types has to be done for each of
> the types. But, as I can see, the primitive types (apart from String) don't
> need conversions. Map and Array would need some handling.
>
> I will check with the author of this code, I think this code can be
> contributed to Spark.
>
> Hemant
> www.snappydata.io
> linkedin.com/company/snappydata
>
> On Wed, Oct 7, 2015 at 3:30 PM, Ophir Cohen  wrote:
>
>> From which jar WrappedInternalRow comes from?
>> It seems that I can't find it.
>>
>> BTW
>> What I'm trying to do now is to create scala array from the fields and
>> than create Row out of that array.
>> The problem is that I get types mismatches...
>>
>> On Wed, Oct 7, 2015 at 8:03 AM, Hemant Bhanawat 
>> wrote:
>>
>>> An approach can be to wrap your MutableRow in WrappedInternalRow which
>>> is a child class of Row.
>>>
>>> Hemant
>>> www.snappydata.io
>>> linkedin.com/company/snappydata
>>>
>>>
>>> On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen  wrote:
>>>
 Hi Guys,
 I'm upgrading to Spark 1.5.

 In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
 created GenericMutableRow
 (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and return it
 as org.apache.spark.sql.Row

 Starting from Spark 1.5 GenericMutableRow isn't extends Row.

 What do you suggest to do?
 How can I convert GenericMutableRow to Row?

 Prompt answer will be highly appreciated!
 Thanks,
 Ophir

>>>
>>>
>>
>
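
For anyone following along, a rough, untested sketch of the field-by-field conversion discussed above (assuming Spark 1.5 catalyst types; only a few primitive types are handled, and Map/Array/nested structs are deliberately left out):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

// Copy an InternalRow (e.g. a GenericMutableRow) into an external Row,
// converting the catalyst representations (e.g. UTF8String -> String).
def internalToRow(internal: InternalRow, schema: StructType): Row = {
  val values = schema.fields.toSeq.zipWithIndex.map { case (field, i) =>
    if (internal.isNullAt(i)) null
    else field.dataType match {
      case StringType  => internal.getUTF8String(i).toString
      case IntegerType => internal.getInt(i)
      case LongType    => internal.getLong(i)
      case DoubleType  => internal.getDouble(i)
      case BooleanType => internal.getBoolean(i)
      case other       => internal.get(i, other)  // may still need conversion
    }
  }
  Row.fromSeq(values)
}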


Re: spark multi tenancy

2015-10-07 Thread Steve Loughran

> On 7 Oct 2015, at 09:26, Dominik Fries  wrote:
> 
> Hello Folks,
> 
> We want to deploy several spark projects and want to use a unique project
> user for each of them. Only the project user should start the spark
> application and have the corresponding packages installed. 
> 
> Furthermore a personal user, which belongs to a specific project, should
> start a spark application via the corresponding spark project user as proxy.
> (Development)
> 
> The Application is currently running with ipython / pyspark. (HDP 2.3 -
> Spark 1.3.1)
> 
> Is this possible or what is the best practice for a spark multi tenancy
> environment ?
> 
> 

Deploy on a kerberized YARN cluster and each application instance will be 
running as a different unix user in the cluster, with the appropriate access to 
HDFS —isolated.

The issue then becomes "do workloads clash with each other?". If you want to 
isolate dev & production, using node labels to keep dev work off the production 
nodes is the standard technique. 

Re: RDD of ImmutableList

2015-10-07 Thread Sean Owen
I think Java's immutable collections are fine with respect to kryo --
that's not the same as Guava.

On Wed, Oct 7, 2015 at 11:56 AM, Jakub Dubovsky
 wrote:
> I did not realize that Scala's and Java's immutable collections use
> different APIs, which causes this. Thank you for the reminder. This makes some
> sense now...
>
> -- Původní zpráva --
> Od: Jonathan Coveney 
> Komu: Jakub Dubovsky 
> Datum: 7. 10. 2015 1:29:34
>
>
> Předmět: Re: RDD of ImmutableList
>
>
> Nobody is saying not to use immutable data structures, only that guava's
> aren't natively supported.
>
> Scala's default collections library is all immutable. list, Vector, Map.
> This is what people generally use, especially in scala code!
>
> El martes, 6 de octubre de 2015, Jakub Dubovsky
>  escribió:
>
> Thank you for quick reaction.
>
> I have to say this is very surprising to me. I never received an advice to
> stop using an immutable approach. Whole RDD is designed to be immutable
> (which is sort of sabotaged by not being able to (de)serialize immutable
> classes properly). I will ask on dev list if this is to be changed or not.
>
> Ok, I have let go initial feelings and now let's be pragmatic. And this is
> still for everyone not just Igor:
>
> I use a class from a library which is immutable. Now I want to use this
> class to represent my data in RDD because this saves me a huge amount of
> work. The class uses ImmutableList as one of its fields. That's why it
> fails. But isn't there a way to workaround this? I ask this because I have
> exactly zero knowledge about kryo and the way how it works. So for example
> would some of these two work?
>
> 1) Change the external class so that it implements writeObject, readObject
> methods (it's java). Will these methods be used by kryo? (I can ask the
> maintainers of a library to change the class if the change is reasonable.
> Adding these methods would be while dropping immutability certainly
> wouldn't)
>
> 2) Wrap the class to scala class which would translate the data during
> (de)serialization?
>
>   Thanks!
>   Jakub Dubovsky
>
> -- Původní zpráva --
> Od: Igor Berman 
> Komu: Jakub Dubovsky 
> Datum: 5. 10. 2015 20:11:35
> Předmět: Re: RDD of ImmutableList
>
>
> kryo doesn't support guava's collections by default
> I remember encountered project in github that fixes this(not sure though).
> I've ended to stop using guava collections as soon as spark rdds are
> concerned.
>
> On 5 October 2015 at 21:04, Jakub Dubovsky 
> wrote:
>
> Hi all,
>
>   I would like to have an advice on how to use ImmutableList with RDD. Small
> presentation of an essence of my problem in spark-shell with guava jar
> added:
>
> scala> import com.google.common.collect.ImmutableList
> import com.google.common.collect.ImmutableList
>
> scala> val arr = Array(ImmutableList.of(1,2), ImmutableList.of(2,4),
> ImmutableList.of(3,6))
> arr: Array[com.google.common.collect.ImmutableList[Int]] = Array([1, 2], [2,
> 4], [3, 6])
>
> scala> val rdd = sc.parallelize(arr)
> rdd: org.apache.spark.rdd.RDD[com.google.common.collect.ImmutableList[Int]]
> = ParallelCollectionRDD[0] at parallelize at :24
>
> scala> rdd.count
>
>  This results in kryo exception saying that it cannot add a new element to
> list instance while deserialization:
>
> java.io.IOException: java.lang.UnsupportedOperationException
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
> at
> org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
> ...
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.UnsupportedOperationException
> at
> com.google.common.collect.ImmutableCollection.add(ImmutableCollection.java:91)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
> ...
>
>   It somehow makes sense. But I cannot think of a workaround and I do not
> believe that using ImmutableList with RDD is not possible. How this is
> solved?
>
>   Thank you in advance!
>
>Jakub Dubovsky
>
>
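
A possible workaround for the above is a custom kryo serializer that writes the ImmutableList out as a plain array and rebuilds it with copyOf on read, so kryo never calls add(). A rough, untested sketch (class names are made up; the registrator must be referenced by its fully-qualified name):

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.google.common.collect.ImmutableList
import org.apache.spark.serializer.KryoRegistrator

// Write the list as an Object[] and rebuild it immutably on read.
class ImmutableListSerializer extends Serializer[ImmutableList[_]] {
  override def write(kryo: Kryo, output: Output, list: ImmutableList[_]): Unit =
    kryo.writeObject(output, list.toArray)

  override def read(kryo: Kryo, input: Input, cls: Class[ImmutableList[_]]): ImmutableList[_] =
    ImmutableList.copyOf(kryo.readObject(input, classOf[Array[AnyRef]]))
}

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    val ser = new ImmutableListSerializer
    // ImmutableList.of(...) returns package-private subclasses, so register
    // those concrete classes in addition to the base class.
    kryo.register(classOf[ImmutableList[_]], ser)
    kryo.register(ImmutableList.of().getClass, ser)
    kryo.register(ImmutableList.of(1, 2).getClass, ser)
  }
}

// Enable with:
//   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//   conf.set("spark.kryo.registrator", "MyKryoRegistrator")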

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



What happens in the master or slave launch ?

2015-10-07 Thread Camelia Elena Ciolac
Hello,

I have the following question:

I have two scenarios:
1) in one scenario (if I'm connected on the target node) the master starts 
successfully.
Its log contains:

Spark Command: /usr/opt/java/jdk1.7.0_07/jre/bin/java -cp 
/home/camelia/spark-1.4.1-bin-hadoop2.6/sbin/../conf/:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/spark-assembly-1.4.1-hadoop2.6.0.jar:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar
 -Dspark.deploy.defaultCores=2 -Xms512m -Xmx512m -XX:MaxPermSize=256m 
org.apache.spark.deploy.master.Master --ip ttitania-6 --port 7077 --webui-port 
8080

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/10/07 13:23:55 INFO Master: Registered signal handlers for [TERM, HUP, INT]
15/10/07 13:23:55 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
15/10/07 13:23:56 INFO SecurityManager: Changing view acls to: camelia
15/10/07 13:23:56 INFO SecurityManager: Changing modify acls to: camelia
15/10/07 13:23:56 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(camelia); users 
with modify permissions: Set(camelia)
15/10/07 13:23:56 INFO Slf4jLogger: Slf4jLogger started
15/10/07 13:23:56 INFO Remoting: Starting remoting
15/10/07 13:23:57 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkMaster@ttitania-6:7077]
15/10/07 13:23:57 INFO Utils: Successfully started service 'sparkMaster' on 
port 7077.
15/10/07 13:23:57 INFO Utils: Successfully started service on port 6066.
15/10/07 13:23:57 INFO StandaloneRestServer: Started REST server for submitting 
applications on port 6066
15/10/07 13:23:57 INFO Master: Starting Spark master at spark://ttitania-6:7077
15/10/07 13:23:57 INFO Master: Running Spark version 1.4.1
15/10/07 13:23:57 INFO Utils: Successfully started service 'MasterUI' on port 
8080.
15/10/07 13:23:57 INFO MasterWebUI: Started MasterWebUI at 
http://129.16.20.156:8080
15/10/07 13:23:57 INFO Master: I have been elected leader! New state: ALIVE



2) in another scenario (trying to start the spark master remotely), when I try 
to start one master, it only partially starts and then shuts down. Its log is:

Spark Command: /usr/opt/java/jdk1.7.0_07/jre/bin/java -cp 
/home/camelia/spark-1.4.1-bin-hadoop2.6/sbin/../conf/:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/spark-assembly-1.4.1-hadoop2.6.0.jar:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar
 -Dspark.deploy.defaultCores=2 -Xms512m -Xmx512m -XX:MaxPermSize=256m 
org.apache.spark.deploy.master.Master --ip ttitania-6 --port 7077 --webui-port 
8080

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/10/07 12:56:31 INFO Master: Registered signal handlers for [TERM, HUP, INT]
15/10/07 12:56:32 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
15/10/07 12:56:32 INFO SecurityManager: Changing view acls to: camelia
15/10/07 12:56:32 INFO SecurityManager: Changing modify acls to: camelia
15/10/07 12:56:32 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(camelia); users 
with modify permissions: Set(camelia)



Can you please explain which might be the problem?

I wait impatiently for your reply.
Many thanks,
Camelia


Re: question on make multiple external calls within each partition

2015-10-07 Thread Chen Song
Thanks TD and Ashish.

On Mon, Oct 5, 2015 at 9:14 PM, Tathagata Das  wrote:

> You could create a threadpool on demand within the foreachPartitoin
> function, then handoff the REST calls to that threadpool, get back the
> futures and wait for them to finish. Should be pretty straightforward. Make
> sure that your foreachPartition function cleans up the threadpool before
> finishing. Alternatively, you can create an on-demand singleton threadpool
> that is reused across batches, will reduce the cost of creating threadpools
> everytime.
>
> On Mon, Oct 5, 2015 at 6:07 PM, Ashish Soni  wrote:
>
>> Need more details but you might want to filter the data first ( create
>> multiple RDD) and then process.
>>
>>
>> > On Oct 5, 2015, at 8:35 PM, Chen Song  wrote:
>> >
>> > We have a use case with the following design in Spark Streaming.
>> >
>> > Within each batch,
>> > * data is read and partitioned by some key
>> > * forEachPartition is used to process the entire partition
>> > * within each partition, there are several REST clients created to
>> connect to different REST services
>> > * for the list of records within each partition, it will call these
>> services, each service call is independent of others; records are just
>> pre-partitioned to make these calls more efficiently.
>> >
>> > I have a question
>> > * Since each call is time taking and to prevent the calls to be
>> executed sequentially, how can I parallelize the service calls within
>> processing of each partition? Can I just use Scala future within
>> forEachPartition(or mapPartitions)?
>> >
>> > Any suggestions greatly appreciated.
>> >
>> > Chen
>> >
>> >
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
Chen Song
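
For completeness, a rough sketch of the on-demand thread pool approach described above (callRestService and the pool size are hypothetical stand-ins):

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.spark.streaming.dstream.DStream

// Stand-in for one of the independent REST calls.
def callRestService(record: String): Unit = { /* HTTP call elided */ }

def processWithPool(dstream: DStream[String]): Unit = {
  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // Pool created on demand inside the partition, cleaned up before the task ends.
      val pool = Executors.newFixedThreadPool(8)
      implicit val ec = ExecutionContext.fromExecutorService(pool)
      try {
        // Kick off all calls for this partition concurrently...
        val futures = records.map(r => Future(callRestService(r))).toList
        // ...then wait for them to finish before the partition completes.
        futures.foreach(f => Await.ready(f, 1.minute))
      } finally {
        pool.shutdown()
      }
    }
  }
}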


Re: Notification on Spark Streaming job failure

2015-10-07 Thread Adrian Tanase
We’re deploying using YARN in cluster mode, to take advantage of automatic 
restart of long running streaming app. We’ve also done a POC on top of 
Mesos+Marathon, that’s always an option.

For monitoring / alerting, we’re using a combination of:

  *   Spark REST API queried from OpsView via nagios-style checks
      *   Here, we have thresholds on things like number of successful jobs / tasks, total execution time, etc.
  *   Custom business/operational metrics logged manually from the streaming app to OpenTSDB
      *   We're using a combination of Spark accumulators and custom RDDs – after summarizing some counters we push them to OpenTSDB via the REST API
      *   We're using dashboards built with Grafana that poll OpenTSDB – nicer looking, same functionality
      *   We have a custom OpsView check that queries OpenTSDB and looks for some successful number of events processed by the job over a period of time
      *   This is coupled with a stable stream of data from a canary instance

Hope this helps – feel free to google around for all the above buzzwords :). I 
can get into more details on demand.

-adrian
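
(A bare-bones illustration of the accumulator-based counters mentioned above; pushToOpenTsdb is a hypothetical stand-in for whatever client posts to the metrics store.)

import org.apache.spark.streaming.dstream.DStream

// Stand-in reporter; the real one would POST to OpenTSDB's REST API.
def pushToOpenTsdb(metric: String, value: Long): Unit = { /* HTTP call elided */ }

def instrument(lines: DStream[String]): Unit = {
  val processed = lines.context.sparkContext.accumulator(0L, "processed records")
  lines.foreachRDD { rdd =>
    rdd.foreach(_ => processed += 1L)  // counted on the executors
    // foreachRDD runs on the driver after each batch, so report the running total here.
    pushToOpenTsdb("streaming.processed", processed.value)
  }
}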

From: Chen Song
Date: Monday, September 28, 2015 at 5:00 PM
To: Krzysztof Zarzycki
Cc: user
Subject: Re: Notification on Spark Streaming job failure

I am also interested specifically in monitoring and alerting on Spark streaming 
jobs. It will be helpful to get some general guidelines or advice on this, from 
people who implemented anything on this.

On Fri, Sep 18, 2015 at 2:35 AM, Krzysztof Zarzycki 
> wrote:
Hi there Spark Community,
I would like to ask you for an advice: I'm running Spark Streaming jobs in 
production. Sometimes these jobs fail and I would like to get email 
notification about it. Do you know how I can set up Spark to notify me by email 
if my job fails? Or do I have to use external monitoring tool?
I'm thinking of the following options:
1. As I'm running those jobs on YARN, monitor somehow YARN jobs. Looked for it 
as well but couldn't find any YARN feature to do it.
2. Run Spark Streaming job in some scheduler, like Oozie, Azkaban, Luigi. Those 
are created rather for batch jobs, not streaming, but could work. Has anyone 
tried that?
3. Run job driver under "monit" tool and catch the failure and send an email 
about it. Currently I'm deploying with yarn-cluster mode and I would need to 
resign from it to run under monit
4. Implement monitoring tool (like Graphite, Ganglia, Prometheus) and use Spark 
metrics. And then implement alerting in those. Can I get information of failed 
jobs in Spark metrics?
5. As 4. but implement my own custom job metrics and monitor them.

What's your opinion about my options? How do you people solve this problem? 
Anything Spark specific?
I'll be grateful for any advice in this subject.
Thanks!
Krzysiek




--
Chen Song



Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-07 Thread Ted Yu
Hemant:
Can you post the code snippet to the mailing list - other people would be
interested.

On Wed, Oct 7, 2015 at 5:50 AM, Hemant Bhanawat 
wrote:

> Will send you the code on your email id.
>
> On Wed, Oct 7, 2015 at 4:37 PM, Ophir Cohen  wrote:
>
>> Thanks!
>> Can you check if you can provide example of the conversion?
>>
>>
>> On Wed, Oct 7, 2015 at 2:05 PM, Hemant Bhanawat 
>> wrote:
>>
>>> Oh, this is an internal class of our project and I had used it without
>>> realizing the source.
>>>
>>> Anyway, the idea is to  wrap the InternalRow in a class that derives
>>> from Row. When you implement the functions of the trait 'Row ', the type
>>> conversions from Row types to InternalRow types has to be done for each of
>>> the types. But, as I can see, the primitive types (apart from String) don't
>>> need conversions. Map and Array would need some handling.
>>>
>>> I will check with the author of this code, I think this code can be
>>> contributed to Spark.
>>>
>>> Hemant
>>> www.snappydata.io
>>> linkedin.com/company/snappydata
>>>
>>> On Wed, Oct 7, 2015 at 3:30 PM, Ophir Cohen  wrote:
>>>
 From which jar WrappedInternalRow comes from?
 It seems that I can't find it.

 BTW
 What I'm trying to do now is to create scala array from the fields and
 than create Row out of that array.
 The problem is that I get types mismatches...

 On Wed, Oct 7, 2015 at 8:03 AM, Hemant Bhanawat 
 wrote:

> An approach can be to wrap your MutableRow in WrappedInternalRow which
> is a child class of Row.
>
> Hemant
> www.snappydata.io
> linkedin.com/company/snappydata
>
>
> On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen  wrote:
>
>> Hi Guys,
>> I'm upgrading to Spark 1.5.
>>
>> In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
>> created GenericMutableRow
>> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and return 
>> it
>> as org.apache.spark.sql.Row
>>
>> Starting from Spark 1.5 GenericMutableRow isn't extends Row.
>>
>> What do you suggest to do?
>> How can I convert GenericMutableRow to Row?
>>
>> Prompt answer will be highly appreciated!
>> Thanks,
>> Ophir
>>
>
>

>>>
>>
>


Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-07 Thread Hemant Bhanawat
Will send you the code on your email id.

On Wed, Oct 7, 2015 at 4:37 PM, Ophir Cohen  wrote:

> Thanks!
> Can you check if you can provide example of the conversion?
>
>
> On Wed, Oct 7, 2015 at 2:05 PM, Hemant Bhanawat 
> wrote:
>
>> Oh, this is an internal class of our project and I had used it without
>> realizing the source.
>>
>> Anyway, the idea is to  wrap the InternalRow in a class that derives from
>> Row. When you implement the functions of the trait 'Row ', the type
>> conversions from Row types to InternalRow types has to be done for each of
>> the types. But, as I can see, the primitive types (apart from String) don't
>> need conversions. Map and Array would need some handling.
>>
>> I will check with the author of this code, I think this code can be
>> contributed to Spark.
>>
>> Hemant
>> www.snappydata.io
>> linkedin.com/company/snappydata
>>
>> On Wed, Oct 7, 2015 at 3:30 PM, Ophir Cohen  wrote:
>>
>>> From which jar WrappedInternalRow comes from?
>>> It seems that I can't find it.
>>>
>>> BTW
>>> What I'm trying to do now is to create scala array from the fields and
>>> than create Row out of that array.
>>> The problem is that I get types mismatches...
>>>
>>> On Wed, Oct 7, 2015 at 8:03 AM, Hemant Bhanawat 
>>> wrote:
>>>
 An approach can be to wrap your MutableRow in WrappedInternalRow which
 is a child class of Row.

 Hemant
 www.snappydata.io
 linkedin.com/company/snappydata


 On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen  wrote:

> Hi Guys,
> I'm upgrading to Spark 1.5.
>
> In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
> created GenericMutableRow
> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and return 
> it
> as org.apache.spark.sql.Row
>
> Starting from Spark 1.5 GenericMutableRow isn't extends Row.
>
> What do you suggest to do?
> How can I convert GenericMutableRow to Row?
>
> Prompt answer will be highly appreciated!
> Thanks,
> Ophir
>


>>>
>>
>


RE: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-07 Thread Khandeshi, Ami
Tried, multiple permutation of setting home… Still same issue
> Sys.setenv(SPARK_HOME="c:\\DevTools\\spark-1.5.1")
> .libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R","lib"),.libPaths()))
> library(SparkR)

Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

filter, na.omit

The following objects are masked from ‘package:base’:

intersect, rbind, sample, subset, summary, table, transform

> sc<-sparkR.init(master = "local")
Launching java with spark-submit command 
c:\DevTools\spark-1.5.1/bin/spark-submit.cmd   sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\RtmpkXZVBa\backend_port45ac487f2fbd
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds


From: Sun, Rui [mailto:rui@intel.com]
Sent: Wednesday, October 07, 2015 2:35 AM
To: Hossein; Khandeshi, Ami
Cc: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

Not sure "/C/DevTools/spark-1.5.1/bin/spark-submit.cmd" is a valid?

From: Hossein [mailto:fal...@gmail.com]
Sent: Wednesday, October 7, 2015 12:46 AM
To: Khandeshi, Ami
Cc: Sun, Rui; akhandeshi; user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

Have you built the Spark jars? Can you run the Spark Scala shell?

--Hossein

On Tuesday, October 6, 2015, Khandeshi, Ami 
> wrote:
> Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
> Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)
>
> sc <- sparkR.init(master="local")
Launching java with spark-submit command 
/C/DevTools/spark-1.5.1/bin/spark-submit.cmd   --verbose sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds
In addition: Warning message:
running command '"/C/DevTools/spark-1.5.1/bin/spark-submit.cmd"   --verbose 
sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391' had 
status 127

-Original Message-
From: Sun, Rui [mailto:rui@intel.com]
Sent: Tuesday, October 06, 2015 9:39 AM
To: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

What you have done is supposed to work.  Need more debugging information to 
find the cause.

Could you add the following lines before calling sparkR.init()?

Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)

Then to see if you can find any hint in the console output

-Original Message-
From: akhandeshi [mailto:ami.khande...@gmail.com]
Sent: Tuesday, October 6, 2015 8:21 PM
To: user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

I couldn't get this working...

I have have JAVA_HOME set.
I have defined SPARK_HOME
Sys.setenv(SPARK_HOME="c:\DevTools\spark-1.5.1")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library("SparkR", lib.loc="c:\\DevTools\\spark-1.5.1\\lib")
library(SparkR)
sc<-sparkR.init(master="local")

I get
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds

What am I missing??






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Error-in-sparkR-init-master-local-in-RStudio-tp23768p24949.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


--
--Hossein


What happens in the master or slave launch ?

2015-10-07 Thread camelia
Hello,

I have the following question:

I have two scenarios:
1) in one scenario (if I'm connected on the target node) the master starts
successfully.
Its log contains:

Spark Command: /usr/opt/java/jdk1.7.0_07/jre/bin/java -cp
/home/camelia/spark-1.4.1-bin-hadoop2.6/sbin/../conf/:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/spark-assembly-1.4.1-hadoop2.6.0.jar:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar
-Dspark.deploy.defaultCores=2 -Xms512m -Xmx512m -XX:MaxPermSize=256m
org.apache.spark.deploy.master.Master --ip ttitania-6 --port 7077
--webui-port 8080

Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/10/07 13:23:55 INFO Master: Registered signal handlers for [TERM, HUP,
INT]
15/10/07 13:23:55 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/10/07 13:23:56 INFO SecurityManager: Changing view acls to: camelia
15/10/07 13:23:56 INFO SecurityManager: Changing modify acls to: camelia
15/10/07 13:23:56 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(camelia); users
with modify permissions: Set(camelia)
15/10/07 13:23:56 INFO Slf4jLogger: Slf4jLogger started
15/10/07 13:23:56 INFO Remoting: Starting remoting
15/10/07 13:23:57 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkMaster@ttitania-6:7077]
15/10/07 13:23:57 INFO Utils: Successfully started service 'sparkMaster' on
port 7077.
15/10/07 13:23:57 INFO Utils: Successfully started service on port 6066.
15/10/07 13:23:57 INFO StandaloneRestServer: Started REST server for
submitting applications on port 6066
15/10/07 13:23:57 INFO Master: Starting Spark master at
spark://ttitania-6:7077
15/10/07 13:23:57 INFO Master: Running Spark version 1.4.1
15/10/07 13:23:57 INFO Utils: Successfully started service 'MasterUI' on
port 8080.
15/10/07 13:23:57 INFO MasterWebUI: Started MasterWebUI at
http://129.11.11.111:8080
15/10/07 13:23:57 INFO Master: I have been elected leader! New state: ALIVE


2) in another scenario (trying to start the spark master remotely, using
qrsh as we are disallowed ssh between cluster nodes), when I try to start
one master, it only partially starts and then shuts down. Its log is:

Spark Command: /usr/opt/java/jdk1.7.0_07/jre/bin/java -cp
/home/camelia/spark-1.4.1-bin-hadoop2.6/sbin/../conf/:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/spark-assembly-1.4.1-hadoop2.6.0.jar:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/home/camelia/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar
-Dspark.deploy.defaultCores=2 -Xms512m -Xmx512m -XX:MaxPermSize=256m
org.apache.spark.deploy.master.Master --ip ttitania-6 --port 7077
--webui-port 8080

Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/10/07 12:56:31 INFO Master: Registered signal handlers for [TERM, HUP,
INT]
15/10/07 12:56:32 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/10/07 12:56:32 INFO SecurityManager: Changing view acls to: camelia
15/10/07 12:56:32 INFO SecurityManager: Changing modify acls to: camelia
15/10/07 12:56:32 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(camelia); users
with modify permissions: Set(camelia)



Can you please explain which might be the problem?
Which is the step in Spark's starting the master process that determines
this crash? 

I wait impatiently for your reply.
Many thanks,
Camelia



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-happens-in-the-master-or-slave-launch-tp24968.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark performance non-linear response

2015-10-07 Thread Yadid Ayzenberg

Hi All,

I'm using Spark 1.4.1 to analyze a largish data set (several gigabytes 
of data). The RDD is partitioned into 2048 partitions which are more or 
less equal and entirely cached in RAM.
I evaluated the performance on several cluster sizes, and am witnessing 
a non-linear (power) performance improvement as the cluster size 
increases (plot below). Each node has 4 cores and each worker is 
configured to use 10GB of RAM.

[image: Spark performance]

I would expect a more linear response given the number of partitions and 
the fact that all of the data is cached.

Can anyone suggest what I should tweak in order to improve the performance?
Or perhaps provide an explanation as to the behavior I'm witnessing?

Yadid


This post has NOT been accepted by the mailing list yet.

2015-10-07 Thread akhandeshi
I seem to see this for many of my posts... does anyone have solution?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/This-post-has-NOT-been-accepted-by-the-mailing-list-yet-tp24969.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-07 Thread Cody Koeninger
When you say that the largest difference is from metrics.collect, how are
you measuring that? Wouldn't that be the difference between
max(partitionT1) and sparkT1, not sparkT0 and sparkT1?

As for further places to look, what's happening in the logs during that
time?  Are the number of messages per partition roughly equal (you should
be able to see that in the log line I mentioned earlier)?  Is there a
reason one of your executors is handling half as many tasks as the others?

On Wed, Oct 7, 2015 at 5:19 AM, Gerard Maas  wrote:

> Thanks for the feedback.
>
> Cassandra does not seem to be the issue. The time for writing to Cassandra
> is in the same order of magnitude (see below)
>
> The code structure is roughly as follows:
>
> dstream.filter(pred).foreachRDD{rdd =>
>   val sparkT0 = currentTimeMs
>   val metrics = rdd.mapPartitions{partition =>
>  val partitionT0 = currentTimeMs
>   partition.foreach{ transform andThen storeInCassandra _}
>  val partitionT1 = currentTimeMs
>  Seq(Metric( "local time", executor, partitionT1 - partitionT0,
> records)).iterator
>   }
>   //materialize the rdd
>   val allMetrics = metrics.collect()
>   val sparkT1 = currentTimeMs
>   val totalizedMetrics = // group by and reduce with sum
>   val sparkT2 = currentTimeMs
>   totalizedMetrics.foreach{ metric => gmetric.report(metric)}
> }
>
> Relating this code with the time table presented before (time in ms):
>
>                          How measured?       Slow Task   Fast Task
> executor local           totalizedMetrics    347.6       281.53
> spark computation        sparkT1 - sparkT0   6930        263
> metric collection        sparkT2 - sparkT1   70          138
> wall clock process       sparkT2 - sparkT0   7000        401
> total records processed  totalizedMetrics    4297        5002
>
> What we observe is that the largest difference comes from the
> materialization of the RDD. This pattern repeats cyclically one on, one off.
>
> Any ideas where to further look?
>
> kr, Gerard.
>
>
> On Wed, Oct 7, 2015 at 1:33 AM, Tathagata Das  wrote:
>
>> Good point!
>>
>> On Tue, Oct 6, 2015 at 4:23 PM, Cody Koeninger 
>> wrote:
>>
>>> I agree getting cassandra out of the picture is a good first step.
>>>
>>> But if you just do foreachRDD { _.count } recent versions of direct
>>> stream shouldn't do any work at all on the executor (since the number of
>>> messages in the rdd is known already)
>>>
>>> do a foreachPartition and println or count the iterator manually.
>>>
>>> On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das 
>>> wrote:
>>>
 Are sure that this is not related to Cassandra inserts? Could you just
 do foreachRDD { _.count } instead  to keep Cassandra out of the picture and
 then test this agian.

 On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase 
 wrote:

> Also check if the Kafka cluster is still balanced. Maybe one of the
> brokers manages too many partitions, all the work will stay on that
> executor unless you repartition right after Kafka (and I'm not saying you
> should).
>
> Sent from my iPhone
>
> On 06 Oct 2015, at 22:17, Cody Koeninger  wrote:
>
> I'm not clear on what you're measuring.  Can you post relevant code
> snippets including the measurement code?
>
> As far as kafka metrics, nothing currently.  There is an info-level
> log message every time a kafka rdd iterator is instantiated,
>
> log.info(s"Computing topic ${part.topic}, partition
> ${part.partition} " +
>
>   s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>
>
> If you log once you're done with an iterator you should be able to see
> the delta.
>
> The other thing to try is reduce the number of parts involved in the
> job to isolate it ... first thing I'd do there is take cassandra out of 
> the
> equation.
>
>
>
> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas 
> wrote:
>
>> Hi Cody,
>>
>> The job is doing ETL from Kafka records to Cassandra. After a
>> single filtering stage on Spark, the 'TL' part is done using the
>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>>
>> We have metrics on the executor work which we collect and add
>> together, indicated here by 'local computation'.  As you can see, we also
>> measure how much it cost us to measure :-)
>> See how 'local work'  times are comparable.  What's not visible is
>> the task scheduling and consuming the data from Kafka which becomes part 
>> of
>> the 'spark computation' part.
>>
>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>
>> Are there metrics available somehow on the Kafka reading time?
>>
>>                          Slow Task   Fast Task
>> local computation        347.6       281.53
>> spark computation        6930        263
>> metric collection        70          138
>> wall clock process       7000        401
>> total records processed  4297        5002
>>

RE: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-07 Thread Goodall, Mark (UK)
I would like to say that I have also had this issue.

In two situations: one using Accumulo to store information, and also when 
running multiple streaming jobs within the same streaming context (e.g. 
multiple saves to HDFS). In my case the situation worsens when one of the jobs, 
which has a long slide duration, executes. After this, all other jobs take longer 
to execute. The situation continues until the batches are too delayed and the 
system is unstable.


From: Gerard Maas [mailto:gerard.m...@gmail.com]
Sent: 07 October 2015 11:19
To: Tathagata Das
Cc: Cody Koeninger; Adrian Tanase; spark users
Subject: Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka


Thanks for the feedback.

Cassandra does not seem to be the issue. The time for writing to Cassandra is 
in the same order of magnitude (see below)

The code structure is roughly as follows:

dstream.filter(pred).foreachRDD{rdd =>
  val sparkT0 = currentTimeMs
  val metrics = rdd.mapPartitions{partition =>
 val partitionT0 = currentTimeMs
  partition.foreach{ transform andThen storeInCassandra _}
 val partitionT1 = currentTimeMs
 Seq(Metric( "local time", executor, partitionT1 - partitionT0, 
records)).iterator
  }
  //materialize the rdd
  val allMetrics = metrics.collect()
  val sparkT1 = currentTimeMs
  val totalizedMetrics = // group by and reduce with sum
  val sparkT2 = currentTimeMs
  totalizedMetrics.foreach{ metric => gmetric.report(metric)}
}

Relating this code with the time table presented before (time in ms):


                         How measured?       Slow Task   Fast Task
executor local           totalizedMetrics    347.6       281.53
spark computation        sparkT1 - sparkT0   6930        263
metric collection        sparkT2 - sparkT1   70          138
wall clock process       sparkT2 - sparkT0   7000        401
total records processed  totalizedMetrics    4297        5002


What we observe is that the largest difference comes from the materialization 
of the RDD. This pattern repeats cyclically one on, one off.

Any ideas where to further look?

kr, Gerard.


On Wed, Oct 7, 2015 at 1:33 AM, Tathagata Das 
> wrote:
Good point!

On Tue, Oct 6, 2015 at 4:23 PM, Cody Koeninger 
> wrote:
I agree getting cassandra out of the picture is a good first step.

But if you just do foreachRDD { _.count } recent versions of direct stream 
shouldn't do any work at all on the executor (since the number of messages in 
the rdd is known already)

do a foreachPartition and println or count the iterator manually.

On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das 
> wrote:
Are sure that this is not related to Cassandra inserts? Could you just do 
foreachRDD { _.count } instead  to keep Cassandra out of the picture and then 
test this agian.

On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase 
> wrote:
Also check if the Kafka cluster is still balanced. Maybe one of the brokers 
manages too many partitions, all the work will stay on that executor unless you 
repartition right after kakfka (and I'm not saying you should).

Sent from my iPhone

On 06 Oct 2015, at 22:17, Cody Koeninger 
> wrote:
I'm not clear on what you're measuring.  Can you post relevant code snippets 
including the measurement code?

As far as kafka metrics, nothing currently.  There is an info-level log message 
every time a kafka rdd iterator is instantiated,


log.info(s"Computing topic ${part.topic}, partition 
${part.partition} " +

  s"offsets ${part.fromOffset} -> ${part.untilOffset}")



If you log once you're done with an iterator you should be able to see the 
delta.

The other thing to try is reduce the number of parts involved in the job to 
isolate it ... first thing I'd do there is take cassandra out of the equation.



On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas 
> wrote:
Hi Cody,

The job is doing ETL from Kafka records to Cassandra. After a single filtering 
stage on Spark, the 'TL' part is done using the 
dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.

We have metrics on the executor work which we collect and add together, 
indicated here by 'local computation'.  As you can see, we also measure how 
much it cost us to measure :-)
See how 

Graceful shutdown drops processing in Spark Streaming

2015-10-07 Thread Michal Čizmazia
After triggering the graceful shutdown on the following application, the
application stops before the windowed stream reaches its slide duration. As
a result, the data is not completely processed (i.e. saveToMyStorage is not
called) before shutdown.

According to the documentation, graceful shutdown should ensure that the
data, which has been received, is completely processed before shutdown.
https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code

Spark version: 1.4.1

Code snippet:

Function0<JavaStreamingContext> factory = () -> {
    JavaStreamingContext context = new JavaStreamingContext(sparkConf,
        Durations.minutes(1));
    context.checkpoint("/test");
    JavaDStream records =
        context.receiverStream(myReliableReceiver).flatMap(...);
    records.persist(StorageLevel.MEMORY_AND_DISK());
    records.foreachRDD(rdd -> { rdd.count(); return null; });
    records
        .window(Durations.minutes(15), Durations.minutes(15))
        .foreachRDD(rdd -> saveToMyStorage(rdd));
    return context;
};

try (JavaStreamingContext context =
        JavaStreamingContext.getOrCreate("/test", factory)) {
    context.start();
    waitForShutdownSignal();
    Boolean stopSparkContext = true;
    Boolean stopGracefully = true;
    context.stop(stopSparkContext, stopGracefully);
}


Re: Spark job workflow engine recommendations

2015-10-07 Thread Nick Pentreath
We're also using Azkaban for scheduling, and we simply use spark-submit via 
shell scripts. It works fine.

The auto-retry feature with a large number of retries (like 100 or 1000, 
perhaps) should take care of long-running jobs with restarts on failure. We 
haven't used it for streaming yet, though we have long-running jobs and Azkaban 
won't kill them unless an SLA is in place.

—
Sent from Mailbox

On Wed, Oct 7, 2015 at 7:18 PM, Vikram Kone  wrote:

> Hien,
> I saw this pull request and from what I understand this is geared towards
> running spark jobs over hadoop. We are using spark over cassandra and not
> sure if this new jobtype supports that. I haven't seen any documentation in
> regards to how to use this spark job plugin, so that I can test it out on
> our cluster.
> We are currently submitting our spark jobs using command job type using the
> following command  "dse spark-submit --class com.org.classname ./test.jar"
> etc. What would be the advantage of using the native spark job type over
> command job type?
> I didn't understand from your reply if azkaban already supports long
> running jobs like spark streaming..does it? streaming jobs generally need
> to be running indefinitely or forever and needs to be restarted if for some
> reason they fail (lack of resources may be..). I can probably use the auto
> retry feature for this, but not sure
> I'm looking forward to the multiple executor support which will greatly
> enhance the scalability issue.
> On Wed, Oct 7, 2015 at 9:56 AM, Hien Luu  wrote:
>> The spark job type was added recently - see this pull request
>> https://github.com/azkaban/azkaban-plugins/pull/195.  You can leverage
>> the SLA feature to kill a job if it ran longer than expected.
>>
>> BTW, we just solved the scalability issue by supporting multiple
>> executors.  Within a week or two, the code for that should be merged in the
>> main trunk.
>>
>> Hien
>>
>> On Tue, Oct 6, 2015 at 9:40 PM, Vikram Kone  wrote:
>>
>>> Does Azkaban support scheduling long-running jobs like spark streaming
>>> jobs? Will Azkaban kill a job if it's running for a long time?
>>>
>>>
>>> On Friday, August 7, 2015, Vikram Kone  wrote:
>>>
 Hien,
 Is Azkaban being phased out at linkedin as rumored? If so, what's
 linkedin going to use for workflow scheduling? Is there something else
 that's going to replace Azkaban?

 On Fri, Aug 7, 2015 at 11:25 AM, Ted Yu  wrote:

> In my opinion, choosing some particular project among its peers should
> leave enough room for future growth (which may come faster than you
> initially think).
>
> Cheers
>
> On Fri, Aug 7, 2015 at 11:23 AM, Hien Luu  wrote:
>
>> Scalability is a known issue due the the current architecture.
>> However this will be applicable if you run more 20K jobs per day.
>>
>> On Fri, Aug 7, 2015 at 10:30 AM, Ted Yu  wrote:
>>
>>> From what I heard (an ex-coworker who is Oozie committer), Azkaban
>>> is being phased out at LinkedIn because of scalability issues (though
>>> UI-wise, Azkaban seems better).
>>>
>>> Vikram:
>>> I suggest you do more research in related projects (maybe using their
>>> mailing lists).
>>>
>>> Disclaimer: I don't work for LinkedIn.
>>>
>>> On Fri, Aug 7, 2015 at 10:12 AM, Nick Pentreath <
>>> nick.pentre...@gmail.com> wrote:
>>>
 Hi Vikram,

 We use Azkaban (2.5.0) in our production workflow scheduling. We
 just use local mode deployment and it is fairly easy to set up. It is
 pretty easy to use and has a nice scheduling and logging interface, as 
 well
 as SLAs (like kill job and notify if it doesn't complete in 3 hours or
 whatever).

 However Spark support is not present directly - we run everything
 with shell scripts and spark-submit. There is a plugin interface where 
 one
 could create a Spark plugin, but I found it very cumbersome when I did
 investigate and didn't have the time to work through it to develop 
 that.

 It has some quirks and while there is actually a REST API for adding
 jos and dynamically scheduling jobs, it is not documented anywhere so 
 you
 kinda have to figure it out for yourself. But in terms of ease of use I
 found it way better than Oozie. I haven't tried Chronos, and it seemed
 quite involved to set up. Haven't tried Luigi either.

 Spark job server is good but as you say lacks some stuff like
 scheduling and DAG type workflows (independent of spark-defined job 
 flows).


 On Fri, Aug 7, 2015 at 7:00 PM, Jörn Franke 
 wrote:

> 

Re: Spark standalone hangup during shuffle flatMap or explode in cluster

2015-10-07 Thread Sean Owen
-dev

Is r.getInt(ind) very large in some cases? I think there's not quite
enough info here.

On Wed, Oct 7, 2015 at 6:23 PM,   wrote:
> When running stand-alone cluster mode job, the process hangs up randomly
> during a DataFrame flatMap or explode operation, in HiveContext:
>
> -->> df.flatMap(r => for (n <- 1 to r.getInt(ind)) yield r)
>
> This does not happen either with SQLContext in cluster, or Hive/SQL in local
> mode, where it works fine.
>
> A couple minutes after the hangup, executors start dropping. I am attching
> the logs
> Saif
>
>
>
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark standalone hangup during shuffle flatMap or explode in cluster

2015-10-07 Thread Saif.A.Ellafi
It can be large, yes. But that still does not resolve the question of why it 
works in a smaller environment, i.e. local[32], or in cluster mode when using 
SQLContext instead of HiveContext.

The process, in general, is a RowNumber() HiveQL operation; that is why I need 
HiveContext.

I have the feeling there is something wrong with HiveContext. I don't have a 
Hive/Hadoop database; I only enabled HiveContext to use its functions on my 
JSON-loaded dataframe.

I am new to Spark, so please don't hesitate to ask for more information, as I am 
still not sure what would be relevant.

Saif

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Wednesday, October 07, 2015 2:38 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: Spark standalone hangup during shuffle flatMap or explode in 
cluster

-dev

Is r.getInt(ind) very large in some cases? I think there's not quite enough 
info here.

On Wed, Oct 7, 2015 at 6:23 PM,   wrote:
> When running stand-alone cluster mode job, the process hangs up 
> randomly during a DataFrame flatMap or explode operation, in HiveContext:
>
> -->> df.flatMap(r => for (n <- 1 to r.getInt(ind)) yield r)
>
> This does not happen either with SQLContext in cluster, or Hive/SQL in 
> local mode, where it works fine.
>
> A couple minutes after the hangup, executors start dropping. I am 
> attching the logs Saif
>
>
>
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org


Re: How can I disable logging when running local[*]?

2015-10-07 Thread Alex Kozlov
Hmm, clearly the parameter is not passed to the program. This should be an
activator issue. I wonder how you specify the other parameters, like
driver memory, num cores, etc.? Just out of curiosity, can you run a
program:

import org.apache.spark.SparkConf
val out=new SparkConf(true).get("spark.driver.extraJavaOptions")

in your env and see what the output is?

Also, make sure spark-defaults.conf is on your classpath.

On Tue, Oct 6, 2015 at 11:19 AM, Jeff Jones 
wrote:

> Here’s an example. I echoed JAVA_OPTS so that you can see what I’ve got.
> Then I call ‘activator run’ in the project directory.
>
>
> jjones-mac:analyzer-perf jjones$ echo $JAVA_OPTS
>
> -Xmx4g -Xmx4g
> -Dlog4j.configuration=file:/Users/jjones/src/adaptive/adaptiveobjects/analyzer-perf/conf/log4j.properties
>
> jjones-mac:analyzer-perf jjones$ activator run
>
> [info] Loading project definition from
> /Users/jjones/src/adaptive/adaptiveobjects/analyzer-perf/project
>
> [info] Set current project to analyzer-perf (in build
> file:/Users/jjones/src/adaptive/adaptiveobjects/analyzer-perf/)
>
> [info] Running com.adaptive.analyzer.perf.AnalyzerPerf
>
> 11:15:24.066 [run-main-0] INFO  org.apache.spark.SparkContext - Running
> Spark version 1.4.1
>
> 11:15:24.150 [run-main-0] DEBUG o.a.h.m.lib.MutableMetricsFactory - field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
> with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=,
> always=false, sampleName=Ops, type=DEFAULT, value=[Rate of successful
> kerberos logins and latency (milliseconds)], valueName=Time)
>
> 11:15:24.156 [run-main-0] DEBUG o.a.h.m.lib.MutableMetricsFactory - field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure
> with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=,
> always=false, sampleName=Ops, type=DEFAULT, value=[Rate of failed kerberos
> logins and latency (milliseconds)], valueName=Time)
>
> As I mentioned below but repeated for completeness, I also have this in my
> code.
>
> import org.apache.log4j.PropertyConfigurator
>
> PropertyConfigurator.configure("conf/log4j.properties")
> Logger.getRootLogger().setLevel(Level.OFF)
> Logger.getLogger("org").setLevel(Level.OFF)
> Logger.getLogger("akka").setLevel(Level.OFF)
>
> And here’s my log4j.properties (note, I’ve also tried setting the level to
> OFF):
>
> # Set everything to be logged to the console
>
> log4j.rootCategory=WARN
>
> log4j.appender.console=org.apache.log4j.ConsoleAppender
>
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
>
> log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
> %c{1}: %m%n
>
>
> # Change this to set Spark log level
>
> log4j.logger.org.apache.spark=WARN
>
>
> # Silence akka remoting
>
> log4j.logger.Remoting=WARN
>
>
> # Ignore messages below warning level from Jetty, because it's a bit
> verbose
>
> log4j.logger.org.eclipse.jetty=WARN
>
>
> spark.log.threshold=OFF
>
> spark.root.logger=OFF,DRFA
>
>
> From: Alex Kozlov
> Date: Tuesday, October 6, 2015 at 10:50 AM
>
> To: Jeff Jones
> Cc: "user@spark.apache.org"
> Subject: Re: How can I disable logging when running local[*]?
>
> Try
>
> JAVA_OPTS='-Dlog4j.configuration=file:/'
>
> Internally, this is just spark.driver.extraJavaOptions, which you should
> be able to set in conf/spark-defaults.conf
>
> Can you provide more details how you invoke the driver?
>
> On Tue, Oct 6, 2015 at 9:48 AM, Jeff Jones 
> wrote:
>
>> Thanks. Any chance you know how to pass this to a Scala app that is run
>> via TypeSafe activator?
>>
>> I tried putting it $JAVA_OPTS but I get:
>>
>> Unrecognized option: --driver-java-options
>>
>> Error: Could not create the Java Virtual Machine.
>>
>> Error: A fatal exception has occurred. Program will exit.
>>
>>
>> I tried a bunch of different quoting but nothing produced a good result.
>> I also tried passing it directly to activator using –jvm but it still
>> produces the same results with verbose logging. Is there a way I can tell
>> if it’s picking up my file?
>>
>>
>>
>> From: Alex Kozlov
>> Date: Monday, October 5, 2015 at 8:34 PM
>> To: Jeff Jones
>> Cc: "user@spark.apache.org"
>> Subject: Re: How can I disable logging when running local[*]?
>>
>> Did you try “--driver-java-options
>> '-Dlog4j.configuration=file:/'” and setting the
>> log4j.rootLogger=FATAL,console?
>>
>> On Mon, Oct 5, 2015 at 8:19 PM, Jeff Jones 
>> wrote:
>>
>>> I’ve written an application that hosts the Spark driver in-process using
>>> “local[*]”. I’ve turned off logging in my conf/log4j.properties file. I’ve
>>> also tried putting the following code prior to creating my SparkContext.
>>> These were coupled together from various posts I’ve. None of these steps
>>> have worked. I’m still getting a ton of logging to the console. Anything
>>> else 

Re: spark performance non-linear response

2015-10-07 Thread Sean Owen
OK, next question then is: if this is wall-clock time for the whole
process, then, I wonder if you are just measuring the time taken by the
longest single task. I'd expect the time taken by the longest straggler
task to follow a distribution like this. That is, how balanced are the
partitions?

Are you running so many executors that nodes are bottlenecking on CPU, or
swapping?


On Wed, Oct 7, 2015 at 4:42 PM, Yadid Ayzenberg  wrote:

> Additional missing relevant information:
>
> Im running a transformation, there are no Shuffles occurring and at the
> end im performing a lookup of 4 partitions on the driver.
>
>
>
>
> On 10/7/15 11:26 AM, Yadid Ayzenberg wrote:
>
> Hi All,
>
> Im using spark 1.4.1 to to analyze a largish data set (several Gigabytes
> of data). The RDD is partitioned into 2048 partitions which are more or
> less equal and entirely cached in RAM.
> I evaluated the performance on several cluster sizes, and am witnessing a
> non linear (power) performance improvement as the cluster size increases
> (plot below). Each node has 4 cores and each worker is configured to use
> 10GB or RAM.
>
> [image: Spark performance]
>
> I would expect a more linear response given the number of partitions and
> the fact that all of the data is cached.
> Can anyone suggest what I should tweak in order to improve the performance?
> Or perhaps provide an explanation as to the behavior Im witnessing?
>
> Yadid
>
>
>


DataFrame with bean class

2015-10-07 Thread VJ
I have a bean class defined as follows:

class result {
private String name;

public result() { };

public String getname () {return name;}
public void setname (String s) {name = s;}
}

I then define 
DataFrame x = SqlContext.createDataFrame(myrdd, result.class);
x.show()

When I run this job, I am getting the exception: IllegalAccessException,
class sqlContexapply cannot access a member of result with modifier
public. 

I have defined the bean class in the same package as where I am accessing
from. But this is in a different file. 
Any thoughts? I guess, it is an issue with java reflection?? any approach to
solve this?

VJ
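
(For what it's worth, the bean class usually needs to be public, with a public no-arg constructor and public getters/setters, for Spark's bean introspection to reach it; a package-private "class result" typically triggers exactly this kind of IllegalAccessException. A Scala-flavoured sketch of the same createDataFrame(rdd, beanClass) call, with hypothetical names:)

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}

// Public, bean-style class with a no-arg constructor and a get/set pair.
class Result extends Serializable {
  private var name: String = _
  def getName: String = name
  def setName(s: String): Unit = { name = s }
}

def toDf(sqlContext: SQLContext, rdd: RDD[Result]): DataFrame = {
  val df = sqlContext.createDataFrame(rdd, classOf[Result])
  df.show()
  df
}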






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-with-bean-class-tp24970.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark performance non-linear response

2015-10-07 Thread Yadid Ayzenberg

Additional missing relevant information:

Im running a transformation, there are no Shuffles occurring and at the 
end im performing a lookup of 4 partitions on the driver.




On 10/7/15 11:26 AM, Yadid Ayzenberg wrote:

Hi All,

Im using spark 1.4.1 to to analyze a largish data set (several 
Gigabytes of data). The RDD is partitioned into 2048 partitions which 
are more or less equal and entirely cached in RAM.
I evaluated the performance on several cluster sizes, and am 
witnessing a non linear (power) performance improvement as the cluster 
size increases (plot below). Each node has 4 cores and each worker is 
configured to use 10GB or RAM.


Spark performance

I would expect a more linear response given the number of partitions 
and the fact that all of the data is cached.
Can anyone suggest what I should tweak in order to improve the 
performance?

Or perhaps provide an explanation as to the behavior Im witnessing?

Yadid




Re: spark performance non-linear response

2015-10-07 Thread Jonathan Coveney
I've noticed this as well and am curious if there is anything more people
can say.

My theory is that it is just communication overhead. If you only have a
couple of gigabytes (a tiny dataset), then splitting that across 50 nodes
means you'll have a ton of tiny partitions all finishing very quickly, and
thus creating a lot of communication overhead per amount of data processed.
Just a theory though.

El miércoles, 7 de octubre de 2015, Yadid Ayzenberg 
escribió:

> Additional missing relevant information:
>
> Im running a transformation, there are no Shuffles occurring and at the
> end im performing a lookup of 4 partitions on the driver.
>
>
>
> On 10/7/15 11:26 AM, Yadid Ayzenberg wrote:
>
> Hi All,
>
> Im using spark 1.4.1 to to analyze a largish data set (several Gigabytes
> of data). The RDD is partitioned into 2048 partitions which are more or
> less equal and entirely cached in RAM.
> I evaluated the performance on several cluster sizes, and am witnessing a
> non linear (power) performance improvement as the cluster size increases
> (plot below). Each node has 4 cores and each worker is configured to use
> 10GB or RAM.
>
> [image: Spark performance]
>
> I would expect a more linear response given the number of partitions and
> the fact that all of the data is cached.
> Can anyone suggest what I should tweak in order to improve the performance?
> Or perhaps provide an explanation as to the behavior Im witnessing?
>
> Yadid
>
>
>


Re: This post has NOT been accepted by the mailing list yet.

2015-10-07 Thread Richard Hillegas

Hi Akhandeshi,

It may be that you are not seeing your own posts because you are sending
from a gmail account. See for instance
https://support.google.com/a/answer/1703601?hl=en

Hope this helps,
Rick Hillegas
STSM, IBM Analytics, Platform - IBM USA


akhandeshi  wrote on 10/07/2015 08:10:32 AM:

> From: akhandeshi 
> To: user@spark.apache.org
> Date: 10/07/2015 08:10 AM
> Subject: This post has NOT been accepted by the mailing list yet.
>
> I seem to see this for many of my posts... does anyone have solution?
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/This-post-has-NOT-been-accepted-by-the-
> mailing-list-yet-tp24969.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

Re: Temp files are not removed when done (Mesos)

2015-10-07 Thread Iulian Dragoș
It is indeed a bug. I believe the shutdown procedure in #7820 only kicks in
when the external shuffle service is enabled (a prerequisite of dynamic
allocation). As a workaround you can use dynamic allocation (you can set
spark.dynamicAllocation.maxExecutors and
spark.dynamicAllocation.minExecutors to the same value).

I'll file a Jira ticket.
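
A minimal sketch of that workaround, assuming the application builds its own
SparkConf (the executor count of 8 is just an example; dynamic allocation also
requires the external shuffle service to be running on the agents):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("my-job")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "8")  // min == max gives an
    .set("spark.dynamicAllocation.maxExecutors", "8")  // effectively fixed size
  val sc = new SparkContext(conf)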

On Wed, Oct 7, 2015 at 10:14 AM, Alexei Bakanov  wrote:

> Hi
>
> I'm running Spark 1.5.1 on Mesos in coarse-grained mode. No dynamic
> allocation or shuffle service. I see that there are two types of temporary
> files under /tmp folder associated with every executor: /tmp/spark-
> and /tmp/blockmgr-. When job is finished /tmp/spark- is gone,
> but blockmgr directory is left with all gigabytes in it. In Spark 1.4.1
> blockmgr- folder was under /tmp/spark-, so when /tmp/spark
> folder was removed blockmgr was gone too.
> Is it a bug in 1.5.1?
>
> By the way, in fine-grain mode /tmp/spark- folder does not get
> removed in neither 1.4.1 nor 1.5.1 for some reason.
>
> Thanks,
> Alexei
>



-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Temp files are not removed when done (Mesos)

2015-10-07 Thread Iulian Dragoș
https://issues.apache.org/jira/browse/SPARK-10975

On Wed, Oct 7, 2015 at 11:36 AM, Iulian Dragoș 
wrote:

> It is indeed a bug. I believe the shutdown procedure in #7820 only kicks
> in when the external shuffle service is enabled (a pre-requisite of dynamic
> allocation). As a workaround you can use dynamic allocation (you can set
> spark.dynamicAllocation.maxExecutors and
> spark.dynamicAllocation.minExecutors to the same value.
>
> I'll file a Jira ticket.
>
> On Wed, Oct 7, 2015 at 10:14 AM, Alexei Bakanov  wrote:
>
>> Hi
>>
>> I'm running Spark 1.5.1 on Mesos in coarse-grained mode. No dynamic
>> allocation or shuffle service. I see that there are two types of temporary
>> files under /tmp folder associated with every executor: /tmp/spark-
>> and /tmp/blockmgr-. When job is finished /tmp/spark- is gone,
>> but blockmgr directory is left with all gigabytes in it. In Spark 1.4.1
>> blockmgr- folder was under /tmp/spark-, so when /tmp/spark
>> folder was removed blockmgr was gone too.
>> Is it a bug in 1.5.1?
>>
>> By the way, in fine-grain mode /tmp/spark- folder does not get
>> removed in neither 1.4.1 nor 1.5.1 for some reason.
>>
>> Thanks,
>> Alexei
>>
>
>
>
> --
>
> --
> Iulian Dragos
>
> --
> Reactive Apps on the JVM
> www.typesafe.com
>
>


-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Does feature parity exist between Spark and PySpark

2015-10-07 Thread sethah
Regarding features, the general workflow for the Spark community when adding
new features is to first add them in Scala (since Spark is written in
Scala). Once this is done, a Jira ticket will be created requesting that the
feature be added to the Python API (for example, SPARK-9773). Some of these Python
API tickets get done very quickly, some don't. As such, the Scala API will
always be more feature-rich from a Spark perspective, while the Python API
can lag behind in some cases. In general, the intent is for the PySpark
API to contain all features of the Scala API, since Python is considered a
first-class citizen in the Spark community; the difference is that if you
need the latest and greatest and need it right away, Scala is the best
choice.

Regarding performance, others have said it very eloquently:

https://www.linkedin.com/pulse/why-i-choose-scala-apache-spark-project-lan-jiang
http://stackoverflow.com/questions/17236936/api-compatibility-between-scala-and-python
http://apache-spark-developers-list.1001551.n3.nabble.com/A-Comparison-of-Platforms-for-Implementing-and-Running-Very-Large-Scale-Machine-Learning-Algorithms-td7823.html#a7824






Re: spark multi tenancy

2015-10-07 Thread Steve Loughran

On 7 Oct 2015, at 11:06, ayan guha wrote:


Can queues also be used to separate workloads?

yes; that's standard practice. Different YARN queues can have different maximum 
memory & CPU, and you can even tag queues as "pre-emptible", so more important 
work can kill containers if there isn't space on the cluster.

https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html#Queue_Properties
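
A small sketch of pinning a Spark application to one of those queues; the queue
name "analytics" is made up, and the same thing can be done on the command line
with spark-submit --queue:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("tenant-a-job")
    .set("spark.yarn.queue", "analytics")  // submit into the capacity-scheduler queue
  val sc = new SparkContext(conf)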



Parquet file size

2015-10-07 Thread Younes Naguib
Hi,

I'm reading a large TSV file and creating Parquet files using Spark SQL:
insert overwrite
table tbl partition(year, month, day)
Select  from tbl_tsv;

This works nicely, but generates small Parquet files (15MB).
I wanted to generate larger files; any idea how to address this?

Thanks,
Younes Naguib
Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G 1R8
Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | 
younes.nag...@tritondigital.com 
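
One common way to get fewer, larger Parquet files is to cut the number of write
tasks before the partitioned write; a minimal sketch, assuming df is the
DataFrame loaded from the TSV and that the target partition count and output
path are placeholders:

  // Fewer partitions at write time means fewer, larger files per
  // year/month/day directory.
  df.coalesce(64)
    .write
    .partitionBy("year", "month", "day")
    .parquet("/data/tbl_parquet")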



Re: compatibility issue with Jersey2

2015-10-07 Thread Marcelo Vanzin
Seems like you might be running into
https://issues.apache.org/jira/browse/SPARK-10910. I've been busy with
other things but plan to take a look at that one when I find time...
right now I don't really have a solution, other than making sure your
application's jars do not include those classes the exception is
complaining about.

On Wed, Oct 7, 2015 at 10:23 AM, Gary Ogden  wrote:
> What you suggested seems to have worked for unit tests. But now it throws
> this at run time on mesos with spark-submit:
>
> Exception in thread "main" java.lang.LinkageError: loader constraint
> violation: when resolving method
> "org.slf4j.impl.StaticLoggerBinder.getLoggerFactory()Lorg/slf4j/ILoggerFactory;"
> the class loader (instance of
> org/apache/spark/util/ChildFirstURLClassLoader) of the current class,
> org/slf4j/LoggerFactory, and the class loader (instance of
> sun/misc/Launcher$AppClassLoader) for resolved class,
> org/slf4j/impl/StaticLoggerBinder, have different Class objects for the type
> LoggerFactory; used in the signature
>   at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:336)
>   at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:284)
>   at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:305)
>   at com.company.spark.utils.SparkJob.(SparkJob.java:41)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Unknown Source)
>   at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:634)
>   at 
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> On 6 October 2015 at 16:20, Marcelo Vanzin  wrote:
>>
>> On Tue, Oct 6, 2015 at 12:04 PM, Gary Ogden  wrote:
>> > But we run unit tests differently in our build environment, which is
>> > throwing the error. It's setup like this:
>> >
>> > I suspect this is what you were referring to when you said I have a
>> > problem?
>>
>> Yes, that is what I was referring to. But, in your test environment,
>> you might be able to work around the problem by setting
>> "spark.ui.enabled=false"; that should disable all the code that uses
>> Jersey, so you can use your newer version in your unit tests.
>>
>>
>> --
>> Marcelo
>
>



-- 
Marcelo
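
A minimal sketch of the test-only workaround mentioned in the quoted exchange,
assuming the unit tests build their own local SparkContext (names are
illustrative):

  import org.apache.spark.{SparkConf, SparkContext}

  // With the UI disabled, Spark's bundled Jersey classes are never loaded,
  // so a newer Jersey on the test classpath should not conflict.
  val testConf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("unit-tests")
    .set("spark.ui.enabled", "false")
  val sc = new SparkContext(testConf)
  // ... run the tests ...
  sc.stop()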




Optimal way to avoid processing null returns in Spark Scala

2015-10-07 Thread swetha
Hi,

I have the following functions that I am using for my job in Scala. As you can
see, the getSessionId function sometimes returns null. If I return null, the
only way I can avoid processing those records is by filtering them out, and I
wanted to avoid another pass just for filtering. So I tried returning None
instead, but that forces the return type to be an Option. What is the optimal
way to avoid processing null records while also avoiding an Option return type?
Using Option[] and Some(()) seems to cause type issues in subsequent function
calls.


val sessions = filteredStream.transform(rdd => getBeaconMap(rdd))

def getBeaconMap(rdd: RDD[(String, String)]): RDD[(String, (Long, String))] = {
  rdd.map[(String, (Long, String))] { case (x, y) =>
    (getSessionId(y), (getTimeStamp(y).toLong, y))
  }
}

def getSessionId(eventRecord: String): String = {
  val beaconTestImpl: BeaconTestLoader = new BeaconTestImpl // This needs to be changed.
  val beaconEvent: BeaconEventData = beaconTestImpl.getBeaconEventData(eventRecord)

  if (beaconEvent != null) {
    beaconEvent.getSessionID // This might be in the Set-Cookie header
  } else {
    null
  }
}

val groupedAndSortedSessions =
  sessions.transform(rdd => ExpoJobCommonNew.getGroupedAndSortedSessions(rdd))
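
One common answer, sketched here under the assumption that the Beacon* types and
getTimeStamp come from the code above: return Option[String] and use flatMap, so
the records with no session are dropped in the same pass, with no separate
filter and no nulls:

  import org.apache.spark.rdd.RDD

  def getSessionId(eventRecord: String): Option[String] = {
    val beaconTestImpl: BeaconTestLoader = new BeaconTestImpl
    // Option(null) is None, so a missing event simply yields no record.
    Option(beaconTestImpl.getBeaconEventData(eventRecord)).map(_.getSessionID)
  }

  def getBeaconMap(rdd: RDD[(String, String)]): RDD[(String, (Long, String))] = {
    rdd.flatMap { case (_, y) =>
      getSessionId(y).map(id => (id, (getTimeStamp(y).toLong, y)))
    }
  }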







Re: What is the difference between ml.classification.LogisticRegression and mllib.classification.LogisticRegressionWithLBFGS

2015-10-07 Thread Joseph Bradley
Hi YiZhi Liu,

The spark.ml classes are part of the higher-level "Pipelines" API, which
works with DataFrames.  When creating this API, we decided to separate it
from the old API to avoid confusion.  You can read more about it here:
http://spark.apache.org/docs/latest/ml-guide.html

For (3): We use Breeze, but we have to modify it in order to do distributed
optimization based on Spark.

Joseph

On Tue, Oct 6, 2015 at 11:47 PM, YiZhi Liu  wrote:

> Hi everyone,
>
> I'm curious about the difference between
> ml.classification.LogisticRegression and
> mllib.classification.LogisticRegressionWithLBFGS. Both of them are
> optimized using LBFGS, the only difference I see is LogisticRegression
> takes DataFrame while LogisticRegressionWithLBFGS takes RDD.
>
> So I wonder,
> 1. Why not simply add a DataFrame training interface to
> LogisticRegressionWithLBFGS?
> 2. Whats the difference between ml.classification and
> mllib.classification package?
> 3. Why doesn't ml.classification.LogisticRegression call
> mllib.optimization.LBFGS / mllib.optimization.OWLQN directly? Instead,
> it uses breeze.optimize.LBFGS and re-implements most of the procedures
> in mllib.optimization.{LBFGS,OWLQN}.
>
> Thank you.
>
> Best,
>
> --
> Yizhi Liu
> Senior Software Engineer / Data Mining
> www.mvad.com, Shanghai, China
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
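
To make the API split concrete, a short sketch contrasting the two entry points
(based on the 1.5-era method names, so treat the exact signatures as an
assumption; trainingDF needs "label" and "features" columns, trainingRDD is an
RDD[LabeledPoint], and both are placeholders):

  import org.apache.spark.ml.classification.LogisticRegression
  import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

  // spark.ml: Pipelines API, trains on a DataFrame
  val lrModel = new LogisticRegression()
    .setMaxIter(100)
    .setRegParam(0.01)
    .fit(trainingDF)

  // spark.mllib: RDD-based API, LBFGS optimizer under the hood
  val lbfgsModel = new LogisticRegressionWithLBFGS()
    .setNumClasses(2)
    .run(trainingRDD)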


Re: Spark job workflow engine recommendations

2015-10-07 Thread Vikram Kone
Hien,
I saw this pull request, and from what I understand it is geared towards
running Spark jobs over Hadoop. We are using Spark over Cassandra and are not
sure if this new job type supports that. I haven't seen any documentation on
how to use this Spark job plugin, so I can't test it out on our cluster.
We are currently submitting our Spark jobs with the command job type, using a
command like "dse spark-submit --class com.org.classname ./test.jar".
What would be the advantage of using the native Spark job type over the
command job type?

I didn't understand from your reply whether Azkaban already supports
long-running jobs like Spark Streaming. Does it? Streaming jobs generally need
to run indefinitely and need to be restarted if they fail for some reason
(lack of resources, maybe). I can probably use the auto-retry feature for
this, but I'm not sure.

I'm looking forward to the multiple-executor support, which will greatly
help with the scalability issue.

On Wed, Oct 7, 2015 at 9:56 AM, Hien Luu  wrote:

> The spark job type was added recently - see this pull request
> https://github.com/azkaban/azkaban-plugins/pull/195.  You can leverage
> the SLA feature to kill a job if it ran longer than expected.
>
> BTW, we just solved the scalability issue by supporting multiple
> executors.  Within a week or two, the code for that should be merged in the
> main trunk.
>
> Hien
>
> On Tue, Oct 6, 2015 at 9:40 PM, Vikram Kone  wrote:
>
>> Does Azkaban support scheduling long running jobs like spark steaming
>> jobs? Will Azkaban kill a job if it's running for a long time.
>>
>>
>> On Friday, August 7, 2015, Vikram Kone  wrote:
>>
>>> Hien,
>>> Is Azkaban being phased out at linkedin as rumored? If so, what's
>>> linkedin going to use for workflow scheduling? Is there something else
>>> that's going to replace Azkaban?
>>>
>>> On Fri, Aug 7, 2015 at 11:25 AM, Ted Yu  wrote:
>>>
 In my opinion, choosing some particular project among its peers should
 leave enough room for future growth (which may come faster than you
 initially think).

 Cheers

 On Fri, Aug 7, 2015 at 11:23 AM, Hien Luu  wrote:

> Scalability is a known issue due the the current architecture.
> However this will be applicable if you run more 20K jobs per day.
>
> On Fri, Aug 7, 2015 at 10:30 AM, Ted Yu  wrote:
>
>> From what I heard (an ex-coworker who is Oozie committer), Azkaban
>> is being phased out at LinkedIn because of scalability issues (though
>> UI-wise, Azkaban seems better).
>>
>> Vikram:
>> I suggest you do more research in related projects (maybe using their
>> mailing lists).
>>
>> Disclaimer: I don't work for LinkedIn.
>>
>> On Fri, Aug 7, 2015 at 10:12 AM, Nick Pentreath <
>> nick.pentre...@gmail.com> wrote:
>>
>>> Hi Vikram,
>>>
>>> We use Azkaban (2.5.0) in our production workflow scheduling. We
>>> just use local mode deployment and it is fairly easy to set up. It is
>>> pretty easy to use and has a nice scheduling and logging interface, as 
>>> well
>>> as SLAs (like kill job and notify if it doesn't complete in 3 hours or
>>> whatever).
>>>
>>> However Spark support is not present directly - we run everything
>>> with shell scripts and spark-submit. There is a plugin interface where 
>>> one
>>> could create a Spark plugin, but I found it very cumbersome when I did
>>> investigate and didn't have the time to work through it to develop that.
>>>
>>> It has some quirks and while there is actually a REST API for adding
>>> jos and dynamically scheduling jobs, it is not documented anywhere so 
>>> you
>>> kinda have to figure it out for yourself. But in terms of ease of use I
>>> found it way better than Oozie. I haven't tried Chronos, and it seemed
>>> quite involved to set up. Haven't tried Luigi either.
>>>
>>> Spark job server is good but as you say lacks some stuff like
>>> scheduling and DAG type workflows (independent of spark-defined job 
>>> flows).
>>>
>>>
>>> On Fri, Aug 7, 2015 at 7:00 PM, Jörn Franke 
>>> wrote:
>>>
 Check also falcon in combination with oozie

 Le ven. 7 août 2015 à 17:51, Hien Luu 
 a écrit :

> Looks like Oozie can satisfy most of your requirements.
>
>
>
> On Fri, Aug 7, 2015 at 8:43 AM, Vikram Kone 
> wrote:
>
>> Hi,
>> I'm looking for open source workflow tools/engines that allow us
>> to schedule spark jobs on a datastax cassandra cluster. Since there 
>> are
>> tonnes of alternatives out there like Ozzie, Azkaban, Luigi , 

Re: Spark job workflow engine recommendations

2015-10-07 Thread Hien Luu
The Spark job type was added recently - see this pull request:
https://github.com/azkaban/azkaban-plugins/pull/195. You can leverage the
SLA feature to kill a job if it runs longer than expected.

BTW, we just solved the scalability issue by supporting multiple
executors. Within a week or two, the code for that should be merged into the
main trunk.

Hien

On Tue, Oct 6, 2015 at 9:40 PM, Vikram Kone  wrote:

> Does Azkaban support scheduling long running jobs like spark steaming
> jobs? Will Azkaban kill a job if it's running for a long time.
>
>
> On Friday, August 7, 2015, Vikram Kone  wrote:
>
>> Hien,
>> Is Azkaban being phased out at linkedin as rumored? If so, what's
>> linkedin going to use for workflow scheduling? Is there something else
>> that's going to replace Azkaban?
>>
>> On Fri, Aug 7, 2015 at 11:25 AM, Ted Yu  wrote:
>>
>>> In my opinion, choosing some particular project among its peers should
>>> leave enough room for future growth (which may come faster than you
>>> initially think).
>>>
>>> Cheers
>>>
>>> On Fri, Aug 7, 2015 at 11:23 AM, Hien Luu  wrote:
>>>
 Scalability is a known issue due the the current architecture.  However
 this will be applicable if you run more 20K jobs per day.

 On Fri, Aug 7, 2015 at 10:30 AM, Ted Yu  wrote:

> From what I heard (an ex-coworker who is Oozie committer), Azkaban is
> being phased out at LinkedIn because of scalability issues (though 
> UI-wise,
> Azkaban seems better).
>
> Vikram:
> I suggest you do more research in related projects (maybe using their
> mailing lists).
>
> Disclaimer: I don't work for LinkedIn.
>
> On Fri, Aug 7, 2015 at 10:12 AM, Nick Pentreath <
> nick.pentre...@gmail.com> wrote:
>
>> Hi Vikram,
>>
>> We use Azkaban (2.5.0) in our production workflow scheduling. We just
>> use local mode deployment and it is fairly easy to set up. It is pretty
>> easy to use and has a nice scheduling and logging interface, as well as
>> SLAs (like kill job and notify if it doesn't complete in 3 hours or
>> whatever).
>>
>> However Spark support is not present directly - we run everything
>> with shell scripts and spark-submit. There is a plugin interface where 
>> one
>> could create a Spark plugin, but I found it very cumbersome when I did
>> investigate and didn't have the time to work through it to develop that.
>>
>> It has some quirks and while there is actually a REST API for adding
>> jos and dynamically scheduling jobs, it is not documented anywhere so you
>> kinda have to figure it out for yourself. But in terms of ease of use I
>> found it way better than Oozie. I haven't tried Chronos, and it seemed
>> quite involved to set up. Haven't tried Luigi either.
>>
>> Spark job server is good but as you say lacks some stuff like
>> scheduling and DAG type workflows (independent of spark-defined job 
>> flows).
>>
>>
>> On Fri, Aug 7, 2015 at 7:00 PM, Jörn Franke 
>> wrote:
>>
>>> Check also falcon in combination with oozie
>>>
>>> Le ven. 7 août 2015 à 17:51, Hien Luu  a
>>> écrit :
>>>
 Looks like Oozie can satisfy most of your requirements.



 On Fri, Aug 7, 2015 at 8:43 AM, Vikram Kone 
 wrote:

> Hi,
> I'm looking for open source workflow tools/engines that allow us
> to schedule spark jobs on a datastax cassandra cluster. Since there 
> are
> tonnes of alternatives out there like Ozzie, Azkaban, Luigi , Chronos 
> etc,
> I wanted to check with people here to see what they are using today.
>
> Some of the requirements of the workflow engine that I'm looking
> for are
>
> 1. First class support for submitting Spark jobs on Cassandra. Not
> some wrapper Java code to submit tasks.
> 2. Active open source community support and well tested at
> production scale.
> 3. Should be dead easy to write job dependencices using XML or web
> interface . Ex; job A depends on Job B and Job C, so run Job A after 
> B and
> C are finished. Don't need to write full blown java applications to 
> specify
> job parameters and dependencies. Should be very simple to use.
> 4. Time based  recurrent scheduling. Run the spark jobs at a given
> time every hour or day or week or month.
> 5. Job monitoring, alerting on failures and email notifications on
> daily basis.
>
> I have looked at Ooyala's spark job server which seems to be hated
> towards making spark jobs run faster by sharing contexts between the 

Re: Does feature parity exist between Spark and PySpark

2015-10-07 Thread Michael Armbrust
>
> At my company we use Avro heavily and it's not been fun when i've tried to
> work with complex avro schemas and python. This may not be relevant to you
> however...otherwise I found Python to be a great fit for Spark :)
>

Have you tried using https://github.com/databricks/spark-avro ? It should
help you avoid needing to work with avro directly by translating to/from
DataFrames.
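
A small sketch of that route, assuming a SQLContext named sqlContext and an
illustrative path; the spark-avro package has to be on the classpath (for
example via --packages), and the same reader call is available from PySpark:

  // Avro records come back as a DataFrame with the corresponding nested schema.
  val df = sqlContext.read
    .format("com.databricks.spark.avro")
    .load("/data/events.avro")
  df.printSchema()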


Spark standalone hangup during shuffle flatMap or explode in cluster

2015-10-07 Thread Saif.A.Ellafi
When running a standalone cluster-mode job, the process hangs randomly during 
a DataFrame flatMap or explode operation in HiveContext:

-->> df.flatMap(r => for (n <- 1 to r.getInt(ind)) yield r)

This does not happen with SQLContext in cluster mode, or with Hive/SQL in local 
mode, where it works fine.

A couple of minutes after the hangup, executors start dropping. I am attaching 
the logs.

Saif




15/10/07 12:15:19 INFO TaskSetManager: Finished task 50.0 in stage 17.0 (TID 
166) in 2511 ms on 162.101.194.47 (180/200)
15/10/07 12:15:19 INFO TaskSetManager: Finished task 66.0 in stage 17.0 (TID 
182) in 2510 ms on 162.101.194.47 (181/200)
15/10/07 12:15:19 INFO TaskSetManager: Finished task 110.0 in stage 17.0 (TID 
226) in 2505 ms on 162.101.194.47 (182/200)
15/10/07 12:15:19 INFO TaskSetManager: Finished task 74.0 in stage 17.0 (TID 
190) in 2530 ms on 162.101.194.47 (183/200)
15/10/07 12:15:19 INFO TaskSetManager: Finished task 106.0 in stage 17.0 (TID 
222) in 2530 ms on 162.101.194.47 (184/200)
15/10/07 12:20:01 WARN HeartbeatReceiver: Removing executor 2 with no recent 
heartbeats: 141447 ms exceeds timeout 12 ms
15/10/07 12:20:01 ERROR TaskSchedulerImpl: Lost executor 2 on 162.101.194.44: 
Executor heartbeat timed out after 141447 ms
15/10/07 12:20:01 INFO TaskSetManager: Re-queueing tasks for 2 from TaskSet 17.0
15/10/07 12:20:01 WARN TaskSetManager: Lost task 113.0 in stage 17.0 (TID 229, 
162.101.194.44): ExecutorLostFailure (executor 2 lost)
15/10/07 12:20:01 WARN TaskSetManager: Lost task 73.0 in stage 17.0 (TID 189, 
162.101.194.44): ExecutorLostFailure (executor 2 lost)
15/10/07 12:20:01 WARN TaskSetManager: Lost task 81.0 in stage 17.0 (TID 197, 
162.101.194.44): ExecutorLostFailure (executor 2 lost)
15/10/07 12:20:01 INFO TaskSetManager: Starting task 81.1 in stage 17.0 (TID 
316, 162.101.194.45, PROCESS_LOCAL, 2045 bytes)
15/10/07 12:20:01 INFO TaskSetManager: Starting task 73.1 in stage 17.0 (TID 
317, 162.101.194.44, PROCESS_LOCAL, 2045 bytes)
15/10/07 12:20:01 INFO TaskSetManager: Starting task 113.1 in stage 17.0 (TID 
318, 162.101.194.48, PROCESS_LOCAL, 2045 bytes)
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Requesting to kill 
executor(s) 2
15/10/07 12:20:01 INFO DAGScheduler: Executor lost: 2 (epoch 4)
15/10/07 12:20:01 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 
from BlockManagerMaster.
15/10/07 12:20:01 INFO BlockManagerMasterEndpoint: Removing block manager 
BlockManagerId(2, 162.101.194.44, 42537)
15/10/07 12:20:01 INFO BlockManagerMaster: Removed 2 successfully in 
removeExecutor
15/10/07 12:20:01 INFO ShuffleMapStage: ShuffleMapStage 15 is now unavailable 
on executor 2 (1/2, false)
15/10/07 12:20:01 INFO ShuffleMapStage: ShuffleMapStage 16 is now unavailable 
on executor 2 (8/16, false)
15/10/07 12:20:01 INFO DAGScheduler: Host added was in lost list earlier: 
162.101.194.44
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor added: 
app-20151007121501-0022/69 on worker-20151007063932-162.101.194.44-57091 
(162.101.194.44:57091) with 32 cores
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20151007121501-0022/69 on hostPort 162.101.194.44:57091 with 32 cores, 
100.0 GB RAM
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor updated: 
app-20151007121501-0022/69 is now RUNNING
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor updated: 
app-20151007121501-0022/69 is now LOADING
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor updated: 
app-20151007121501-0022/69 is now EXITED (Command exited with code 1)
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Executor 
app-20151007121501-0022/69 removed: Command exited with code 1
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Asked to remove 
non-existent executor 69
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor added: 
app-20151007121501-0022/70 on worker-20151007063932-162.101.194.44-57091 
(162.101.194.44:57091) with 32 cores
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20151007121501-0022/70 on hostPort 162.101.194.44:57091 with 32 cores, 
100.0 GB RAM
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor updated: 
app-20151007121501-0022/70 is now RUNNING
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor updated: 
app-20151007121501-0022/70 is now LOADING
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor updated: 
app-20151007121501-0022/70 is now EXITED (Command exited with code 1)
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Executor 
app-20151007121501-0022/70 removed: Command exited with code 1
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Asked to remove 
non-existent executor 70
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor added: 
app-20151007121501-0022/71 on worker-20151007063932-162.101.194.44-57091 
(162.101.194.44:57091) with 32 cores
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20151007121501-0022/71 on 

Re: compatibility issue with Jersey2

2015-10-07 Thread Gary Ogden
What you suggested seems to have worked for the unit tests. But now it throws
this at runtime on Mesos with spark-submit:

Exception in thread "main" java.lang.LinkageError: loader constraint
violation: when resolving method
"org.slf4j.impl.StaticLoggerBinder.getLoggerFactory()Lorg/slf4j/ILoggerFactory;"
the class loader (instance of
org/apache/spark/util/ChildFirstURLClassLoader) of the current class,
org/slf4j/LoggerFactory, and the class loader (instance of
sun/misc/Launcher$AppClassLoader) for resolved class,
org/slf4j/impl/StaticLoggerBinder, have different Class objects for
the type LoggerFactory; used in the signature
at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:336)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:284)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:305)
at com.company.spark.utils.SparkJob.(SparkJob.java:41)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Unknown Source)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:634)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


On 6 October 2015 at 16:20, Marcelo Vanzin  wrote:

> On Tue, Oct 6, 2015 at 12:04 PM, Gary Ogden  wrote:
> > But we run unit tests differently in our build environment, which is
> > throwing the error. It's setup like this:
> >
> > I suspect this is what you were referring to when you said I have a
> problem?
>
> Yes, that is what I was referring to. But, in your test environment,
> you might be able to work around the problem by setting
> "spark.ui.enabled=false"; that should disable all the code that uses
> Jersey, so you can use your newer version in your unit tests.
>
>
> --
> Marcelo
>