Re: TallSkinnyQR

2016-11-07 Thread Sean Owen
Rather than post a large section of code, please post a small example of
the input matrix and its decomposition, to illustrate what you're saying is
out of order.

On Tue, Nov 8, 2016 at 3:50 AM im281  wrote:

> I am getting the correct rows but they are out of order. Is this a bug or
> am
> I doing something wrong?
>
>
>


Why active tasks is bigger than cores?

2016-11-07 Thread 涂小刚
Hi, all,

I run a spark-streaming application, but the UI showed that the number of
active tasks was bigger than the number of cores.

According to my understanding of Spark, one task occupies one core when
"spark.task.cpus" is set to 1. Did I misunderstand something?




Re: spark streaming with kinesis

2016-11-07 Thread Takeshi Yamamuro
I'm not familiar with the Kafka implementation, but a Kinesis receiver runs
in a thread on the executors.
You can set any value for the checkpoint interval, but frequent checkpoints
put excess load on DynamoDB.
See:
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html#kinesis-checkpointing
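
For reference, a minimal sketch of where the checkpoint interval is passed when
creating the stream, following the integration guide above (assumes an existing
SparkContext sc; app/stream names, endpoint, and region are placeholders):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

val ssc = new StreamingContext(sc, Seconds(10))
val stream = KinesisUtils.createStream(
  ssc, "my-kinesis-app", "my-stream", "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1", InitialPositionInStream.LATEST,
  Seconds(60),                        // Kinesis (DynamoDB) checkpoint interval
  StorageLevel.MEMORY_AND_DISK_2)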

// maropu

On Mon, Nov 7, 2016 at 1:36 PM, Shushant Arora 
wrote:

> Hi
>
> By receicer I meant spark streaming receiver architecture- means worker
> nodes are different than receiver nodes. There is no direct consumer/low
> level consumer like of  Kafka in kinesis spark streaming?
>
> Is there any limitation on interval checkpoint - minimum of 1second in
> spark streaming with kinesis. But as such there is no limit on checkpoint
> interval in KCL side ?
>
> Thanks
>
> On Tue, Oct 25, 2016 at 8:36 AM, Takeshi Yamamuro 
> wrote:
>
>> I'm not exactly sure about the receiver you pointed though,
>> if you point the "KinesisReceiver" implementation, yes.
>>
>> Also, we currently cannot disable the interval checkpoints.
>>
>> On Tue, Oct 25, 2016 at 11:53 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Thanks!
>>>
>>> Is kinesis streams are receiver based only? Is there non receiver based
>>> consumer for Kinesis ?
>>>
>>> And Instead of having fixed checkpoint interval,Can I disable auto
>>> checkpoint and say  when my worker has processed the data after last record
>>> of mapPartition now checkpoint the sequence no using some api.
>>>
>>>
>>>
>>> On Tue, Oct 25, 2016 at 7:07 AM, Takeshi Yamamuro >> > wrote:
>>>
 Hi,

 The only thing you can do for Kinesis checkpoints is tune the interval
 of them.
 https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala#L68

 Whether data loss occurs or not depends on the storage level you set;
 if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing
 in case of data loss because the stream data Spark receives is
 replicated across executors.
 However, if all the executors that hold the replicated data crash,
 IIUC data loss does occur.

 // maropu

 On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> Does spark streaming consumer for kinesis uses Kinesis Client Library
>  and mandates to checkpoint the sequence number of shards in dynamo db.
>
> Will it lead to dataloss if consumed datarecords are not yet processed
> and kinesis checkpointed the consumed sequenece numbers in dynamo db and
> spark worker crashes - then spark launched the worker on another node but
> start consuming from dynamo db's checkpointed sequence number which is
> ahead of processed sequenece number .
>
> is there a way to checkpoint the sequenece numbers ourselves in
> Kinesis as it is in Kafka low level consumer ?
>
> Thanks
>
>


 --
 ---
 Takeshi Yamamuro

>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


[ANNOUNCE] Announcing Apache Spark 1.6.3

2016-11-07 Thread Reynold Xin
We are happy to announce the availability of Spark 1.6.3! This maintenance
release includes fixes across several areas of Spark, and we encourage users on
the 1.6.x line to upgrade to 1.6.3.

Head to the project's download page to download the new version:
http://spark.apache.org/downloads.html


TallSkinnyQR

2016-11-07 Thread im281
I am getting the correct rows but they are out of order. Is this a bug or am
I doing something wrong?




import java.io.FileNotFoundException;
import java.io.PrintWriter;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.QRDecomposition;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
import org.apache.spark.sql.SparkSession;

public class CoordinateMatrixDemo {

    public static void main(String[] args) {

        // boilerplate needed to run locally
        SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        SparkSession spark = SparkSession
                .builder()
                .appName("CoordinateMatrix")
                .getOrCreate()
                .newSession();

        run(sc, "Data/sparsematrix.txt");
    }

    private static void run(JavaSparkContext sc, String file) {

        // read coordinate matrix from text or database
        JavaRDD<String> fileA = sc.textFile(file);

        // map the text file with coordinate data (sparse matrix) to JavaRDD<MatrixEntry>
        JavaRDD<MatrixEntry> matrixA = fileA.map(new Function<String, MatrixEntry>() {
            public MatrixEntry call(String x) {
                String[] indeceValue = x.split(",");
                long i = Long.parseLong(indeceValue[0]);
                long j = Long.parseLong(indeceValue[1]);
                double value = Double.parseDouble(indeceValue[2]);
                return new MatrixEntry(i, j, value);
            }
        });

        // coordinate matrix from sparse data
        CoordinateMatrix cooMatrixA = new CoordinateMatrix(matrixA.rdd());

        // create block matrix
        BlockMatrix matA = cooMatrixA.toBlockMatrix();

        // create block matrix after matrix multiplication (square matrix)
        BlockMatrix ata = matA.transpose().multiply(matA);

        // print out the original dense matrix
        System.out.println(matA.toLocalMatrix().toString());

        // print out the transpose of the dense matrix
        System.out.println(matA.transpose().toLocalMatrix().toString());

        // print out the square matrix (after multiplication)
        System.out.println(ata.toLocalMatrix().toString());

        JavaRDD<MatrixEntry> entries = ata.toCoordinateMatrix().entries().toJavaRDD();

        // QR decomposition DEMO
        // Convert it to an IndexedRowMatrix whose rows are sparse vectors.
        IndexedRowMatrix indexedRowMatrix = cooMatrixA.toIndexedRowMatrix();

        // Drop its row indices.
        RowMatrix rowMat = indexedRowMatrix.toRowMatrix();

        // QR decomposition
        QRDecomposition<RowMatrix, Matrix> result = rowMat.tallSkinnyQR(true);

        System.out.println("Q: " + result.Q().toBreeze().toString());
        System.out.println("R: " + result.R().toString());

        Vector[] collectPartitions = (Vector[]) result.Q().rows().collect();

        System.out.println("Q factor is:");
        for (Vector vector : collectPartitions) {
            System.out.println("\t" + vector);
        }

        // compute Qt
        // need to compute d = Qt*b where b is the experimental data vector
        // then solve for d using Gaussian elimination

        // extract Q values and create a matrix
        // TODO: the array will be HUGE
        String Qm = result.Q().toBreeze().toString();
        String[] Qmatrix = Qm.split("\\s+");

        int rows = (int) result.Q().numRows();
        int cols = (int) result.Q().numCols();

        try {
            PrintWriter pw = new PrintWriter("Data/qMatrix.txt");
            pw.write(Qm);
            pw.close();

            PrintWriter pw1 = new PrintWriter("Data/qMatrix1.txt");
            // write the coordinate matrix to file
            int k = 0;
            for (int i = 0; i < rows; i++) {
                for (int j = 0; j < cols; j++) {
                    pw1.println(i + "," + j + "," + Qmatrix[k]);
                    k++;
                }
            }
            pw1.close();

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

        // read coordinate matrix from text or database
        JavaRDD<String> fileQ =

Re: Structured Streaming with Cassandra, Is it supported??

2016-11-07 Thread Tathagata Das
Spark 2.0 supports writing out to files, and you can also write custom
foreach code. We haven't yet officially released the Sink API for custom
connectors to be implemented against, but hopefully we will be able to do so soon.

That said, I will not rule out the possibility of connectors written against
internal, non-public APIs that may be floating around out there. Others can chime
in on that.
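
For reference, a rough sketch of the shape of the foreach sink (the streaming
DataFrame and the Cassandra connection/insert calls are placeholders for whatever
driver you use; this is not a supported Cassandra sink):

import org.apache.spark.sql.{ForeachWriter, Row}

val query = streamingDF.writeStream
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = {
      // open a connection to Cassandra here; return true to process this partition
      true
    }
    def process(row: Row): Unit = {
      // execute an INSERT for this row using your Cassandra driver
    }
    def close(errorOrNull: Throwable): Unit = {
      // close the connection
    }
  })
  .start()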



On Mon, Nov 7, 2016 at 3:57 PM, shyla deshpande 
wrote:

> I am using spark-cassandra-connector_2.11.
>
> On Mon, Nov 7, 2016 at 3:33 PM, shyla deshpande 
> wrote:
>
>> Hi ,
>>
>> I am trying to do structured streaming with the wonderful SparkSession,
>> but cannot save the streaming data to Cassandra.
>>
>> If anyone has got this working, please help
>>
>> Thanks
>>
>>
>


Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-07 Thread Tathagata Das
For WAL in Spark to work with HDFS, the HDFS version you are running must
support file appends. Contact your HDFS package/installation provider to
figure out whether this is supported by your HDFS installation.

On Mon, Nov 7, 2016 at 2:04 PM, Arijit  wrote:

> Hello All,
>
>
> We are using Spark 1.6.2 with WAL enabled and encountering data loss when
> the following exception/warning happens. We are using HDFS as our
> checkpoint directory.
>
>
> Questions are:
>
>
> 1. Is this a bug in Spark or issue with our configuration? Source looks
> like the following. Which file already exist or who is suppose to set
> hdfs.append.support configuration? Why doesn't it happen all the time?
>
>
> private[streaming] object HdfsUtils {
>
>   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream 
> = {
> val dfsPath = new Path(path)
> val dfs = getFileSystemForPath(dfsPath, conf)
> // If the file exists and we have append support, append instead of 
> creating a new file
> val stream: FSDataOutputStream = {
>   if (dfs.isFile(dfsPath)) {
> if (conf.getBoolean("hdfs.append.support", false) || 
> dfs.isInstanceOf[RawLocalFileSystem]) {
>   dfs.append(dfsPath)
> } else {
>   throw new IllegalStateException("File exists and there is no append 
> support!")
> }
>   } else {
> dfs.create(dfsPath)
>   }
> }
> stream
>   }
>
>
> 2. Why does the job not retry and eventually fail when this error occurs?
> The job skips processing the exact number of events dumped in the log. For
> this particular example I see 987 + 4686 events were not processed and are
> lost for ever (does not recover even on restart).
>
>
> 16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write
> to write ahead log after 3 failures
> 16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer
> failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212
> lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$
> DefaultPromise@5ce88cb6), Record(
> java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.
> concurrent.impl.Promise$DefaultPromise@6d8f1feb))
> java.lang.IllegalStateException: File exists and there is no append
> support!
> at org.apache.spark.streaming.util.HdfsUtils$.
> getOutputStream(HdfsUtils.scala:35)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.
> org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$
> stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.
> org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(
> FileBasedWriteAheadLogWriter.scala:33)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<
> init>(FileBasedWriteAheadLogWriter.scala:41)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.
> getLogWriter(FileBasedWriteAheadLog.scala:217)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.
> write(FileBasedWriteAheadLog.scala:86)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.
> write(FileBasedWriteAheadLog.scala:48)
> at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$
> apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(
> BatchedWriteAheadLog.scala:173)
> at org.apache.spark.streaming.util.BatchedWriteAheadLog$$
> anon$1.run(BatchedWriteAheadLog.scala:140)
> at java.lang.Thread.run(Thread.java:745)
> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while
> writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,
> WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),
> FileBasedWriteAheadLogSegment(hdfs://
> mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-
> 1478553818621-1478553878621,0,41597 to the WriteAheadLog.
> java.lang.IllegalStateException: File exists and there is no append
> support!
> at org.apache.spark.streaming.util.HdfsUtils$.
> getOutputStream(HdfsUtils.scala:35)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.
> org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$
> stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.
> org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(
> FileBasedWriteAheadLogWriter.scala:33)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<
> init>(FileBasedWriteAheadLogWriter.scala:41)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.
> getLogWriter(FileBasedWriteAheadLog.scala:217)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.
> write(FileBasedWriteAheadLog.scala:86)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.
> write(FileBasedWriteAheadLog.scala:48)
> at 

Re: Access_Remote_Kerberized_Cluster_Through_Spark

2016-11-07 Thread Ajay Chander
Did anyone use
https://www.codatlas.com/github.com/apache/spark/HEAD/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
to interact with secured Hadoop from Spark?

Thanks,
Ajay

On Mon, Nov 7, 2016 at 4:37 PM, Ajay Chander  wrote:

>
> Hi Everyone,
>
> I am trying to develop a simple codebase on my machine to read data from
> secured Hadoop cluster. We have a development cluster which is secured
> through Kerberos and I want to run a Spark job from my IntelliJ to read
> some sample data from the cluster. Has anyone done this before ? Can you
> point me to some sample examples?
>
> I understand that, if we want to talk to secured cluster, we need to have
> keytab and principle. I tried using it through 
> UserGroupInformation.loginUserFromKeytab
> and SparkHadoopUtil.get.loginUserFromKeytab but so far no luck.
>
> I have been trying to do this from quite a while ago. Please let me know
> if you need more info. Thanks
>
> Regards,
> Ajay
>


Re: Structured Streaming with Cassandra, Is it supported??

2016-11-07 Thread shyla deshpande
I am using spark-cassandra-connector_2.11.

On Mon, Nov 7, 2016 at 3:33 PM, shyla deshpande 
wrote:

> Hi ,
>
> I am trying to do structured streaming with the wonderful SparkSession,
> but cannot save the streaming data to Cassandra.
>
> If anyone has got this working, please help
>
> Thanks
>
>


VectorUDT and ml.Vector for SVD

2016-11-07 Thread ganeshkrishnan
I am trying to run an SVD on a DataFrame. I used the ml TF-IDF transformer,
which produced a DataFrame.
For the Singular Value Decomposition I am trying to use RowMatrix, which
takes an RDD of mllib.Vector, so I have to convert this DataFrame, whose
features column I assumed was ml.Vector.

However the conversion

val convertedTermDocMatrix =
  MLUtils.convertMatrixColumnsFromML(termDocMatrix, "features")

fails with

java.lang.IllegalArgumentException: requirement failed: Column features must
be new Matrix type to be converted to old type but got
org.apache.spark.ml.linalg.VectorUDT


So the question is: how do I perform SVD on a DataFrame? I assume not all of
the functionality of mllib has been ported to ml.


I tried to convert my entire project to use RDDs, but computeSVD on RowMatrix
throws out-of-memory errors, and in any case I would like to stick with
DataFrames.

Our text corpus is around 55 GB of text data.
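
A minimal sketch of one route I am considering, based on the error message (the
column holds Vectors, not Matrices, so the vector variant of the converter, or a
per-row conversion, seems to be what is needed; the column name and k are
assumptions):

import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.util.MLUtils

// convertVectorColumnsFromML (not convertMatrixColumnsFromML) handles Vector columns
val converted = MLUtils.convertVectorColumnsFromML(termDocMatrix, "features")

// or convert row by row and build the RowMatrix directly
val rows = termDocMatrix.select("features").rdd
  .map(r => OldVectors.fromML(r.getAs[MLVector]("features")))
val mat = new RowMatrix(rows)
val svd = mat.computeSVD(20, computeU = false) // k = 20 is an arbitrary placeholder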



Ganesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/VectorUDT-and-ml-Vector-for-SVD-tp28038.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Correct SparkLauncher usage

2016-11-07 Thread Marcelo Vanzin
Then you need to look at your logs to figure out why the child app is not
working. "startApplication" will by default redirect the child's output to
the parent's logs.

On Mon, Nov 7, 2016 at 3:42 PM, Mohammad Tariq  wrote:

> Hi Marcelo,
>
> Thank you for the prompt response. I tried adding listeners as well,
> didn't work either. Looks like it isn't starting the job at all.
>
>
> Tariq, Mohammad
> about.me/mti
>
>
> On Tue, Nov 8, 2016 at 5:06 AM, Marcelo Vanzin 
> wrote:
>
>> On Mon, Nov 7, 2016 at 3:29 PM, Mohammad Tariq 
>> wrote:
>> > I have been trying to use SparkLauncher.startApplication() to launch a
>> Spark app from within java code, but unable to do so. However, same piece
>> of code is working if I use SparkLauncher.launch().
>> >
>> > Here are the corresponding code snippets :
>> >
>> > SparkAppHandle handle = new SparkLauncher()
>> >
>> > .setSparkHome("/Users/miqbal1/DISTRIBUTED_WORLD/UNPACKED/
>> spark-1.6.1-bin-hadoop2.6")
>> >
>> > .setJavaHome("/Library/Java/JavaVirtualMachines/jdk1.8.0_92
>> .jdk/Contents/Home")
>> >
>> > .setAppResource("/Users/miqbal1/wc.jar").setMainClass("org.
>> myorg.WC").setMaster("local")
>> >
>> > .setConf("spark.dynamicAllocation.enabled",
>> "true").startApplication();System.out.println(handle.getAppId());
>> >
>> > System.out.println(handle.getState());
>> >
>> > This prints null and UNKNOWN as output.
>>
>> The information you're printing is not available immediately after you
>> call "startApplication()". The Spark app is still starting, so it may
>> take some time for the app ID and other info to be reported back. The
>> "startApplication()" method allows you to provide listeners you can
>> use to know when that information is available.
>>
>> --
>> Marcelo
>>
>
>


-- 
Marcelo


Re: Correct SparkLauncher usage

2016-11-07 Thread Mohammad Tariq
Hi Marcelo,

Thank you for the prompt response. I tried adding listeners as well, didn't
work either. Looks like it isn't starting the job at all.


Tariq, Mohammad
about.me/mti


On Tue, Nov 8, 2016 at 5:06 AM, Marcelo Vanzin  wrote:

> On Mon, Nov 7, 2016 at 3:29 PM, Mohammad Tariq  wrote:
> > I have been trying to use SparkLauncher.startApplication() to launch a
> Spark app from within java code, but unable to do so. However, same piece
> of code is working if I use SparkLauncher.launch().
> >
> > Here are the corresponding code snippets :
> >
> > SparkAppHandle handle = new SparkLauncher()
> >
> > .setSparkHome("/Users/miqbal1/DISTRIBUTED_WORLD/
> UNPACKED/spark-1.6.1-bin-hadoop2.6")
> >
> > .setJavaHome("/Library/Java/JavaVirtualMachines/jdk1.8.0_92
> .jdk/Contents/Home")
> >
> > .setAppResource("/Users/miqbal1/wc.jar").setMainClass("org.
> myorg.WC").setMaster("local")
> >
> > .setConf("spark.dynamicAllocation.enabled",
> "true").startApplication();System.out.println(handle.getAppId());
> >
> > System.out.println(handle.getState());
> >
> > This prints null and UNKNOWN as output.
>
> The information you're printing is not available immediately after you
> call "startApplication()". The Spark app is still starting, so it may
> take some time for the app ID and other info to be reported back. The
> "startApplication()" method allows you to provide listeners you can
> use to know when that information is available.
>
> --
> Marcelo
>


Re: Correct SparkLauncher usage

2016-11-07 Thread Marcelo Vanzin
On Mon, Nov 7, 2016 at 3:29 PM, Mohammad Tariq  wrote:
> I have been trying to use SparkLauncher.startApplication() to launch a Spark 
> app from within java code, but unable to do so. However, same piece of code 
> is working if I use SparkLauncher.launch().
>
> Here are the corresponding code snippets :
>
> SparkAppHandle handle = new SparkLauncher()
>
> 
> .setSparkHome("/Users/miqbal1/DISTRIBUTED_WORLD/UNPACKED/spark-1.6.1-bin-hadoop2.6")
>
> 
> .setJavaHome("/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home")
>
> 
> .setAppResource("/Users/miqbal1/wc.jar").setMainClass("org.myorg.WC").setMaster("local")
>
> .setConf("spark.dynamicAllocation.enabled", 
> "true").startApplication();System.out.println(handle.getAppId());
>
> System.out.println(handle.getState());
>
> This prints null and UNKNOWN as output.

The information you're printing is not available immediately after you
call "startApplication()". The Spark app is still starting, so it may
take some time for the app ID and other info to be reported back. The
"startApplication()" method allows you to provide listeners you can
use to know when that information is available.
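
For example, something along these lines (a minimal sketch in Scala, but the same
listener API is available from Java; it reuses the paths from your snippet and
omits error handling):

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

val handle = new SparkLauncher()
  .setAppResource("/Users/miqbal1/wc.jar")
  .setMainClass("org.myorg.WC")
  .setMaster("local")
  .startApplication(new SparkAppHandle.Listener {
    override def stateChanged(h: SparkAppHandle): Unit =
      println(s"state=${h.getState} appId=${h.getAppId}")
    override def infoChanged(h: SparkAppHandle): Unit = ()
  })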

-- 
Marcelo




Structured Streaming with Cassandra, Is it supported??

2016-11-07 Thread shyla deshpande
Hi ,

I am trying to do structured streaming with the wonderful SparkSession, but
cannot save the streaming data to Cassandra.

If anyone has got this working, please help

Thanks


Correct SparkLauncher usage

2016-11-07 Thread Mohammad Tariq
Dear fellow Spark users,

I have been trying to use *SparkLauncher.startApplication()* to launch a
Spark app from within java code, but unable to do so. However, same piece
of code is working if I use *SparkLauncher.launch()*.

Here are the corresponding code snippets :

SparkAppHandle handle = new SparkLauncher()
    .setSparkHome("/Users/miqbal1/DISTRIBUTED_WORLD/UNPACKED/spark-1.6.1-bin-hadoop2.6")
    .setJavaHome("/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home")
    .setAppResource("/Users/miqbal1/wc.jar").setMainClass("org.myorg.WC").setMaster("local")
    .setConf("spark.dynamicAllocation.enabled", "true").startApplication();

System.out.println(handle.getAppId());
System.out.println(handle.getState());

This prints null and UNKNOWN as output.

Process spark = new SparkLauncher()
    .setSparkHome("/Users/miqbal1/DISTRIBUTED_WORLD/UNPACKED/spark-1.6.1-bin-hadoop2.6")
    .setJavaHome("/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home")
    .setAppResource("/Users/miqbal1/wc.jar").setMainClass("org.myorg.WC").setMaster("local").launch();

spark.waitFor();

InputStreamReaderRunnable inputStreamReaderRunnable =
    new InputStreamReaderRunnable(spark.getInputStream(), "input");
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
inputThread.start();

InputStreamReaderRunnable errorStreamReaderRunnable =
    new InputStreamReaderRunnable(spark.getErrorStream(), "error");
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
errorThread.start();

System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);

While this works perfectly fine.

Any pointers would be really helpful.


Thank you!


Tariq, Mohammad
about.me/mti

RE: Anomalous Spark RDD persistence behavior

2016-11-07 Thread Shreya Agarwal
I don't think this is correct, unless you are serializing when caching to
memory but not when persisting to disk. Can you check?

Also, I have seen the behavior where, if I have a 100 GB in-memory cache and I use
60 GB to persist something (MEMORY_AND_DISK), and then try to persist another RDD
with the MEMORY_AND_DISK option that is much larger than the remaining 40 GB
(say 1 TB), my executors start getting killed at some point. During this period the
memory usage goes above 100 GB, and after some extra usage it fails. It seems like
Spark is trying to cache this new RDD to memory and move the old one out to disk,
but it is not able to move the old one out fast enough and crashes with OOM. Is
anyone else seeing that?

From: Dave Jaffe [mailto:dja...@vmware.com]
Sent: Monday, November 7, 2016 2:07 PM
To: user@spark.apache.org
Subject: Anomalous Spark RDD persistence behavior

I’ve been studying Spark RDD persistence with spark-perf 
(https://github.com/databricks/spark-perf), especially when the dataset size 
starts to exceed available memory.

I’m running Spark 1.6.0 on YARN with CDH 5.7. I have 10 NodeManager nodes, each 
with 16 vcores and 32 GB of container memory. So I’m running 39 executors with 
4 cores and 8 GB each (6 GB spark.executor.memory and 2 GB 
spark.yarn.executor.memoryOverhead). I am using the default values for 
spark.memory.fraction and spark.memory.storageFraction so I end up with 3.1 GB 
available for caching RDDs, for a total of about 121 GB.

I’m running a single Random Forest test, with 500 features and up to 40 million 
examples, with 1 partition per core or 156 total partitions. The code (at line 
https://github.com/databricks/spark-perf/blob/master/mllib-tests/v1p5/src/main/scala/mllib/perf/MLAlgorithmTests.scala#L653)
 caches the input RDD immediately after creation. At 30M examples this fits 
into memory with all 156 partitions cached, with a total 113.4 GB in memory, or 
4 blocks of about 745 MB each per executor. So far so good.

At 40M examples, I expected about 3 partitions to fit in memory per executor, 
or 75% to be cached. However, I found only 3 partitions across the cluster were 
cached, or 2%, for a total size in memory of 2.9GB. Three of the executors had 
one block of 992 MB cached, with 2.1 GB free (enough for 2 more blocks). The 
other 36 held no blocks, with 3.1 GB free (enough for 3 blocks). Why this 
dramatic falloff?

Thinking this may improve if I changed the persistence to MEMORY_AND_DISK. 
Unfortunately now the executor memory was exceeded (“Container killed by YARN 
for exceeding memory limits. 8.9 GB of 8 GB physical memory used”) and the run 
ground to a halt. Why does persisting to disk take more memory than caching to 
memory?

Is this behavior expected as dataset size exceeds available memory?

Thanks in advance,

Dave Jaffe
Big Data Performance
VMware
dja...@vmware.com




Re: Using Apache Spark Streaming - how to handle changing data format within stream

2016-11-07 Thread Cody Koeninger
I may be misunderstanding, but you need to take each kafka message,
and turn it into multiple items in the transformed rdd?

so something like this (an untested sketch; someParserBasedOn and the
parser/item types are placeholders):

stream.flatMap { message =>
  val items = scala.collection.mutable.ArrayBuffer[Item]()
  var parser: Parser = null
  message.split("\n").foreach { line =>
    if (line.startsWith("#fields"))      // it's a header line
      parser = someParserBasedOn(line)
    else
      items += parser.parse(line)
  }
  items.iterator
}

On Mon, Nov 7, 2016 at 4:22 PM, coolgar  wrote:
> I'm using apache spark streaming with the kafka direct consumer. The data
> stream I'm receiving is log data that includes a header with each block of
> messages. Each DStream can therefore have many blocks of messages, each with
> it's own header.
>
> The header is used to know how to interpret the following fields in the
> block of messages. My challenge is that I'm building up (K,V) pairs that are
> processed by reduceByKey() and I use this header to know how to parse the
> fields that follow the header into the (K,V) pairs.
>
> So each message received by kakfa may appear as follows (# denotes the
> header field, \n denotes new line):
> #fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5
> field6 field7\data4 data5 data6 data7\n...
>
> Is there a way, without collecting all data back to the driver, to "grab"
> the header and use it to subsequently process the messages that follow the
> header until a new #fields comes along, rinse, repeat?
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>




Spark ML - Naive Bayes - how to select Threshold values

2016-11-07 Thread Nirav Patel
A few questions about the `thresholds` parameter. This is what the doc says: "Param
for Thresholds in multi-class classification to adjust the probability of
predicting each class. Array must have length equal to the number of
classes, with values >= 0. The class with largest value p/t is predicted,
where p is the original probability of that class and t is the class'
threshold."

0) How does the threshold help here? My general idea is that if you have a
threshold of 0.7, then at least one class's prediction probability should be more
than 0.7; if not, the prediction should return empty, i.e. classify it as
'uncertain'. How is the p/t function going to achieve that?

1) Which probability does it adjust? The default column 'probability' is actually a
conditional probability, and 'rawPrediction' is the
confidence. I believe the threshold will adjust 'rawPrediction', not the
'probability' column. Am I right?

2) Here's what some of my probability and rawPrediction vectors look like.
How do I set threshold values based on this? rawPrediction seems to be on a log
scale here.

Probability:
[2.233368649314982E-15,1.6429456680945863E-9,1.4377313514127723E-15,7.858651849363202E-15]

rawPrediction:
[-496.9606736723107,-483.452183395287,-497.40111830218746]

Basically I want the classifier to leave the Prediction column empty if it doesn't
have any class probability that is more than 0.7.

Is there any default threshold, like 0.5? If so, on what values does it apply,
because "Probability" and "rawPrediction" don't seem to be between 0 and 1.
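
For reference, a minimal sketch of where thresholds are set (assuming a 4-class
problem and train/test DataFrames with "label"/"features" columns; per the Param
doc quoted above, class i is chosen as the argmax of probability(i) / thresholds(i)):

import org.apache.spark.ml.classification.NaiveBayes

val model = new NaiveBayes().fit(train)
val predictions = model
  .setThresholds(Array(0.7, 0.7, 0.7, 0.7))  // one entry per class; values illustrative
  .transform(test)

As far as I can tell there is no built-in "abstain" option, so leaving the
prediction empty would need a post-filter on the maximum of the probability vector.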




Does DeserializeToObject mean that a Row is deserialized to Java objects?

2016-11-07 Thread Benyi Wang
Below is my test code using Spark 2.0.1. DeserializeToObject doesn't appear
in filter() but does in map(). Does that mean map() is not a Tungsten operation?

case class Event(id: Long)
val e1 = Seq(Event(1L), Event(2L)).toDS
val e2 = Seq(Event(2L), Event(3L)).toDS

e1.filter(e => e.id < 10 && e.id > 5).explain
// == Physical Plan ==
// *Filter .apply
// +- LocalTableScan [id#145L]

e1.map(e => e.id < 10 && e.id > 5).explain
// == Physical Plan ==
// *SerializeFromObject [input[0, boolean, true] AS value#155]
// +- *MapElements , obj#154: boolean
//    +- *DeserializeToObject newInstance(class $line41.$read$$iw$$iw$Event), obj#153: $line41.$read$$iw$$iw$Event
//       +- LocalTableScan [id#145L]

Another question: if I register a complex function as a UDF, in what
situations will DeserializeToObject/SerializeFromObject happen?

Thanks.


Using Apache Spark Streaming - how to handle changing data format within stream

2016-11-07 Thread coolgar
I'm using Apache Spark Streaming with the Kafka direct consumer. The data
stream I'm receiving is log data that includes a header with each block of
messages. Each DStream can therefore have many blocks of messages, each with
its own header.

The header is used to know how to interpret the following fields in the
block of messages. My challenge is that I'm building up (K,V) pairs that are
processed by reduceByKey(), and I use this header to know how to parse the
fields that follow the header into the (K,V) pairs.

So each message received from Kafka may appear as follows (# denotes the
header field, \n denotes a new line):
#fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5
field6 field7\ndata4 data5 data6 data7\n...

Is there a way, without collecting all data back to the driver, to "grab"
the header and use it to subsequently process the messages that follow the
header until a new #fields comes along, rinse, repeat?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Anomalous Spark RDD persistence behavior

2016-11-07 Thread Dave Jaffe
I’ve been studying Spark RDD persistence with spark-perf 
(https://github.com/databricks/spark-perf), especially when the dataset size 
starts to exceed available memory.

I’m running Spark 1.6.0 on YARN with CDH 5.7. I have 10 NodeManager nodes, each 
with 16 vcores and 32 GB of container memory. So I’m running 39 executors with 
4 cores and 8 GB each (6 GB spark.executor.memory and 2 GB 
spark.yarn.executor.memoryOverhead). I am using the default values for 
spark.memory.fraction and spark.memory.storageFraction so I end up with 3.1 GB 
available for caching RDDs, for a total of about 121 GB.

I’m running a single Random Forest test, with 500 features and up to 40 million 
examples, with 1 partition per core or 156 total partitions. The code (at line 
https://github.com/databricks/spark-perf/blob/master/mllib-tests/v1p5/src/main/scala/mllib/perf/MLAlgorithmTests.scala#L653)
 caches the input RDD immediately after creation. At 30M examples this fits 
into memory with all 156 partitions cached, with a total 113.4 GB in memory, or 
4 blocks of about 745 MB each per executor. So far so good.

At 40M examples, I expected about 3 partitions to fit in memory per executor, 
or 75% to be cached. However, I found only 3 partitions across the cluster were 
cached, or 2%, for a total size in memory of 2.9GB. Three of the executors had 
one block of 992 MB cached, with 2.1 GB free (enough for 2 more blocks). The 
other 36 held no blocks, with 3.1 GB free (enough for 3 blocks). Why this 
dramatic falloff?

I thought this might improve if I changed the persistence to MEMORY_AND_DISK.
Unfortunately, the executor memory was then exceeded (“Container killed by YARN
for exceeding memory limits. 8.9 GB of 8 GB physical memory used”) and the run
ground to a halt. Why does persisting to disk take more memory than caching to
memory?

Is this behavior expected as dataset size exceeds available memory?

Thanks in advance,

Dave Jaffe
Big Data Performance
VMware
dja...@vmware.com




Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-07 Thread Arijit
Hello All,


We are using Spark 1.6.2 with WAL enabled and encountering data loss when the 
following exception/warning happens. We are using HDFS as our checkpoint 
directory.


Questions are:


1. Is this a bug in Spark or an issue with our configuration? The source looks like
the following. Which file already exists, and who is supposed to set the
hdfs.append.support configuration? Why doesn't it happen all the time?


private[streaming] object HdfsUtils {

  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
val dfsPath = new Path(path)
val dfs = getFileSystemForPath(dfsPath, conf)
// If the file exists and we have append support, append instead of 
creating a new file
val stream: FSDataOutputStream = {
  if (dfs.isFile(dfsPath)) {
if (conf.getBoolean("hdfs.append.support", false) || 
dfs.isInstanceOf[RawLocalFileSystem]) {
  dfs.append(dfsPath)
} else {
  throw new IllegalStateException("File exists and there is no append 
support!")
}
  } else {
dfs.create(dfsPath)
  }
}
stream
  }


2. Why does the job not retry and eventually fail when this error occurs? The 
job skips processing the exact number of events dumped in the log. For this 
particular example I see 987 + 4686 events that were not processed and are lost
forever (it does not recover even on restart).



16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write to 
write ahead log after 3 failures
16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed 
to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 
cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), 
Record(
java.nio.HeapByteBuffer[pos=1212 lim=1212 
cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
java.lang.IllegalStateException: File exists and there is no append support!
at 
org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
at 
org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing 
record: 
BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),FileBasedWriteAheadLogSegment(hdfs://
mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597
 to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
at 
org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
at 
org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing 
record: 

Access_Remote_Kerberized_Cluster_Through_Spark

2016-11-07 Thread Ajay Chander
Hi Everyone,

I am trying to develop a simple codebase on my machine to read data from
secured Hadoop cluster. We have a development cluster which is secured
through Kerberos and I want to run a Spark job from my IntelliJ to read
some sample data from the cluster. Has anyone done this before ? Can you
point me to some sample examples?

I understand that, if we want to talk to a secured cluster, we need to have a
keytab and principal. I tried using them through
UserGroupInformation.loginUserFromKeytab
and SparkHadoopUtil.get.loginUserFromKeytab, but so far no luck.
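
For reference, this is roughly what I tried (a minimal sketch; the principal,
realm, and keytab path are placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

val conf = new Configuration()
conf.set("hadoop.security.authentication", "kerberos")
UserGroupInformation.setConfiguration(conf)
UserGroupInformation.loginUserFromKeytab("someuser@EXAMPLE.COM", "/path/to/someuser.keytab")
// ... then create the SparkContext/SparkSession and read from the cluster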

I have been trying to do this for quite a while. Please let me know if
you need more info. Thanks

Regards,
Ajay


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-07 Thread Cody Koeninger
There definitely is Kafka documentation indicating that you should use
a different consumer group for logically different subscribers; this
is really basic to Kafka:

http://kafka.apache.org/documentation#intro_consumers

As for your comment that "commit async after each RDD, which is not
really viable also", how is it not viable?  Again, committing offsets
to Kafka doesn't give you reliable delivery semantics unless your
downstream data store is idempotent.  If your downstream data store is
idempotent, then it shouldn't matter to you when offset commits
happen, as long as they happen within a reasonable time after the data
is written.
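
For reference, the commit-to-Kafka pattern from the 0.10 integration docs looks
roughly like this (a sketch; `stream` is the direct stream, and the write step
stands in for whatever your idempotent output is):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... idempotent write of the results for this batch ...
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}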

Do you want to keep arguing with me, or follow my advice and proceed
with debugging any remaining issues after you make the changes I
suggested?

On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy  wrote:
> With our stream version, we update the offsets for only the partition we
> operating on. We even break down the partition into smaller batches and then
> update the offsets after each batch within the partition. With Spark 1.6 and
> Kafka 0.8.x this was not an issue, and as Sean pointed out, this is not
> necessarily a Spark issue since Kafka no longer allows you to simply update
> the offsets for a given consumer group. You have to subscribe or assign
> partitions to even do so.
>
> As for storing the offsets in some other place like a DB, it don't find this
> useful because you then can't use tools like Kafka Manager. In order to do
> so you would have to store in a DB and the circle back and update Kafka
> afterwards. This means you have to keep two sources in sync which is not
> really a good idea.
>
> It is a challenge in Spark to use the Kafka offsets since the drive keeps
> subscribed to the topic(s) and consumer group, while the executors prepend
> "spark-executor-" to the consumer group. The stream (driver) does allow you
> to commit async after each RDD, which is not really viable also. I have not
> of implementing an Akka actor system on the driver and send it messages from
> the executor code to update the offsets, but then that is asynchronous as
> well so not really a good solution.
>
> I have no idea why Kafka made this change and also why in the parallel
> KafkaRDD application we would be advised to use different consumer groups
> for each RDD. That seems strange to me that different consumer groups would
> be required or advised. There is no Kafka documentation that I know if that
> states this. The biggest issue I see with the parallel KafkaRDD is the
> timeouts. I have tried to set poll.ms to 30 seconds and still get the issue.
> Something is not right here and just not seem right. As I mentioned with the
> streaming application, with Spark 1.6 and Kafka 0.8.x we never saw this
> issue. We have been running the same basic logic for over a year now without
> one hitch at all.
>
> Ivan
>
>
> On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger  wrote:
>>
>> Someone can correct me, but I'm pretty sure Spark dstreams (in
>> general, not just kafka) have been progressing on to the next batch
>> after a given batch aborts for quite some time now.  Yet another
>> reason I put offsets in my database transactionally.  My jobs throw
>> exceptions if the offset in the DB isn't what I expected it to be.
>>
>>
>>
>>
>> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben  wrote:
>> > I've been encountering the same kinds of timeout issues as Ivan, using
>> > the "Kafka Stream" approach that he is using, except I'm storing my offsets
>> > manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet
>> > implemented the KafkaRDD approach, and therefore don't have the concurrency
>> > issues, but a very similar use case is coming up for me soon, it's just 
>> > been
>> > backburnered until I can get streaming to be more reliable (I will
>> > definitely ensure unique group IDs when I do). Offset commits are certainly
>> > more painful in Kafka 0.10, and that doesn't have anything to do with 
>> > Spark.
>> >
>> > While i may be able to alleviate the timeout by just increasing it, I've
>> > noticed something else that is more worrying: When one task fails 4 times 
>> > in
>> > a row (i.e. "Failed to get records for _ after polling for _"), Spark 
>> > aborts
>> > the Stage and Job with "Job aborted due to stage failure: Task _ in stage _
>> > failed 4 times". That's fine, and it's the behavior I want, but instead of
>> > stopping the Application there (as previous versions of Spark did) the next
>> > microbatch marches on and offsets are committed ahead of the failed
>> > microbatch. Suddenly my at-least-once app becomes more
>> > sometimes-at-least-once which is no good. In order for spark to display 
>> > that
>> > failure, I must be propagating the errors up to Spark, but the behavior of
>> > marching forward with the next microbatch seems to be new, and a big
>> > potential for data loss in streaming 

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-07 Thread Ivan von Nagy
With our stream version, we update the offsets for only the partition we are
operating on. We even break the partition down into smaller batches and
then update the offsets after each batch within the partition. With Spark
1.6 and Kafka 0.8.x this was not an issue, and as Sean pointed out, this is
not necessarily a Spark issue, since Kafka no longer allows you to simply
update the offsets for a given consumer group. You have to subscribe or
assign partitions to even do so.

As for storing the offsets in some other place like a DB, I don't find
this useful because you then can't use tools like Kafka Manager. In order
to do so you would have to store them in a DB and then circle back and update
Kafka afterwards. This means you have to keep two sources in sync, which is
not really a good idea.

It is a challenge in Spark to use the Kafka offsets, since the driver stays
subscribed to the topic(s) and consumer group, while the executors prepend
"spark-executor-" to the consumer group. The stream (driver) does allow you
to commit async after each RDD, which is not really viable either. I have
thought of implementing an Akka actor system on the driver and sending it
messages from the executor code to update the offsets, but then that is
asynchronous as well, so not really a good solution.

I have no idea why Kafka made this change, and also why in the parallel
KafkaRDD application we would be advised to use different consumer groups
for each RDD. It seems strange to me that different consumer groups would
be required or advised; there is no Kafka documentation that I know of that
states this. The biggest issue I see with the parallel KafkaRDD is the
timeouts. I have tried setting poll.ms to 30 seconds and still get the
issue. Something is not right here and it just does not seem right. As I mentioned
for the streaming application, with Spark 1.6 and Kafka 0.8.x we never saw
this issue. We have been running the same basic logic for over a year now
without a single hitch.

Ivan


On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger  wrote:

> Someone can correct me, but I'm pretty sure Spark dstreams (in
> general, not just kafka) have been progressing on to the next batch
> after a given batch aborts for quite some time now.  Yet another
> reason I put offsets in my database transactionally.  My jobs throw
> exceptions if the offset in the DB isn't what I expected it to be.
>
>
>
>
> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben  wrote:
> > I've been encountering the same kinds of timeout issues as Ivan, using
> the "Kafka Stream" approach that he is using, except I'm storing my offsets
> manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet
> implemented the KafkaRDD approach, and therefore don't have the concurrency
> issues, but a very similar use case is coming up for me soon, it's just
> been backburnered until I can get streaming to be more reliable (I will
> definitely ensure unique group IDs when I do). Offset commits are certainly
> more painful in Kafka 0.10, and that doesn't have anything to do with Spark.
> >
> > While i may be able to alleviate the timeout by just increasing it, I've
> noticed something else that is more worrying: When one task fails 4 times
> in a row (i.e. "Failed to get records for _ after polling for _"), Spark
> aborts the Stage and Job with "Job aborted due to stage failure: Task _ in
> stage _ failed 4 times". That's fine, and it's the behavior I want, but
> instead of stopping the Application there (as previous versions of Spark
> did) the next microbatch marches on and offsets are committed ahead of the
> failed microbatch. Suddenly my at-least-once app becomes more
> sometimes-at-least-once which is no good. In order for spark to display
> that failure, I must be propagating the errors up to Spark, but the
> behavior of marching forward with the next microbatch seems to be new, and
> a big potential for data loss in streaming applications.
> >
> > Am I perhaps missing a setting to stop the entire streaming application
> once spark.task.maxFailures is reached? Has anyone else seen this behavior
> of a streaming application skipping over failed microbatches?
> >
> > Thanks,
> > Sean
> >
> >
> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger  wrote:
> >>
> >> So basically what I am saying is
> >>
> >> - increase poll.ms
> >> - use a separate group id everywhere
> >> - stop committing offsets under the covers
> >>
> >> That should eliminate all of those as possible causes, and then we can
> >> see if there are still issues.
> >>
> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
> >> subscribe to a topic in order to update offsets, Kafka does.  If you
> >> don't like the new Kafka consumer api, the existing 0.8 simple
> >> consumer api should be usable with later brokers.  As long as you
> >> don't need SSL or dynamic subscriptions, and it meets your needs, keep
> >> using it.
> >>
> >> On Fri, Nov 4, 2016 at 3:37 

Re: Newbie question - Best way to bootstrap with Spark

2016-11-07 Thread Raghav
Thanks a ton, guys.

On Sun, Nov 6, 2016 at 4:57 PM, raghav  wrote:

> I am newbie in the world of big data analytics, and I want to teach myself
> Apache Spark, and want to be able to write scripts to tinker with data.
>
> I have some understanding of Map Reduce but have not had a chance to get my
> hands dirty. There are tons of resources for Spark, but I am looking for
> some guidance for starter material, or videos.
>
> Thanks.
>
> Raghav
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Newbie-question-Best-way-to-
> bootstrap-with-Spark-tp28032.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Raghav


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-07 Thread Cody Koeninger
Someone can correct me, but I'm pretty sure Spark dstreams (in
general, not just kafka) have been progressing on to the next batch
after a given batch aborts for quite some time now.  Yet another
reason I put offsets in my database transactionally.  My jobs throw
exceptions if the offset in the DB isn't what I expected it to be.




On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben  wrote:
> I've been encountering the same kinds of timeout issues as Ivan, using the 
> "Kafka Stream" approach that he is using, except I'm storing my offsets 
> manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet 
> implemented the KafkaRDD approach, and therefore don't have the concurrency 
> issues, but a very similar use case is coming up for me soon, it's just been 
> backburnered until I can get streaming to be more reliable (I will definitely 
> ensure unique group IDs when I do). Offset commits are certainly more painful 
> in Kafka 0.10, and that doesn't have anything to do with Spark.
>
> While i may be able to alleviate the timeout by just increasing it, I've 
> noticed something else that is more worrying: When one task fails 4 times in 
> a row (i.e. "Failed to get records for _ after polling for _"), Spark aborts 
> the Stage and Job with "Job aborted due to stage failure: Task _ in stage _ 
> failed 4 times". That's fine, and it's the behavior I want, but instead of 
> stopping the Application there (as previous versions of Spark did) the next 
> microbatch marches on and offsets are committed ahead of the failed 
> microbatch. Suddenly my at-least-once app becomes more 
> sometimes-at-least-once which is no good. In order for spark to display that 
> failure, I must be propagating the errors up to Spark, but the behavior of 
> marching forward with the next microbatch seems to be new, and a big 
> potential for data loss in streaming applications.
>
> Am I perhaps missing a setting to stop the entire streaming application once 
> spark.task.maxFailures is reached? Has anyone else seen this behavior of a 
> streaming application skipping over failed microbatches?
>
> Thanks,
> Sean
>
>
>> On Nov 4, 2016, at 2:48 PM, Cody Koeninger  wrote:
>>
>> So basically what I am saying is
>>
>> - increase poll.ms
>> - use a separate group id everywhere
>> - stop committing offsets under the covers
>>
>> That should eliminate all of those as possible causes, and then we can
>> see if there are still issues.
>>
>> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>> subscribe to a topic in order to update offsets, Kafka does.  If you
>> don't like the new Kafka consumer api, the existing 0.8 simple
>> consumer api should be usable with later brokers.  As long as you
>> don't need SSL or dynamic subscriptions, and it meets your needs, keep
>> using it.
>>
>> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy  wrote:
>>> Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses a
>>> single distinct topic. For example, the group would be something like
>>> "storage-group", and the topics would be "storage-channel1", and
>>> "storage-channel2". In each thread a KafkaConsumer is started, assigned the
>>> partitions assigned, and then commit offsets are called after the RDD is
>>> processed. This should not interfere with the consumer group used by the
>>> executors which would be "spark-executor-storage-group".
>>>
>>> In the streaming example there is a single topic ("client-events") and group
>>> ("processing-group"). A single stream is created and offsets are manually
>>> updated from the executor after each partition is handled. This was a
>>> challenge since Spark now requires one to assign or subscribe to a topic in
>>> order to even update the offsets. In 0.8.2.x you did not have to worry about
>>> that. This approach limits your exposure to duplicate data since idempotent
>>> records are not entirely possible in our scenario. At least without a lot of
>>> re-running of logic to de-dup.
>>>
>>> Thanks,
>>>
>>> Ivan
>>>
>>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger  wrote:

 So just to be clear, the answers to my questions are

 - you are not using different group ids, you're using the same group
 id everywhere

 - you are committing offsets manually

 Right?

 If you want to eliminate network or kafka misbehavior as a source,
 tune poll.ms upwards even higher.

 You must use different group ids for different rdds or streams.
 Kafka consumers won't behave the way you expect if they are all in the
 same group id, and the consumer cache is keyed by group id. Yes, the
 executor will tack "spark-executor-" on to the beginning, but if you
 give it the same base group id, it will be the same.  And the driver
 will use the group id you gave it, unmodified.

 Finally, I really can't help you if you're manually writing your own

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-07 Thread Sean McKibben
I've been encountering the same kinds of timeout issues as Ivan, using the 
"Kafka Stream" approach that he is using, except I'm storing my offsets 
manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet 
implemented the KafkaRDD approach, and therefore don't have the concurrency 
issues, but a very similar use case is coming up for me soon, it's just been 
backburnered until I can get streaming to be more reliable (I will definitely 
ensure unique group IDs when I do). Offset commits are certainly more painful 
in Kafka 0.10, and that doesn't have anything to do with Spark.

While I may be able to alleviate the timeout by just increasing it, I've 
noticed something else that is more worrying: When one task fails 4 times in a 
row (i.e. "Failed to get records for _ after polling for _"), Spark aborts the 
Stage and Job with "Job aborted due to stage failure: Task _ in stage _ failed 
4 times". That's fine, and it's the behavior I want, but instead of stopping 
the Application there (as previous versions of Spark did) the next microbatch 
marches on and offsets are committed ahead of the failed microbatch. Suddenly 
my at-least-once app becomes more sometimes-at-least-once which is no good. In 
order for spark to display that failure, I must be propagating the errors up to 
Spark, but the behavior of marching forward with the next microbatch seems to 
be new, and a big potential for data loss in streaming applications.

Am I perhaps missing a setting to stop the entire streaming application once 
spark.task.maxFailures is reached? Has anyone else seen this behavior of a 
streaming application skipping over failed microbatches?

Thanks,
Sean
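
For comparison, committing offsets back to Kafka from the driver with the 0.10
integration looks roughly like this (a minimal sketch, assuming a direct stream
`stream` created with its own group.id and with enable.auto.commit disabled; the
processing step is elided):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Capture the offset ranges before any shuffle, do the work, and only then
// ask the stream to commit those offsets back to Kafka asynchronously.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd and write the results out ...
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}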


> On Nov 4, 2016, at 2:48 PM, Cody Koeninger  wrote:
> 
> So basically what I am saying is
> 
> - increase poll.ms
> - use a separate group id everywhere
> - stop committing offsets under the covers
> 
> That should eliminate all of those as possible causes, and then we can
> see if there are still issues.
> 
> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
> subscribe to a topic in order to update offsets, Kafka does.  If you
> don't like the new Kafka consumer api, the existing 0.8 simple
> consumer api should be usable with later brokers.  As long as you
> don't need SSL or dynamic subscriptions, and it meets your needs, keep
> using it.
> 
> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy  wrote:
>> Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses a
>> single distinct topic. For example, the group would be something like
>> "storage-group", and the topics would be "storage-channel1", and
>> "storage-channel2". In each thread a KafkaConsumer is started, assigned the
>> partitions assigned, and then commit offsets are called after the RDD is
>> processed. This should not interfere with the consumer group used by the
>> executors which would be "spark-executor-storage-group".
>> 
>> In the streaming example there is a single topic ("client-events") and group
>> ("processing-group"). A single stream is created and offsets are manually
>> updated from the executor after each partition is handled. This was a
>> challenge since Spark now requires one to assign or subscribe to a topic in
>> order to even update the offsets. In 0.8.2.x you did not have to worry about
>> that. This approach limits your exposure to duplicate data since idempotent
>> records are not entirely possible in our scenario. At least without a lot of
>> re-running of logic to de-dup.
>> 
>> Thanks,
>> 
>> Ivan
>> 
>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger  wrote:
>>> 
>>> So just to be clear, the answers to my questions are
>>> 
>>> - you are not using different group ids, you're using the same group
>>> id everywhere
>>> 
>>> - you are committing offsets manually
>>> 
>>> Right?
>>> 
>>> If you want to eliminate network or kafka misbehavior as a source,
>>> tune poll.ms upwards even higher.
>>> 
>>> You must use different group ids for different rdds or streams.
>>> Kafka consumers won't behave the way you expect if they are all in the
>>> same group id, and the consumer cache is keyed by group id. Yes, the
>>> executor will tack "spark-executor-" on to the beginning, but if you
>>> give it the same base group id, it will be the same.  And the driver
>>> will use the group id you gave it, unmodified.
>>> 
>>> Finally, I really can't help you if you're manually writing your own
>>> code to commit offsets directly to Kafka.  Trying to minimize
>>> duplicates that way doesn't really make sense, your system must be
>>> able to handle duplicates if you're using kafka as an offsets store,
>>> it can't do transactional exactly once.
>>> 
>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy  wrote:
 Here are some examples and details of the scenarios. The KafkaRDD is the
 most
 error prone to polling
 timeouts and concurrent modification errors.
 
 *Using 

RE: Out of memory at 60GB free memory.

2016-11-07 Thread Kürşat Kurt
Spark is not running on Mesos; it is running in client mode only.

 

From: Rodrick Brown [mailto:rodr...@orchardplatform.com] 
Sent: Monday, November 7, 2016 8:15 PM
To: Kürşat Kurt 
Cc: Sean Owen ; User 
Subject: Re: Out of memory at 60GB free memory.

 

You should also set memory overhead i.e. --conf 
spark.mesos.executor.memoryOverhead=${EXECUTOR_MEM} * .10

 

On Mon, Nov 7, 2016 at 6:51 AM, Kürşat Kurt  > wrote:

I understand that i shoud set the executor memory. I tried with the parameters 
below but OOM still occures...

./spark-submit --class main.scala.Test1 --master local[8]  --driver-memory 20g 
--executor-memory 20g

 

From: Sean Owen [mailto:so...@cloudera.com  ] 
Sent: Monday, November 7, 2016 12:21 PM
To: Kürşat Kurt  >; 
user@spark.apache.org  
Subject: Re: Out of memory at 60GB free memory.

 

You say "out of memory", and you allocate a huge amount of driver memory, but, 
it's your executor that's running out of memory. You want --executor-memory. 
You can't set it after the driver has run.

On Mon, Nov 7, 2016 at 5:35 AM Kürşat Kurt  > wrote:

Hi;

I am trying to use Naive Bayes for multi-class classification.

I am getting OOM at “pipeline.fit(train)” line. When i submit the code, 
everything is ok so far the stage “collect at NaiveBayes.scala:400”.

At this stage, starting 375 tasks very fast and going slowing down at this 
point. Task count could not became 500, getting OOM at 380-390th task.

 





 

-- 

  

Rodrick Brown / DevOPs

9174456839 / rodr...@orchardplatform.com  

Orchard Platform 
101 5th Avenue, 4th Floor, New York, NY

 




Re: SparkLauncer 2.0.1 version working incosistently in yarn-client mode

2016-11-07 Thread Marcelo Vanzin
On Sat, Nov 5, 2016 at 2:54 AM, Elkhan Dadashov  wrote:
> while (appHandle.getState() == null || !appHandle.getState().isFinal()) {
> if (appHandle.getState() != null) {
> log.info("while: Spark job state is : " + appHandle.getState());
> if (appHandle.getAppId() != null) {
> log.info("\t App id: " + appHandle.getAppId() + "\tState: " +
> appHandle.getState());
> }
> }
> }

This is a ridiculously expensive busy loop, even more so if you
comment out the log lines. Use listeners, or at least sleep a little
bit every once in a while. You're probably starving other processes /
threads of cpu.
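
A minimal sketch of the listener approach, blocking on a latch instead of
spinning (the jar path and main class below are hypothetical):

import java.util.concurrent.CountDownLatch
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

// A listener releases the latch once the application reaches a final state,
// so the launching thread simply waits instead of busy-looping.
val finished = new CountDownLatch(1)
val handle = new SparkLauncher()
  .setAppResource("/path/to/app.jar")        // hypothetical jar
  .setMainClass("com.example.Main")          // hypothetical main class
  .setMaster("yarn")
  .setDeployMode("client")
  .startApplication(new SparkAppHandle.Listener {
    override def stateChanged(h: SparkAppHandle): Unit = {
      println(s"State: ${h.getState}, appId: ${h.getAppId}")
      if (h.getState.isFinal) finished.countDown()
    }
    override def infoChanged(h: SparkAppHandle): Unit = ()
  })
finished.await()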

-- 
Marcelo




Re: How sensitive is Spark to Swap?

2016-11-07 Thread Sean Owen
Swapping is pretty bad here, especially because a JVM-based process won't even
feel the memory pressure and try to GC or shrink the heap when the OS faces
memory pressure. It's probably relatively worse than in M/R because Spark
uses memory more. Enough grinding in swap will cause tasks to fail due to
timeouts, and because these failures are pretty correlated, will cause jobs
to die, messily. For that reason I think you always want to disable swap,
all the more so because disk I/O tends to be a bottleneck.

If you're using YARN, I do find its design encourages, kind of on purpose,
under-subscription of resources. You can probably safely over-subscribe
YARN memory, without resorting to swap.

On Mon, Nov 7, 2016 at 5:29 PM Michael Segel 
wrote:

> This may seem like a silly question, but it really isn’t.
> In terms of Map/Reduce, its possible to over subscribe the cluster because
> there is a lack of sensitivity if the servers swap memory to disk.
>
> In terms of HBase, which is very sensitive, swap doesn’t just kill
> performance, but also can kill HBase. (I’m sure one can tune it to be less
> sensitive…)
>
> But I have to ask how sensitive is Spark?
> Considering we can cache to disk (local disk) it would imply that it would
> less sensitive.
> Yet we see some posters facing over subscription and hitting OOME.
>
> Thoughts?
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


Re: Spark Streaming backpressure weird behavior/bug

2016-11-07 Thread Michael Segel

Spark inherits its security from the underlying mechanisms in either YARN or 
MESOS (whichever environment you are launching your cluster/jobs)

That said… there is limited support from Ranger.  There are three parts to this…

1) Ranger being called when the job is launched…

2) Ranger being called when data is being read from disk (HDFS) or HBase; 
however… once the application has the data… it's fair game.

3) Ranger being woven into a thrift server (which would be a one-off); then 
you would have more security if you were planning on providing the data to 
multiple users and applications…


Does that help?

On Nov 7, 2016, at 3:41 AM, Mudit Kumar 
> wrote:

Hi,

Do ranger provide security to spark?If yes,then in what capacity.

Thanks,
Mudit



How sensitive is Spark to Swap?

2016-11-07 Thread Michael Segel
This may seem like a silly question, but it really isn’t. 
In terms of Map/Reduce, it’s possible to oversubscribe the cluster because 
there is a lack of sensitivity if the servers swap memory to disk. 

In terms of HBase, which is very sensitive, swap doesn’t just kill performance, 
it can also kill HBase. (I’m sure one can tune it to be less sensitive…) 

But I have to ask: how sensitive is Spark? 
Considering we can cache to disk (local disk), it would imply that it is less 
sensitive. 
Yet we see some posters facing oversubscription and hitting OOMEs. 

Thoughts? 





Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-07 Thread Carlo . Allocca
I found it just by googling: 
http://sebastianraschka.com/Articles/2014_about_feature_scaling.html

Thanks.
Carlo
On 7 Nov 2016, at 17:12, carlo allocca 
> wrote:

Hi Masood,

Thank you very much for your insight.
I am going to scale all my features as you described.

As I am beginners, Is there any paper/book that would explain the suggested 
approaches? I would love to read.

Many Thanks,
Best Regards,
Carlo





On 7 Nov 2016, at 16:27, Masood Krohy 
> wrote:

Yes, you would want to scale those features before feeding into any algorithm, 
one typical way would be to calculate the average and std for each feature, 
deduct the avg, then divide by std. Dividing by "max - min" is also a good 
option if you're sure there is no outlier shooting up your max or lowering your 
min significantly for each feature. After you have scaled each feature, then 
you can feed the data into the algo for training.

For prediction on new samples, you need to scale each sample first before 
making predictions using your trained model.

It's not too complicated to implement manually, but Spark API has some support 
for this already:
ML: http://spark.apache.org/docs/latest/ml-features.html#standardscaler
MLlib: 
http://spark.apache.org/docs/latest/mllib-feature-extraction.html#standardscaler
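
A minimal sketch of this with the ml StandardScaler, assuming DataFrames `train`
and `test` with a Vector column named "features" (column and variable names are
illustrative):

import org.apache.spark.ml.feature.StandardScaler

// Fit the scaler on the training data only, then reuse the same model to
// scale both the training data and any new samples before prediction.
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithMean(true)   // subtract the per-feature mean (requires dense vectors)
  .setWithStd(true)    // divide by the per-feature standard deviation
val scalerModel = scaler.fit(train)
val scaledTrain = scalerModel.transform(train)
val scaledTest  = scalerModel.transform(test)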

Masood


--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation
http://ca.linkedin.com/in/masoodkh



From: Carlo.Allocca
To: Masood Krohy
Cc: Carlo.Allocca, Mohit Jaggi, "user@spark.apache.org"
Date: 2016-11-07 10:50
Subject: Re: LinearRegressionWithSGD and Rank Features By Importance





Hi Masood,

thank you very much for the reply. It is very a good point as I am getting very 
bed result so far.

If I understood well what you suggest is to scale the date below (it is part of 
my dataset) before applying linear regression SGD.

is it correct?

Many Thanks in advance.

Best Regards,
Carlo



On 7 Nov 2016, at 15:31, Masood Krohy 
> wrote:

If you go down this route (look at actual coefficients/weights), then make sure 
your features are scaled first and have more or less the same mean when feeding 
them into the algo. If not, then actual coefficients/weights wouldn't tell you 
much. In any case, SGD performs badly with unscaled features, so you gain if 
you scale the features beforehand.

Masood

--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation
http://ca.linkedin.com/in/masoodkh



From: Carlo.Allocca
To: Mohit Jaggi
Cc: Carlo.Allocca, "user@spark.apache.org"
Date: 2016-11-04 03:39
Subject: Re: LinearRegressionWithSGD and Rank Features By Importance





Hi Mohit,

Thank you for your reply.
OK. it means coefficient with high score are more important that other with low 
score…

Many Thanks,
Best Regards,
Carlo


> On 3 Nov 2016, at 20:41, Mohit Jaggi 
> > wrote:
>
> For linear regression, it should be fairly easy. Just sort the co-efficients 
> :)
>
> Mohit Jaggi
> Founder,
> Data Orchard LLC
> www.dataorchardllc.com
>
>
>
>
>> On Nov 3, 2016, at 3:35 AM, Carlo.Allocca 
>> > wrote:
>>
>> Hi All,
>>
>> I am using SPARK and in particular the MLib library.
>>
>> import org.apache.spark.mllib.regression.LabeledPoint;
>> import org.apache.spark.mllib.regression.LinearRegressionModel;
>> import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
>>
>> For my problem I am using the LinearRegressionWithSGD and I would like to 
>> perform a “Rank Features By Importance”.
>>
>> I checked the documentation and it seems that does not provide such methods.
>>
>> Am I missing anything?  Please, could you provide any help on this?
>> Should I change the approach?
>>
>> Many Thanks in advance,
>>
>> Best Regards,
>> Carlo
>>
>>
>> -- The Open University is incorporated by Royal Charter (RC 000391), an 
>> exempt charity in England & Wales and a charity registered in Scotland (SC 
>> 038302). The Open University is authorised and regulated by the Financial 
>> Conduct Authority.

Re: Out of memory at 60GB free memory.

2016-11-07 Thread Rodrick Brown
You should also set memory overhead i.e. --conf
spark.mesos.executor.memoryOverhead=${EXECUTOR_MEM} * .10

On Mon, Nov 7, 2016 at 6:51 AM, Kürşat Kurt  wrote:

> I understand that i shoud set the executor memory. I tried with the
> parameters below but OOM still occures...
>
> ./spark-submit --class main.scala.Test1 --master local[8]  --driver-memory
> 20g --executor-memory 20g
>
>
>
> *From:* Sean Owen [mailto:so...@cloudera.com]
> *Sent:* Monday, November 7, 2016 12:21 PM
> *To:* Kürşat Kurt ; user@spark.apache.org
> *Subject:* Re: Out of memory at 60GB free memory.
>
>
>
> You say "out of memory", and you allocate a huge amount of driver memory,
> but, it's your executor that's running out of memory. You want
> --executor-memory. You can't set it after the driver has run.
>
> On Mon, Nov 7, 2016 at 5:35 AM Kürşat Kurt  wrote:
>
> Hi;
>
> I am trying to use Naive Bayes for multi-class classification.
>
> I am getting OOM at “pipeline.fit(train)” line. When i submit the code,
> everything is ok so far the stage “collect at NaiveBayes.scala:400”.
>
> At this stage, starting 375 tasks very fast and going slowing down at this
> point. Task count could not became 500, getting OOM at 380-390th task.
>
>
>
>


-- 

[image: Orchard Platform] 

*Rodrick Brown */ *DevOPs*

9174456839 / rodr...@orchardplatform.com

Orchard Platform
101 5th Avenue, 4th Floor, New York, NY



Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-07 Thread Carlo . Allocca
Hi Masood,

Thank you very much for your insight.
I am going to scale all my features as you described.

As I am a beginner, is there any paper/book that would explain the suggested 
approaches? I would love to read them.

Many Thanks,
Best Regards,
Carlo





On 7 Nov 2016, at 16:27, Masood Krohy 
> wrote:

Yes, you would want to scale those features before feeding into any algorithm, 
one typical way would be to calculate the average and std for each feature, 
deduct the avg, then divide by std. Dividing by "max - min" is also a good 
option if you're sure there is no outlier shooting up your max or lowering your 
min significantly for each feature. After you have scaled each feature, then 
you can feed the data into the algo for training.

For prediction on new samples, you need to scale each sample first before 
making predictions using your trained model.

It's not too complicated to implement manually, but Spark API has some support 
for this already:
ML: http://spark.apache.org/docs/latest/ml-features.html#standardscaler
MLlib: 
http://spark.apache.org/docs/latest/mllib-feature-extraction.html#standardscaler

Masood


--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation
http://ca.linkedin.com/in/masoodkh



From: Carlo.Allocca
To: Masood Krohy
Cc: Carlo.Allocca, Mohit Jaggi, "user@spark.apache.org"
Date: 2016-11-07 10:50
Subject: Re: LinearRegressionWithSGD and Rank Features By Importance





Hi Masood,

thank you very much for the reply. It is very a good point as I am getting very 
bed result so far.

If I understood well what you suggest is to scale the date below (it is part of 
my dataset) before applying linear regression SGD.

is it correct?

Many Thanks in advance.

Best Regards,
Carlo



On 7 Nov 2016, at 15:31, Masood Krohy 
> wrote:

If you go down this route (look at actual coefficients/weights), then make sure 
your features are scaled first and have more or less the same mean when feeding 
them into the algo. If not, then actual coefficients/weights wouldn't tell you 
much. In any case, SGD performs badly with unscaled features, so you gain if 
you scale the features beforehand.

Masood

--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation
http://ca.linkedin.com/in/masoodkh



From: Carlo.Allocca
To: Mohit Jaggi
Cc: Carlo.Allocca, "user@spark.apache.org"
Date: 2016-11-04 03:39
Subject: Re: LinearRegressionWithSGD and Rank Features By Importance





Hi Mohit,

Thank you for your reply.
OK. it means coefficient with high score are more important that other with low 
score…

Many Thanks,
Best Regards,
Carlo


> On 3 Nov 2016, at 20:41, Mohit Jaggi 
> > wrote:
>
> For linear regression, it should be fairly easy. Just sort the co-efficients 
> :)
>
> Mohit Jaggi
> Founder,
> Data Orchard LLC
> www.dataorchardllc.com
>
>
>
>
>> On Nov 3, 2016, at 3:35 AM, Carlo.Allocca 
>> > wrote:
>>
>> Hi All,
>>
>> I am using SPARK and in particular the MLib library.
>>
>> import org.apache.spark.mllib.regression.LabeledPoint;
>> import org.apache.spark.mllib.regression.LinearRegressionModel;
>> import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
>>
>> For my problem I am using the LinearRegressionWithSGD and I would like to 
>> perform a “Rank Features By Importance”.
>>
>> I checked the documentation and it seems that does not provide such methods.
>>
>> Am I missing anything?  Please, could you provide any help on this?
>> Should I change the approach?
>>
>> Many Thanks in advance,
>>
>> Best Regards,
>> Carlo
>>
>>
>> -- The Open University is incorporated by Royal Charter (RC 000391), an 
>> exempt charity in England & Wales and a charity registered in Scotland (SC 
>> 038302). The Open University is authorised and regulated by the Financial 
>> Conduct Authority.
>>
>> -
>> To unsubscribe e-mail: 
>> user-unsubscr...@spark.apache.org
>>
>



Re: Spark with Ranger

2016-11-07 Thread Jan Hentschel
Hi Mudit,

 

As far as I know, Ranger does not provide security for Spark itself as a 
repository, but it does for most of the resources Spark can have access to, 
such as Hive, Kafka, HBase or HDFS.

 

Best, Jan

 

From: Mudit Kumar 
Date: Monday, November 7, 2016 at 4:23 PM
To: "user@spark.apache.org" 
Subject: Spark with Ranger

 

Hi,

 

Do ranger provide security to spark(Spark Thrift Server/spark sql)?If yes,then 
in what capacity.

 

Thanks,

Mudit



Re: Upgrading to Spark 2.0.1 broke array in parquet DataFrame

2016-11-07 Thread Michael Armbrust
If you can reproduce the issue with Spark 2.0.2 I'd suggest opening a JIRA.

On Fri, Nov 4, 2016 at 5:11 PM, Sam Goodwin  wrote:

> I have a table with a few columns, some of which are arrays. Since
> upgrading from Spark 1.6 to Spark 2.0.1, the array fields are always null
> when reading in a DataFrame.
>
> When writing the Parquet files, the schema of the column is specified as
>
> StructField("packageIds",ArrayType(StringType))
>
> The schema of the column in the Hive Metastore is
>
> packageIds array
>
> The schema used in the writer exactly matches the schema in the Metastore
> in all ways (order, casing, types etc)
>
> The query is a simple "select *"
>
> spark.sql("select * from tablename limit 1").collect() // null columns in Row
>
> How can I begin debugging this issue? Notable things I've already
> investigated:
>
>- Files were written using Spark 1.6
>- DataFrame works in spark 1.5 and 1.6
>- I've inspected the parquet files using parquet-tools and can see the
>data.
>- I also have another table written in exactly the same way and it
>doesn't have the issue.
>
>


Re: NoSuchElementException

2016-11-07 Thread Michael Armbrust
What are you trying to do?  It looks like you are mixing multiple
SparkContexts together.
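
For reference, a ForeachWriter sink is meant to push individual records to an
external system from the executors, without creating a SparkSession or Dataset
inside it. A minimal sketch of that shape (the record type and sink logic are
hypothetical):

import org.apache.spark.sql.ForeachWriter

// Only writes records out; it never builds a SparkSession, Dataset or SQL
// query on the executor side.
val writer = new ForeachWriter[String] {
  override def open(partitionId: Long, version: Long): Boolean = {
    // open a connection to the external store; return false to skip the partition
    true
  }
  override def process(value: String): Unit = {
    // send the record to the external store
  }
  override def close(errorOrNull: Throwable): Unit = {
    // release the connection
  }
}
// usage, assuming a streaming Dataset `df`:
// df.toJSON.writeStream.foreach(writer).start()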

On Fri, Nov 4, 2016 at 5:15 PM, Lev Tsentsiper 
wrote:

> My code throws an exception when I am trying to create new DataSet from
> within SteamWriter sink
>
> Simplified version of the code
>
>   val df = sparkSession.readStream
> .format("json")
> .option("nullValue", " ")
> .option("headerFlag", "true")
> .option("spark.sql.shuffle.partitions", 1)
> .option("mode", "FAILFAST")
> .schema(tableSchema)
> .load(s"s3n://")
> df.writeStream
> //TODO Switch to S3 location
> //.option("checkpointLocation", s"$input/$tenant/checkpoints/")
> .option("checkpointLocation", "/tmp/checkpoins/test1")
> .foreach(new ForwachWriter() {
>
>  override def close() = {
> val sparkSession = SparkSession.builder()
>   .config(new SparkConf()
> .setAppName("zzz").set("spark.app.id", ""xxx)
> .set("spark.master", "local[1]")
>   ).getOrCreate()
>
> val data = sparkSession.createDataset(rowList).
> .createOrReplaceTempView(tempTableName)
>  val sql =   sparkSession.sql("")
> sql.repartition(1).foreachPartition(iter=> {})
>  }
>
> });
>
> This code throws an exception
>
> java.util.NoSuchElementException: key not found: 202
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at 
> org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:196)
>
> at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:421)
>
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$
> readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
>
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
>
> at org.apache.spark.broadcast.TorrentBroadcast._value$
> lzycompute(TorrentBroadcast.scala:65)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
>
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
>
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.
> prepareBroadcast(BroadcastHashJoinExec.scala:101)
> at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.
> codegenOuter(BroadcastHashJoinExec.scala:242)
> at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.
> doConsume(BroadcastHashJoinExec.scala:83)
> at org.apache.spark.sql.execution.CodegenSupport$class.consume(
> WholeStageCodegenExec.scala:153)
> at 
> org.apache.spark.sql.execution.RowDataSourceScanExec.consume(ExistingRDD.scala:150)
>
> at org.apache.spark.sql.execution.RowDataSourceScanExec.
> doProduce(ExistingRDD.scala:217)
> at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
> at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
> at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
> at org.apache.spark.sql.execution.CodegenSupport$class.produce(
> WholeStageCodegenExec.scala:78)
> at 
> org.apache.spark.sql.execution.RowDataSourceScanExec.produce(ExistingRDD.scala:150)
>
> at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.
> doProduce(BroadcastHashJoinExec.scala:77)
> at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
> at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
> at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
> at org.apache.spark.sql.execution.CodegenSupport$class.produce(
> WholeStageCodegenExec.scala:78)
> at org.apache.spark.sql.execution.joins.
> BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
> at org.apache.spark.sql.execution.ProjectExec.doProduce(
> basicPhysicalOperators.scala:40)
> at org.apache.spark.sql.execution.CodegenSupport$$
> 

Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-07 Thread Robin East
If you have to use SGD then scaling will usually help your algorithm to 
converge quicker. If possible you should try using Linear Regression in the 
newer ml library: 
http://spark.apache.org/docs/latest/ml-classification-regression.html#linear-regression
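
A minimal sketch of that API, assuming a DataFrame `training` with the default
"label" and "features" columns (names and parameter values are illustrative):

import org.apache.spark.ml.regression.LinearRegression

// Fit ml's LinearRegression (which does not use SGD) and inspect the weights.
val lr = new LinearRegression()
  .setMaxIter(100)
  .setRegParam(0.01)
val lrModel = lr.fit(training)
println(s"Coefficients: ${lrModel.coefficients}")
println(s"Intercept: ${lrModel.intercept}")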


---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action 






> On 7 Nov 2016, at 15:47, Carlo.Allocca  wrote:
> 
> Hi Masood, 
> 
> thank you very much for the reply. It is very a good point as I am getting 
> very bed result so far. 
> 
> If I understood well what you suggest is to scale the date below (it is part 
> of my dataset) before applying linear regression SGD.
> 
> is it correct?
> 
> Many Thanks in advance. 
> 
> Best Regards,
> Carlo 
> 
> 
> 
>> On 7 Nov 2016, at 15:31, Masood Krohy > > wrote:
>> 
>> If you go down this route (look at actual coefficients/weights), then make 
>> sure your features are scaled first and have more or less the same mean when 
>> feeding them into the algo. If not, then actual coefficients/weights 
>> wouldn't tell you much. In any case, SGD performs badly with unscaled 
>> features, so you gain if you scale the features beforehand.
>> Masood 
>> 
>> --
>> Masood Krohy, Ph.D. 
>> Data Scientist, Intact Lab-R 
>> Intact Financial Corporation 
>> http://ca.linkedin.com/in/masoodkh  
>> 
>> 
>> 
>> From: Carlo.Allocca
>> To: Mohit Jaggi
>> Cc: Carlo.Allocca, "user@spark.apache.org"
>> Date: 2016-11-04 03:39
>> Subject: Re: LinearRegressionWithSGD and Rank Features By Importance
>> 
>> 
>> 
>> Hi Mohit, 
>> 
>> Thank you for your reply. 
>> OK. it means coefficient with high score are more important that other with 
>> low score…
>> 
>> Many Thanks,
>> Best Regards,
>> Carlo
>> 
>> 
>> > On 3 Nov 2016, at 20:41, Mohit Jaggi > > > wrote:
>> > 
>> > For linear regression, it should be fairly easy. Just sort the 
>> > co-efficients :)
>> > 
>> > Mohit Jaggi
>> > Founder,
>> > Data Orchard LLC
>> > www.dataorchardllc.com 
>> > 
>> > 
>> > 
>> > 
>> >> On Nov 3, 2016, at 3:35 AM, Carlo.Allocca > >> > wrote:
>> >> 
>> >> Hi All,
>> >> 
>> >> I am using SPARK and in particular the MLib library.
>> >> 
>> >> import org.apache.spark.mllib.regression.LabeledPoint;
>> >> import org.apache.spark.mllib.regression.LinearRegressionModel;
>> >> import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
>> >> 
>> >> For my problem I am using the LinearRegressionWithSGD and I would like to 
>> >> perform a “Rank Features By Importance”.
>> >> 
>> >> I checked the documentation and it seems that does not provide such 
>> >> methods.
>> >> 
>> >> Am I missing anything?  Please, could you provide any help on this?
>> >> Should I change the approach?
>> >> 
>> >> Many Thanks in advance,
>> >> 
>> >> Best Regards,
>> >> Carlo
>> >> 
>> >> 
>> >> -- The Open University is incorporated by Royal Charter (RC 000391), an 
>> >> exempt charity in England & Wales and a charity registered in Scotland 
>> >> (SC 038302). The Open University is authorised and regulated by the 
>> >> Financial Conduct Authority.
>> >> 
>> >> -
>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> >> 
>> >> 
>> > 
>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> 
>> 
>> 
>> 
> 



Re: expected behavior of Kafka dynamic topic subscription

2016-11-07 Thread Cody Koeninger
https://issues.apache.org/jira/browse/SPARK-18272

I couldn't speculate on what the issue might be without more info.  If
you have time to write a test for that ticket, I'd encourage you to do
so, I'm not certain how soon I'll be able to get to it.
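
For anyone wanting to reproduce the scenario, a minimal sketch of a
pattern-based subscription with the 0.10 integration (broker address, group id
and pattern are illustrative; as far as I know, the consumer's
metadata.max.age.ms bounds how quickly newly created topics matching the
pattern are noticed):

import java.util.regex.Pattern
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

// Assumes an existing StreamingContext `ssc`.
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "pattern-test-group",
  "auto.offset.reset"  -> "earliest",
  // refresh metadata every 10s so new matching topics are picked up quickly
  "metadata.max.age.ms" -> "10000"
)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("topic-.*"), kafkaParams))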

On Sun, Nov 6, 2016 at 7:31 PM, Haopu Wang  wrote:
> Cody, thanks for the response. Do you think it's a Spark issue or Kafka 
> issue? Can you please let me know the jira ticket number?
>
> -Original Message-
> From: Cody Koeninger [mailto:c...@koeninger.org]
> Sent: 2016-11-04 22:35
> To: Haopu Wang
> Cc: user@spark.apache.org
> Subject: Re: expected behavior of Kafka dynamic topic subscription
>
> That's not what I would expect from the underlying kafka consumer, no.
>
> But this particular case (no matching topics, then add a topic after
> SubscribePattern stream starts) actually isn't part of unit tests for
> either the DStream or the structured stream.
>
> I'll make a jira ticket.
>
> On Thu, Nov 3, 2016 at 9:43 PM, Haopu Wang  wrote:
>> I'm using Kafka010 integration API to create a DStream using
>> SubscriberPattern ConsumerStrategy.
>>
>> The specified topic doesn't exist when I start the application.
>>
>> Then I create the topic and publish some test messages. I can see them in
>> the console subscriber.
>>
>> But the spark application doesn't seem to get the messages.
>>
>> I think this is not expected, right? What should I check to resolve it?
>>
>> Thank you very much!




Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-07 Thread Masood Krohy
If you go down this route (look at actual coefficients/weights), then make 
sure your features are scaled first and have more or less the same mean 
when feeding them into the algo. If not, then actual coefficients/weights 
wouldn't tell you much. In any case, SGD performs badly with unscaled 
features, so you gain if you scale the features beforehand.
Masood

--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation
http://ca.linkedin.com/in/masoodkh



From: Carlo.Allocca
To: Mohit Jaggi
Cc: Carlo.Allocca, "user@spark.apache.org"
Date: 2016-11-04 03:39
Subject: Re: LinearRegressionWithSGD and Rank Features By Importance



Hi Mohit, 

Thank you for your reply. 
OK. it means coefficient with high score are more important that other 
with low score…

Many Thanks,
Best Regards,
Carlo


> On 3 Nov 2016, at 20:41, Mohit Jaggi  wrote:
> 
> For linear regression, it should be fairly easy. Just sort the 
co-efficients :)
> 
> Mohit Jaggi
> Founder,
> Data Orchard LLC
> www.dataorchardllc.com
> 
> 
> 
> 
>> On Nov 3, 2016, at 3:35 AM, Carlo.Allocca  
wrote:
>> 
>> Hi All,
>> 
>> I am using SPARK and in particular the MLib library.
>> 
>> import org.apache.spark.mllib.regression.LabeledPoint;
>> import org.apache.spark.mllib.regression.LinearRegressionModel;
>> import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
>> 
>> For my problem I am using the LinearRegressionWithSGD and I would like 
to perform a “Rank Features By Importance”.
>> 
>> I checked the documentation and it seems that does not provide such 
methods.
>> 
>> Am I missing anything?  Please, could you provide any help on this?
>> Should I change the approach?
>> 
>> Many Thanks in advance,
>> 
>> Best Regards,
>> Carlo
>> 
>> 
>> -- The Open University is incorporated by Royal Charter (RC 000391), an 
exempt charity in England & Wales and a charity registered in Scotland (SC 
038302). The Open University is authorised and regulated by the Financial 
Conduct Authority.
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 








Spark with Ranger

2016-11-07 Thread Mudit Kumar
Hi,

Does Ranger provide security for Spark (Spark Thrift Server / Spark SQL)? If 
yes, then in what capacity?

Thanks,
Mudit


Spark master shows 0 cores for executors

2016-11-07 Thread Rohit Verma
Facing a strange issue with spark 2.0.1.

When creating a spark session with executor properties like

  'spark.executor.memory':'3g',\
  'spark.executor.cores':'12',\
Spark master shows 0 cores for executors.

I found a similar issue on Stack Overflow:
http://stackoverflow.com/questions/35169015/spark-ui-showing-0-cores-even-when-setting-cores-in-app

But I couldn't find a solution there, and I am not even sure which logs to check for this.

Any help is appreciated.

Regards
Rohit
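
One common cause on a standalone master is requesting more cores per executor
than any single worker can offer, in which case no executor is granted and the
application sits at 0 cores. A minimal sketch of a configuration that avoids
that (shown in Scala; the same keys apply from PySpark; all values are
illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("spark://master-host:7077")
  .appName("cores-check")
  .config("spark.executor.memory", "3g")
  .config("spark.executor.cores", "4")   // must not exceed the cores any single worker offers
  .config("spark.cores.max", "12")       // cap on total cores for the application
  .getOrCreate()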


VectorUDT and ml.Vector

2016-11-07 Thread Ganesh
I am trying to run an SVD on a DataFrame. I used the ml TF-IDF transformers, 
which produced a DataFrame.
For the singular value decomposition I am trying to use RowMatrix, which 
takes an RDD of mllib.Vector, so I have to convert this DataFrame from what 
I assumed was ml.Vector.

However the conversion

val convertedTermDocMatrix = 
    MLUtils.convertMatrixColumnsFromML(termDocMatrix, "features")

fails with

java.lang.IllegalArgumentException: requirement failed: Column features 
must be new Matrix type to be converted to old type but got 
org.apache.spark.ml.linalg.VectorUDT

So the question is: how do I perform SVD on a DataFrame? I assume not all of 
the mllib functionality has been ported to ml.

I tried to convert my entire project to use RDDs, but computeSVD on 
RowMatrix throws out-of-memory errors, and in any case I would like to 
stick with DataFrames.

Our text corpus is around 55 GB of text data.
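
As far as I can tell, the exception comes from using the Matrix-column
converter on a Vector column; the Vector counterpart is
MLUtils.convertVectorColumnsFromML. Alternatively the column can be mapped
straight to an RDD of old-style vectors for RowMatrix. A minimal sketch,
assuming the "features" column holds ml.linalg.Vector and with an illustrative
k:

import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Pull the ml vectors out of the DataFrame, convert them to mllib vectors,
// then wrap the RDD in a RowMatrix and compute a truncated SVD.
val rows = termDocMatrix.select("features").rdd
  .map(row => OldVectors.fromML(row.getAs[MLVector]("features")))
val mat = new RowMatrix(rows)
val svd = mat.computeSVD(k = 20, computeU = false)
println(s"Top singular values: ${svd.s}")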



Ganesh



RE: Out of memory at 60GB free memory.

2016-11-07 Thread Kürşat Kurt
I understand that I should set the executor memory. I tried with the parameters 
below but the OOM still occurs...

./spark-submit --class main.scala.Test1 --master local[8]  --driver-memory 20g 
--executor-memory 20g

 

From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Monday, November 7, 2016 12:21 PM
To: Kürşat Kurt ; user@spark.apache.org
Subject: Re: Out of memory at 60GB free memory.

 

You say "out of memory", and you allocate a huge amount of driver memory, but, 
it's your executor that's running out of memory. You want --executor-memory. 
You can't set it after the driver has run.

On Mon, Nov 7, 2016 at 5:35 AM Kürşat Kurt  > wrote:

Hi;

I am trying to use Naive Bayes for multi-class classification.

I am getting OOM at the “pipeline.fit(train)” line. When I submit the code, 
everything is OK up to the stage “collect at NaiveBayes.scala:400”.

At this stage, 375 tasks start very fast and then it slows down; the task 
count never reaches 500, and I get OOM around the 380th-390th task.

 



Spark Streaming backpressure weird behavior/bug

2016-11-07 Thread Mudit Kumar
Hi,

Does Ranger provide security for Spark? If yes, then in what capacity?

Thanks,
Mudit


mapWithState with a big initial RDD gets OOM'ed

2016-11-07 Thread Daniel Haviv
Hi,
I have a stateful streaming app where I pass a rather large initialState
RDD at the beginning.
No matter how many partitions I split the stateful stream into, I keep
failing with OOM / Java heap space errors.

Is there a way to make it more resilient?
How can I control its storage level?

This is basically my code:

val x = ss.sql("select * From myTable where partition_ts >= 2016110600")
  .toJSON.rdd.mapPartitions(extractIidfromJson)
val stateSpec = StateSpec.function(trackStateFunc _)
  .numPartitions(128)
  .timeout(Durations.minutes(60 * 48))
  .initialState(x)
val ssc = new StreamingContext(sc, Seconds(10))
val sqlContext = ss.sqlContext

val stateStream = kafkaStream
  .mapPartitions(r => jsonToJsonNode(r))
  .mapWithState(stateSpec)

stateStream.foreachRDD { r =>
  if (!r.isEmpty()) {
    ss.read.json(r).write.format("orc").mode(SaveMode.Append).saveAsTable("joinedData")
  }
}


Thank you,

Daniel


spark optimization

2016-11-07 Thread maitraythaker
Why are those two stages in Apache Spark computing the same thing?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-optimization-tp28034.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.