RE: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Mendelson, Assaf
I am using v2.4.0-RC2

The code as is wouldn’t run (e.g. planBatchInputPartitions returns null). How 
are you calling it?

When I do:
spark.read.format("mypackage").load().show()
I am getting a single creation. How are you creating the reader?

Thanks,
Assaf

From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
Sent: Tuesday, October 9, 2018 2:02 PM
To: Mendelson, Assaf; user@spark.apache.org
Subject: Re: DataSourceV2 APIs creating multiple instances of DataSourceReader 
and hence not preserving the state


Thanks Assaf, you tried with tags/v2.4.0-rc2?

Full Code:

MyDataSource is the entry point which simply creates Reader and Writer

public class MyDataSource implements DataSourceV2, WriteSupport, ReadSupport, SessionConfigSupport {

  @Override
  public DataSourceReader createReader(DataSourceOptions options) {
    return new MyDataSourceReader(options.asMap());
  }

  @Override
  public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
      SaveMode mode, DataSourceOptions options) {
    // creates a dataSourcewriter here..
    return Optional.of(dataSourcewriter);
  }

  @Override
  public String keyPrefix() {
    return "myprefix";
  }
}

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // variable this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    // more logic..
    return null;
  }

  @Override
  public StructType readSchema() {
    // some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}

Thanks,
Shubham

On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
Could you add a fuller code example? I tried to reproduce it in my environment 
and I am getting just one instance of the reader…

Thanks,
Assaf

From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
Sent: Tuesday, October 9, 2018 9:31 AM
To: user@spark.apache.org
Subject: DataSourceV2 APIs creating multiple instances of DataSourceReader and 
hence not preserving the state


Hi All,

--Spark built with tags/v2.4.0-rc2

Consider following DataSourceReader implementation:


public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // variable this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    // more logic..
    return null;
  }

  @Override
  public StructType readSchema() {
    // some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}

1) First readSchema() is called on MyDataSourceReader@instance1 which sets 
class variable schema.

2) Now when planBatchInputPartitions() is called, it is being called on a 
different instance of MyDataSourceReader and hence I am not getting the value 
of schema in method planBatchInputPartitions().



How can I get value of schema which was set in readSchema() method, in 
planBatchInputPartitions() method?



Console Logs:



scala> mysource.executeQuery("select * from movie").show



MyDataSourceReader.MyDataSourceReader: InstantiatedMyDataSourceReader@59ea8f1b

MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: 
StructType(StructField(col1,IntegerType,true), 
StructField(col2,StringType,true))

MyDataSourceReader.MyDataSourceReader: InstantiatedMyDataSourceReader@a3cd3ff

MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: 
null

Thanks,
Shubham
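
For illustration only (a sketch in Scala against the Spark 2.4 DataSourceV2 interfaces, not a suggestion made in the thread): since Spark may instantiate the reader more than once, one defensive approach is to let every instance derive the schema on its own, so nothing depends on readSchema() having been called on the same object. Class and column names mirror the example above.

import java.util.Collections
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, SupportsScanColumnarBatch}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

class MyDataSourceReader(options: java.util.Map[String, String])
  extends DataSourceReader with SupportsScanColumnarBatch {

  // Every instance can recompute the schema on demand, so it does not matter
  // which instance Spark calls readSchema() or planBatchInputPartitions() on.
  private lazy val discoveredSchema: StructType =
    new StructType().add("col1", "int").add("col2", "string")

  override def readSchema(): StructType = discoveredSchema

  override def planBatchInputPartitions(): java.util.List[InputPartition[ColumnarBatch]] = {
    // discoveredSchema is available here even if readSchema() ran on another instance
    println(s"planning with schema: $discoveredSchema")
    Collections.emptyList[InputPartition[ColumnarBatch]]()
  }
}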




RE: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Mendelson, Assaf
Could you add a fuller code example? I tried to reproduce it in my environment 
and I am getting just one instance of the reader…

Thanks,
Assaf

From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
Sent: Tuesday, October 9, 2018 9:31 AM
To: user@spark.apache.org
Subject: DataSourceV2 APIs creating multiple instances of DataSourceReader and 
hence not preserving the state


Hi All,

--Spark built with tags/v2.4.0-rc2

Consider following DataSourceReader implementation:


public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // variable this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    // more logic..
    return null;
  }

  @Override
  public StructType readSchema() {
    // some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}

1) First readSchema() is called on MyDataSourceReader@instance1 which sets 
class variable schema.

2) Now when planBatchInputPartitions() is called, it is being called on a 
different instance of MyDataSourceReader and hence I am not getting the value 
of schema in method planBatchInputPartitions().



How can I get value of schema which was set in readSchema() method, in 
planBatchInputPartitions() method?



Console Logs:



scala> mysource.executeQuery("select * from movie").show



MyDataSourceReader.MyDataSourceReader: 
InstantiatedMyDataSourceReader@59ea8f1b

MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: 
StructType(StructField(col1,IntegerType,true), 
StructField(col2,StringType,true))

MyDataSourceReader.MyDataSourceReader: 
InstantiatedMyDataSourceReader@a3cd3ff

MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: 
null

Thanks,
Shubham




RE: Spark Web UI SSL Encryption

2017-08-21 Thread Mendelson, Assaf
The following is based on stuff I did a while ago so I might be missing some 
parts.

First you need to create a certificate. The following example creates a 
self-signed one:

openssl genrsa -aes128 -out sparkssl.key 2048
openssl rsa -in sparkssl.key -pubout -out sparkssl_public.key
openssl req -new -key sparkssl.key -out sparkssl.csr
openssl req -text -in sparkssl.csr -noout
openssl x509 -req -days 365 -in sparkssl.csr -signkey sparkssl.key -out sparkssl.crt

Next you need to create a keystore and truststore:

keytool -keystore clientkeystore -genkey -alias standalone
keytool -import -file sparkssl.crt -alias smaster2 -keystore clientTrustStore

Now you can add SSL properties to your conf/spark-defaults.conf:
spark.ssl.enabled true
spark.ssl.enabledAlgorithms TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_2$
spark.ssl.keyPassword password
spark.ssl.keyStore /home/hduser/ssl/clientkeystore
spark.ssl.keyStorePassword password
spark.ssl.keyStoreType JKS
spark.ssl.protocol TLS
spark.ssl.trustStore /home/hduser/ssl/clientTrustStore
spark.ssl.trustStorePassword admin123
spark.ssl.trustStoreType JKS
spark.ui.https.enabled true

Hopefully, I didn’t miss anything
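
For the application's own UI (the driver UI), the same spark.ssl.* keys can also be set programmatically; a minimal sketch reusing the keystore paths above (the standalone daemons still read their settings from the configuration files):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Same spark.ssl.* keys as in spark-defaults.conf, set on the SparkConf instead
val conf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/home/hduser/ssl/clientkeystore")
  .set("spark.ssl.keyStorePassword", "password")
  .set("spark.ssl.keyPassword", "password")
  .set("spark.ssl.trustStore", "/home/hduser/ssl/clientTrustStore")
  .set("spark.ssl.trustStorePassword", "admin123")
  .set("spark.ssl.protocol", "TLS")

val spark = SparkSession.builder().config(conf).getOrCreate()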

Thanks,
Assaf

From: Saisai Shao [mailto:sai.sai.s...@gmail.com]
Sent: Monday, August 21, 2017 5:28 PM
To: Anshuman Kumar 
Cc: spark users 
Subject: Re: Spark Web UI SSL Encryption

Can you please post the specific problem you met?

Thanks
Jerry

On Sat, Aug 19, 2017 at 1:49 AM, Anshuman Kumar wrote:
Hello,

I have recently installed Sparks 2.2.0, and trying to use it for some big data 
processing. Spark is installed on a server that I access from a remote 
computer. I need to setup SSL encryption for the Spark web UI, but following 
some threads online I’m still not able to set it up.

Can someone help me with the SSL encryption.

Warm Regards.



RE: mapPartitioningWithIndex in Dataframe

2017-08-05 Thread Mendelson, Assaf
First I believe you mean on the Dataset API rather than the dataframe API.
You can easily add the partition index as a new column to your dataframe using 
spark_partition_id()
Then a normal mapPartitions should work fine (i.e. you should create the 
appropriate case class which includes the partition id and then do 
mapPartitions).
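
A minimal sketch of that suggestion, assuming a SparkSession named spark (as in the shell); the case class and the seed logic are illustrative only:

import scala.util.Random
import org.apache.spark.sql.functions.spark_partition_id
import spark.implicits._

// Illustrative record type: the original value plus the partition id column
case class Rec(id: Long, pid: Int)

val withPid = spark.range(1000).withColumn("pid", spark_partition_id())

val sampled = withPid.as[Rec].mapPartitions { rows =>
  val buffered = rows.buffered
  // every row in this iterator carries the same partition id
  val seed = if (buffered.hasNext) 42L + buffered.head.pid else 42L
  val rng = new Random(seed)
  buffered.filter(_ => rng.nextDouble() < 0.5)   // partition-specific sampling
}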

Thanks,
  Assaf.

From: Lalwani, Jayesh [mailto:jayesh.lalw...@capitalone.com]
Sent: Thursday, August 03, 2017 5:20 PM
To: user@spark.apache.org
Subject: mapPartitioningWithIndex in Dataframe

Are there any plans to add mapPartitioningWithIndex in the Dataframe API? Or is 
there any way to implement my own mapPartitionWithIndex for a Dataframe?

I am implementing something which is logically similar to the randomSplit 
function. In 2.1, randomSplit internally does df.mapPartitionWithIndex and 
assigns a different seed for every partition by adding the partition’s index to 
the seed. I want to get  a partition specific seed too.

The problem is rdd.mapPartitionWithIndex doesn’t work in streaming. 
df.mapPartition works, but I don’t get index.

Is there a way to extend Spark to add mapPartitionWithIndex at the Dataframe 
level ?
I was digging into the 2.2 code a bit and it looks like in 2.2, all the 
Dataframe apis have been changed to be based around SparkStrategy. I couldn’t 
figure out  how I can add my own custom strategy. Is there any documentation 
around this? If it makes sense to add this to Spark, I would be excited to make 
a contribution.





RE: underlying checkpoint

2017-07-16 Thread Mendelson, Assaf
Actually, show is an action.
The issue is that unless you have some aggregations, show will only go over part of the dataframe, not all of it, and therefore the checkpoint won't be materialized (similar to what happens with cache).
You need an action that goes over the entire dataframe, which count does.

Thanks,
  Assaf.

From: Bernard Jesop [mailto:bernard.je...@gmail.com]
Sent: Thursday, July 13, 2017 6:58 PM
To: Vadim Semenov
Cc: user
Subject: Re: underlying checkpoint

Thank you, one of my mistakes was to think that show() was an action.

2017-07-13 17:52 GMT+02:00 Vadim Semenov:
You need to trigger an action on that rdd to checkpoint it.

```
scala>spark.sparkContext.setCheckpointDir(".")

scala>val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), 
("R", 15), ("Java", 20)))
df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> df.rdd.checkpoint()

scala> df.rdd.isCheckpointed
res2: Boolean = false

scala> df.show()
+------+---+
|    _1| _2|
+------+---+
| Scala| 35|
|Python| 30|
|     R| 15|
|  Java| 20|
+------+---+


scala> df.rdd.isCheckpointed
res4: Boolean = false

scala> df.rdd.count()
res5: Long = 4

scala> df.rdd.isCheckpointed
res6: Boolean = true
```

On Thu, Jul 13, 2017 at 11:35 AM, Bernard Jesop wrote:
Hi everyone, I just tried this simple program :

 import org.apache.spark.sql.SparkSession

 object CheckpointTest extends App {

   val spark = SparkSession
 .builder()
 .appName("Toto")
 .getOrCreate()

   spark.sparkContext.setCheckpointDir(".")

   val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 
15), ("Java", 20)))

   df.show()
   df.rdd.checkpoint()
   println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
 }

But the result is still "not checkpointed".
Do you have any idea why? (knowing that the checkpoint file is created)
Best regards,
Bernard JESOP




RE: Timeline for stable release for Spark Structured Streaming

2017-07-10 Thread Mendelson, Assaf
Any day now.
One of the major milestones of spark 2.2 is making structured streaming a 
stable feature.
Spark 2.2 has passed RC6 a couple of days ago so it should be out any day now.
Note that people have been using spark 2.1 for production in structured 
streaming so you should be able to start playing around with it now (most of 
the capabilities are already there and stable).

Thanks,
  Assaf.

From: Dhrubajyoti Hati [mailto:dhruba.w...@gmail.com]
Sent: Monday, July 10, 2017 1:33 PM
To: user@spark.apache.org
Subject: Timeline for stable release for Spark Structured Streaming

Hi,

I was checking the documentation of the Structured Streaming Programming Guide and it seems it's still in alpha mode. Is there any timeline for when this module will be ready to use in production environments?

Regards,

​Dhrubajyoti Hati
LinkedIn



RE: Merging multiple Pandas dataframes

2017-06-21 Thread Mendelson, Assaf
If you do an action, most intermediate calculations would be gone for the next 
iteration.
What I would do is persist every iteration, then after some (say 5) I would 
write to disk and reload. At that point you should call unpersist to free the 
memory as it is no longer relevant.
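
A sketch of that cycle (in Scala for brevity, the same pattern applies from PySpark; loadNextBatch, hasMoreBatches and basePath are illustrative names, not from this thread):

// Persist every union; every few iterations write to disk and reload so that
// both the cached data and the query plan stay small.
var acc = loadNextBatch()
var round = 0
while (hasMoreBatches()) {
  acc = acc.union(loadNextBatch()).persist()       // persist each iteration
  round += 1
  if (round % 5 == 0) {                            // every ~5 iterations, cut the lineage
    acc.write.parquet(s"$basePath/round_$round")   // action: materializes the cached union
    acc.unpersist()                                // free the memory, no longer relevant
    acc = spark.read.parquet(s"$basePath/round_$round")  // reloaded frame has a short lineage
  }
}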

Thanks,
  Assaf.

From: Saatvik Shah [mailto:saatvikshah1...@gmail.com]
Sent: Tuesday, June 20, 2017 8:50 PM
To: Mendelson, Assaf
Cc: user@spark.apache.org
Subject: Re: Merging multiple Pandas dataframes

Hi Assaf,
Thanks for the suggestion on checkpointing - I'll need to read up more on that.
My current implementation seems to be crashing with a GC memory limit exceeded 
error if Im keeping multiple persist calls for a large number of files.

Thus, I was also thinking about the constant calls to persist. Since all my 
actions are Spark transformations(union of large number of Spark Dataframes 
from Pandas dataframes), this entire process of building a large Spark 
dataframe is essentially a huge transformation. Is it necessary to call persist 
between unions? Shouldnt I instead wait for all the unions to complete and call 
persist finally?


On Tue, Jun 20, 2017 at 2:52 AM, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
Note that depending on the number of iterations, the query plan for the 
dataframe can become long and this can cause slowdowns (or even crashes).
A possible solution would be to checkpoint (or simply save and reload the 
dataframe) every once in a while. When reloading from disk, the newly loaded 
dataframe's lineage is just the disk...

Thanks,
  Assaf.

-Original Message-
From: saatvikshah1994 [mailto:saatvikshah1...@gmail.com]
Sent: Tuesday, June 20, 2017 2:22 AM
To: user@spark.apache.org
Subject: Merging multiple Pandas dataframes

Hi,

I am iteratively receiving a file which can only be opened as a Pandas 
dataframe. For the first such file I receive, I am converting this to a Spark 
dataframe using the 'createDataframe' utility function. The next file onward, I 
am converting it and union'ing it into the first Spark dataframe(the schema 
always stays the same). After each union, I am persisting it in 
memory(MEMORY_AND_DISK_ONLY level). After I have converted all such files to a 
single spark dataframe I am coalescing it. Following some tips from this Stack 
Overflow
post(https://stackoverflow.com/questions/39381183/managing-spark-partitions-after-dataframe-unions).

Any suggestions for optimizing this process further?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Merging-multiple-Pandas-dataframes-tp28770.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




--
Saatvik Shah,
1st  Year,
Masters in the School of Computer Science,
Carnegie Mellon University
https://saatvikshah1994.github.io/


RE: Merging multiple Pandas dataframes

2017-06-20 Thread Mendelson, Assaf
Note that depending on the number of iterations, the query plan for the 
dataframe can become long and this can cause slowdowns (or even crashes).
A possible solution would be to checkpoint (or simply save and reload the 
dataframe) every once in a while. When reloading from disk, the newly loaded 
dataframe's lineage is just the disk...

Thanks,
  Assaf.

-Original Message-
From: saatvikshah1994 [mailto:saatvikshah1...@gmail.com] 
Sent: Tuesday, June 20, 2017 2:22 AM
To: user@spark.apache.org
Subject: Merging multiple Pandas dataframes

Hi, 

I am iteratively receiving a file which can only be opened as a Pandas 
dataframe. For the first such file I receive, I am converting this to a Spark 
dataframe using the 'createDataframe' utility function. The next file onward, I 
am converting it and union'ing it into the first Spark dataframe(the schema 
always stays the same). After each union, I am persisting it in 
memory(MEMORY_AND_DISK_ONLY level). After I have converted all such files to a 
single spark dataframe I am coalescing it. Following some tips from this Stack 
Overflow
post(https://stackoverflow.com/questions/39381183/managing-spark-partitions-after-dataframe-unions).
   

Any suggestions for optimizing this process further?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Merging-multiple-Pandas-dataframes-tp28770.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




having trouble using structured streaming with file sink (parquet)

2017-06-13 Thread Mendelson, Assaf
Hi all,

I have recently started assessing structured streaming and ran into a little 
snag from the beginning.

Basically I wanted to read some data, do some basic aggregation and write the 
result to file:

import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.streaming.ProcessingTime
val rawRecords = spark.readStream.schema(myschema).parquet("/mytest")
val q = rawRecords.withColumn("g",$"id" % 100).groupBy("g").agg(avg($"id"))
val res = q.writeStream
  .outputMode("complete")
  .trigger(ProcessingTime("10 seconds"))
  .format("parquet")
  .option("path", "/test2")
  .option("checkpointLocation", "/mycheckpoint")
  .start

The problem is that it tells me that parquet does not support the complete mode 
(or update for that matter).
So how would I do a streaming with aggregation to file?
In general, my goal is to have a single (slow) streaming process which would 
write some profile and then have a second streaming process which would load 
the current dataframe to be used in join (I would stop the second streaming 
process and reload the dataframe periodically).

Any help would be appreciated.

Thanks,
  Assaf.



RE: [How-To] Custom file format as source

2017-06-12 Thread Mendelson, Assaf
Try 
https://mapr.com/blog/spark-data-source-api-extending-our-spark-sql-query-engine/
 

Thanks,
  Assaf.
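
In outline, the approach in the linked post is to implement Spark's data source API (org.apache.spark.sql.sources). A bare-bones sketch, with the actual decoding of the binary format left as a stub and all names illustrative:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}

// Usable as spark.read.format("com.example.binfmt").load(path) if placed in that package
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new BinaryFormatRelation(sqlContext, parameters("path"))
}

class BinaryFormatRelation(val sqlContext: SQLContext, path: String)
  extends BaseRelation with TableScan {

  // Schema derived from the file header (Col1..Col4 as in the example above)
  override def schema: StructType = new StructType()
    .add("Col1", LongType)
    .add("Col2", StringType)
    .add("Col3", LongType)
    .add("Col4", DoubleType)

  // Turn the fixed-size records into Rows; the real decoder for the custom
  // column layout and catalogs would replace the stub below.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.binaryFiles(path).flatMap { case (_, stream) =>
      val bytes = stream.toArray()
      Seq.empty[Row]   // stub: decode `bytes` into one Row per fixed-size record
    }
}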

-Original Message-
From: OBones [mailto:obo...@free.fr] 
Sent: Monday, June 12, 2017 1:01 PM
To: user@spark.apache.org
Subject: [How-To] Custom file format as source

Hello,

I have an application here that generates data files in a custom binary format 
that provides the following information:

Column list: each column has a data type (64 bit integer, 32 bit string index, 64 bit IEEE float, 1 byte boolean).
Catalogs that give modalities for some columns (i.e. column 1 contains only the following values: A, B, C, D).
Array of actual data: each row has a fixed size according to the columns.

Here is an example:

Col1, 64bit integer
Col2, 32bit string index
Col3, 64bit integer
Col4, 64bit float

Catalog for Col1 = 10, 20, 30, 40, 50
Catalog for Col2 = Big, Small, Large, Tall
Catalog for Col3 = 101, 102, 103, 500, 5000
Catalog for Col4 = (no catalog)

Data array =
8 bytes, 4 bytes, 8 bytes, 8 bytes,
8 bytes, 4 bytes, 8 bytes, 8 bytes,
8 bytes, 4 bytes, 8 bytes, 8 bytes,
8 bytes, 4 bytes, 8 bytes, 8 bytes,
8 bytes, 4 bytes, 8 bytes, 8 bytes,
...

I would like to use this kind of file as a source for various ML related 
computations (CART, RandomForrest, Gradient boosting...) and Spark is very 
interesting in this area.
However, I'm a bit lost as to what I should write to have Spark use that file 
format as a source for its computation. Considering that those files are quite 
big (100 million lines, hundreds of gigs on disk), I'd rather not create 
something that writes a new file in a built-in format, but I'd rather write 
some code that makes Spark accept the file as it is.

I looked around and saw the textfile method but it is not applicable to my 
case. I also saw the spark.read.format("libsvm") syntax which tells me that 
there is a list of supported formats known to spark, which I believe are called 
Dataframes, but I could not find any tutorial on this subject.

Would you have any suggestion or links to documentation that would get me 
started?

Regards,
Olivier




[WARN] org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextI

2017-05-28 Thread Mendelson, Assaf
Hi,
I am getting the following warning:

[WARN] org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize 
counter due to context is not a instance of TaskInputOutputContext, but is 
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

This seems to occur every time I try to read from parquet a nested data 
structure.

Example to reproduce:


case class A(v: Int)
case class B(v: A)
val filename = "test"
val a = A(1)
val b = B(a)
val df1: DataFrame = Seq[B](b).toDF
df1.write.parquet(filename)
val df2 = spark.read.parquet(filename)
df2.show()


I also found this https://issues.apache.org/jira/browse/SPARK-18660 but it has 
no info and no resolution (it does point to the issue being fixed in parquet 
1.10 while spark still uses parquet 1.8)
I currently added the following to log4j.properties:
log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=ERROR

however, this seems like a poor solution to me.
Any decent way to solve this?


Thanks,
  Assaf.


RE: strange warning

2017-05-25 Thread Mendelson, Assaf
Some more info:
It seems this is caused due to complex data structure.
Consider the following simple example:

case class A(v: Int)
case class B(v: A)
val filename = "test"
val a = A(1)
val b = B(a)
val df1: DataFrame = Seq[B](b).toDF
df1.write.parquet(filename)
val df2 = spark.read.parquet(filename)
df2.show()

Any ideas?

Thanks,
  Assaf.

From: Mendelson, Assaf [mailto:assaf.mendel...@rsa.com]
Sent: Thursday, May 25, 2017 9:55 AM
To: user@spark.apache.org
Subject: strange warning

Hi all,

Today, I got the following warning:
[WARN] org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize 
counter due to context is not a instance of TaskInputOutputContext, but is 
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

This occurs on one of my tests but not on others (all use parquet). I found 
this https://issues.apache.org/jira/browse/PARQUET-220 but I am using spark 
2.1.0 which uses parquet 1.8 if I am not mistaken. I also found this: 
https://issues.apache.org/jira/browse/SPARK-8118 but again, it is very old. 
Also it only happens on one case where I save my parquet files and not others.

Does anyone know what it means and how to get rid of it?

Thanks,
  Assaf.



strange warning

2017-05-25 Thread Mendelson, Assaf
Hi all,

Today, I got the following warning:
[WARN] org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize 
counter due to context is not a instance of TaskInputOutputContext, but is 
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

This occurs on one of my tests but not on others (all use parquet). I found 
this https://issues.apache.org/jira/browse/PARQUET-220 but I am using spark 
2.1.0 which uses parquet 1.8 if I am not mistaken. I also found this: 
https://issues.apache.org/jira/browse/SPARK-8118 but again, it is very old. 
Also it only happens on one case where I save my parquet files and not others.

Does anyone know what it means and how to get rid of it?

Thanks,
  Assaf.



RE: [WARN] org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

2017-05-17 Thread Mendelson, Assaf
Thanks for the response.
I will try with log4j.
That said, I am running in windows using winutil.exe and still getting the 
warning.

Thanks,
  Assaf.

From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Tuesday, May 16, 2017 6:55 PM
To: Mendelson, Assaf
Cc: user@spark.apache.org
Subject: Re: [WARN] org.apache.hadoop.util.NativeCodeLoader: Unable to load 
native-hadoop library for your platform... using builtin-java classes where 
applicable


On 10 May 2017, at 13:40, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:

Hi all,
When running spark I get the following warning: [WARN] 
org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Now I know that in general it is possible to ignore this warning, however, it 
means that utilities that catch “WARN” in the log keep flagging this.
I saw many answers to handling this (e.g. 
http://stackoverflow.com/questions/30369380/hadoop-unable-to-load-native-hadoop-library-for-your-platform-error-on-docker,
 
http://stackoverflow.com/questions/19943766/hadoop-unable-to-load-native-hadoop-library-for-your-platform-warning,
http://stackoverflow.com/questions/40015416/spark-unable-to-load-native-hadoop-library-for-your-platform),
 however, I am unable to solve this on my local machine.
Specifically, I can’t find any such solution for windows (i.e. when running 
developer local builds) or on a centos 7 machine with no HDFS (basically it is 
a single node machine which uses spark standalone for testing).


Log4J is your friend. I usually have (at least)

log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN

if you are working on Windows though, you do actually need the native libraries and winutils.exe on your path, or things won't work


Any help would be appreciated.

Thanks,
  Assaf.



incremental broadcast join

2017-05-10 Thread Mendelson, Assaf
Hi,

It seems as if when doing broadcast join, the entire dataframe is resent even 
if part of it has already been broadcasted.

Consider the following case:

val df1 = ???
val df2 = ???
val df3 = ???

df3.join(broadcast(df1), cond, "left_outer")
followed by
df4.join(broadcast(df1.union(df2)), cond, "left_outer")

I would expect the second broadcast to only broadcast the difference. However, 
if I do explain(true) I see the entire union is broadcast.

My use case is that I have a series of dataframes on which I need to do some 
enrichment, joining them with a small dataframe. The small dataframe gets 
additional information (as the result of each aggregation).

Is there an efficient way of doing this?

Thanks,
  Assaf.



[WARN] org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

2017-05-10 Thread Mendelson, Assaf
Hi all,
When running spark I get the following warning: [WARN] 
org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Now I know that in general it is possible to ignore this warning, however, it 
means that utilities that catch "WARN" in the log keep flagging this.
I saw many answers to handling this (e.g. 
http://stackoverflow.com/questions/30369380/hadoop-unable-to-load-native-hadoop-library-for-your-platform-error-on-docker,
 
http://stackoverflow.com/questions/19943766/hadoop-unable-to-load-native-hadoop-library-for-your-platform-warning,
 
http://stackoverflow.com/questions/40015416/spark-unable-to-load-native-hadoop-library-for-your-platform),
 however, I am unable to solve this on my local machine.
Specifically, I can't find any such solution for windows (i.e. when running 
developer local builds) or on a centos 7 machine with no HDFS (basically it is 
a single node machine which uses spark standalone for testing).

Any help would be appreciated.

Thanks,
  Assaf.



RE: How to use ManualClock with Spark streaming

2017-04-05 Thread Mendelson, Assaf
You can try taking a look at this: 
http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/

Thanks,
  Assaf.

From: Hemalatha A [mailto:hemalatha.amru...@googlemail.com]
Sent: Wednesday, April 05, 2017 1:59 PM
To: Saisai Shao; user@spark.apache.org
Subject: Re: How to use ManualClock with Spark streaming

Any updates on how can I use ManualClock other than editing the Spark source 
code?

On Wed, Mar 1, 2017 at 10:19 AM, Hemalatha A wrote:
It is certainly possible through a hack.
I was referring to below post where TD says it is possible thru a hack. I 
wanted to know if there is  any way other than editing the Spark source code.

https://groups.google.com/forum/#!searchin/spark-users/manualclock%7Csort:relevance/spark-users/ES8X1l_xn5s/6PvGGRDfgnMJ

On Wed, Mar 1, 2017 at 7:09 AM, Saisai Shao wrote:
I don't think using ManualClock is a right way to fix your problem here in 
Spark Streaming.

ManualClock in Spark is mainly used for unit test, it should manually advance 
the time to make the unit test work. The usage looks different compared to the 
scenario you mentioned.

Thanks
Jerry

On Tue, Feb 28, 2017 at 10:53 PM, Hemalatha A wrote:

Hi,

I am running streaming application reading data from kafka and performing 
window operations on it. I have a usecase where  all incoming events have a 
fixed latency of 10s, which means data belonging to minute 10:00:00 will arrive 
10s late at 10:00:10.

I want to set the spark clock to "Manualclock" and set the time behind by 10s 
so that the batch calculation triggers at 10:00:10, during which time all the 
events for the previous minute has arrived.

But, I see that "spark.streaming.clock" is hardcoded to 
"org.apache.spark.util.SystemClock" in the code.

Is there a way to easily  hack this property to use Manual clock.
--


Regards
Hemalatha




--


Regards
Hemalatha



--


Regards
Hemalatha


handling dependency conflicts with spark

2017-02-27 Thread Mendelson, Assaf
Hi,

I have a project which uses Jackson 2.8.5. Spark on the other hand seems to be 
using 2.6.5
I am using maven to compile.

My original solution to the problem have been to set spark dependencies with 
the "provided" scope and use maven shade plugin to shade Jackson in my 
compilation.

The problem is that when I run the maven tests, the shading has not yet occurred and I get a conflict.

I thought of creating an uber jar of my dependencies and shade that, however, 
since my main dependency uses spring heavily, doing so causes configuration to 
not be created.

I was hoping someone has a better idea.
Thanks,
Assaf.


RE: fault tolerant dataframe write with overwrite

2017-02-14 Thread Mendelson, Assaf
Thanks, I didn't know the Hadoop API supports file systems other than HDFS and the local file system (when there is 1 node).
My main goal is indeed for checkpointing, every N iterations I save the data 
for future use. The problem is that if I use overwrite mode then it first 
deletes and then write the new one so that is what I am looking to solve.

I wasn’t aware of the issues with renaming in S3 (we currently not using it, we 
just know we would probably need to support it or a similar store in the 
future). That said, how does spark handle this case then when writing a 
dataframe? Currently it writes everything to a temporary sub directory and 
renames it at the end?

In any case, I was hoping for some way internal to spark to do a write which 
does not harm the previous version of the dataframe on disk until a successful 
writing of the new one.
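
For reference, a sketch of the write-to-temporary-then-rename pattern mentioned above, using the Hadoop FileSystem API (paths are illustrative, df is the dataframe being saved; as discussed below, rename is atomic on HDFS but not on S3):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val finalPath = new Path("/data/results")        // the version readers use
val tmpPath   = new Path("/data/results_tmp")    // staging directory

// If this write fails, the previous results under finalPath are untouched.
df.write.mode("overwrite").parquet(tmpPath.toString)

// Swap in the new version only once the write has fully succeeded.
fs.delete(finalPath, true)
fs.rename(tmpPath, finalPath)
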
Thanks,
Assaf.


From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Tuesday, February 14, 2017 3:25 PM
To: Mendelson, Assaf
Cc: Jörn Franke; user
Subject: Re: fault tolerant dataframe write with overwrite


On 14 Feb 2017, at 11:12, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:

I know how to get the filesystem, the problem is that this means using Hadoop 
directly so if in the future we change to something else (e.g. S3) I would need 
to rewrite the code.

well, no, because the s3 and hfs clients use the same API

FileSystem fs = FileSystem.get("hdfs://nn:8020/users/stevel", conf)

vs

FileSystem fs = FileSystem.get("s3a:/bucket1/dataset", conf)

same for wasb://  (which, being consistent and with fast atomic rename, can be 
used instead of HDFS), other cluster filesystems. If it's a native fs, then 
file:// should work everywhere, or some derivative (as redhat do with gluster)


This also relate to finding the last iteration, I would need to use Hadoop 
filesystem which is not agnostic to the deployment.


see above. if you are using a spark cluster of size > 1 you will need some 
distributed filesystem, which is going to have to provide a

If there is an issue here, it is that if you rely on FileSystem.rename() being 
an atomic O(1) operation then you are going to be disappointed on S3, as its a 
non-atomic O(data) copy & delete whose failure state is "undefined".


The solution here comes from having specific commiter logic for the different 
object stores. You really, really don' t want to go there. If you do, have a 
start by looking at the S3guard WiP one: 
https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md

further reading: 
http://www.slideshare.net/steve_l/spark-summit-east-2017-apache-spark-and-object-stores


Kyroserializer still costs much more than dataframe write.

As for the use case, I am doing a very large number of iterations. So the idea 
is that every X iterations I want to save to disk so that if something crashes 
I do not have to begin from the first iteration but just from the relevant 
iteration.


sounds like you don't really want the output to always be the FS, more 
checkpointing iterations. Couldn't you do something like every 20 iterations, 
write() the relevant RDD to the DFS


Basically I would have liked to see something like saving normally and the 
original data would not be removed until a successful write.
Assaf.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Tuesday, February 14, 2017 12:54 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: fault tolerant dataframe write with overwrite

Normally you can fetch the filesystem interface from the configuration ( I 
assume you mean URI).
Managing to get the last iteration: I do not understand the issue. You can have 
as the directory the current timestamp and at the end you simply select the 
directory with the highest number.

Regards to checkpointing , you can use also kyroserializer to avoid some space 
overhead.

Aside from that, can you elaborate on the use case why you need to write every 
iteration?

On 14 Feb 2017, at 11:22, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
Hi,

I have a case where I have an iterative process which overwrites the results of 
a previous iteration.
Every iteration I need to write a dataframe with the results.
The problem is that when I write, if I simply overwrite the results of the 
previous iteration, this is not fault tolerant. i.e. if the program crashes in 
the middle of an iteration, the data from previous ones is lost as overwrite 
first removes the previous data and then starts writing.

Currently we simply write to a new directory and then rename but this is not 
the best way as it requires us to know the interfaces to the underlying file 
system (as well as requiring some extra work to manage which is the last one 
etc.)
I know I can also use checkpoint (although I haven’t full

RE: fault tolerant dataframe write with overwrite

2017-02-14 Thread Mendelson, Assaf
I know how to get the filesystem, the problem is that this means using Hadoop 
directly so if in the future we change to something else (e.g. S3) I would need 
to rewrite the code. This also relate to finding the last iteration, I would 
need to use Hadoop filesystem which is not agnostic to the deployment.

Kyroserializer still costs much more than dataframe write.

As for the use case, I am doing a very large number of iterations. So the idea 
is that every X iterations I want to save to disk so that if something crashes 
I do not have to begin from the first iteration but just from the relevant 
iteration.

Basically I would have liked to see something like saving normally and the 
original data would not be removed until a successful write.
Assaf.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Tuesday, February 14, 2017 12:54 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: fault tolerant dataframe write with overwrite

Normally you can fetch the filesystem interface from the configuration ( I 
assume you mean URI).
Managing to get the last iteration: I do not understand the issue. You can have 
as the directory the current timestamp and at the end you simply select the 
directory with the highest number.

Regards to checkpointing , you can use also kyroserializer to avoid some space 
overhead.

Aside from that, can you elaborate on the use case why you need to write every 
iteration?

On 14 Feb 2017, at 11:22, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
Hi,

I have a case where I have an iterative process which overwrites the results of 
a previous iteration.
Every iteration I need to write a dataframe with the results.
The problem is that when I write, if I simply overwrite the results of the 
previous iteration, this is not fault tolerant. i.e. if the program crashes in 
the middle of an iteration, the data from previous ones is lost as overwrite 
first removes the previous data and then starts writing.

Currently we simply write to a new directory and then rename but this is not 
the best way as it requires us to know the interfaces to the underlying file 
system (as well as requiring some extra work to manage which is the last one 
etc.)
I know I can also use checkpoint (although I haven’t fully tested the process 
there), however, checkpointing converts the result to RDD which both takes more 
time and more space.
I was wondering if there is any efficient method of managing this from inside 
spark.
Thanks,
Assaf.


fault tolerant dataframe write with overwrite

2017-02-14 Thread Mendelson, Assaf
Hi,

I have a case where I have an iterative process which overwrites the results of 
a previous iteration.
Every iteration I need to write a dataframe with the results.
The problem is that when I write, if I simply overwrite the results of the 
previous iteration, this is not fault tolerant. i.e. if the program crashes in 
the middle of an iteration, the data from previous ones is lost as overwrite 
first removes the previous data and then starts writing.

Currently we simply write to a new directory and then rename but this is not 
the best way as it requires us to know the interfaces to the underlying file 
system (as well as requiring some extra work to manage which is the last one 
etc.)
I know I can also use checkpoint (although I haven't fully tested the process 
there), however, checkpointing converts the result to RDD which both takes more 
time and more space.
I was wondering if there is any efficient method of managing this from inside 
spark.
Thanks,
Assaf.


RE: [Spark Launcher] How to launch parallel jobs?

2017-02-14 Thread Mendelson, Assaf
You should also check your memory usage.
Let’s say for example you have 16 cores and 8 GB. And that you use 4 executors 
with 1 core each.
When you use an executor, spark reserves it from yarn and yarn allocates the 
number of cores (e.g. 1 in our case) and the memory. The memory is actually 
more than you asked for. If you ask for 1GB it will in fact allocate almost 
1.5GB with overhead. In addition, it will probably allocate an executor for the 
driver (probably with 1024MB memory usage).
When you run your program and look in port 8080, you should look not only on 
the VCores used out of the VCores total but also on the Memory used and Memory 
total. You should also navigate to the executors (e.g. applications->running on 
the left and then choose you application and navigate all the way down to a 
single container). You can see there the actual usage.

BTW, it doesn’t matter how much memory your program wants but how much it 
reserves. In your example it will not take the 50MB of the test but the ~1.5GB 
(after overhead) per executor.
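
As a rough illustration of that overhead (assuming the usual YARN defaults of a 384MB minimum and a 10% overhead factor; the exact values depend on your configuration):

// Approximate per-executor container size requested from YARN
val executorMemoryMb = 1024
val overheadMb = math.max(384, (0.10 * executorMemoryMb).toInt)  // 384 here
val containerMb = executorMemoryMb + overheadMb                  // ~1408MB, then rounded up
                                                                 // to YARN's allocation increment
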
Hope this helps,
Assaf.

From: Cosmin Posteuca [mailto:cosmin.poste...@gmail.com]
Sent: Tuesday, February 14, 2017 9:53 AM
To: Egor Pahomov
Cc: user
Subject: Re: [Spark Launcher] How to launch parallel jobs?

Hi Egor,

About the first problem i think you are right, it's make sense.

About the second problem, i check available resource on 8088 port and there 
show 16 available cores. I start my job with 4 executors with 1 core each, and 
1gb per executor. My job use maximum 50mb of memory(just for test). From my 
point of view the resources are enough, and the problem i think is from yarn 
configuration files, but i don't know what is missing.

Thank you

2017-02-13 21:14 GMT+02:00 Egor Pahomov:
About second problem: I understand this can be in two cases: when one job 
prevents the other one from getting resources for executors or (2) bottleneck 
is reading from disk, so you can not really parallel that. I have no experience 
with second case, but it's easy to verify the fist one: just look on you hadoop 
UI and verify, that both job get enough resources.

2017-02-13 11:07 GMT-08:00 Egor Pahomov:
"But if i increase only executor-cores the finish time is the same". More 
experienced ones can correct me, if I'm wrong, but as far as I understand that: 
one partition processed by one spark task. Task is always running on 1 core and 
not parallelized among cores. So if you have 5 partitions and you increased 
totall number of cores among cluster from 7 to 10 for example - you have not 
gained anything. But if you repartition you give an opportunity to process 
thing in more threads, so now more tasks can execute in parallel.

2017-02-13 7:05 GMT-08:00 Cosmin Posteuca:
Hi,

I think i don't understand enough how to launch jobs.

I have one job which takes 60 seconds to finish. I run it with following 
command:


spark-submit --executor-cores 1 \
 --executor-memory 1g \
 --driver-memory 1g \
 --master yarn \
 --deploy-mode cluster \
 --conf spark.dynamicAllocation.enabled=true \
 --conf spark.shuffle.service.enabled=true \
 --conf spark.dynamicAllocation.minExecutors=1 \
 --conf spark.dynamicAllocation.maxExecutors=4 \
 --conf spark.dynamicAllocation.initialExecutors=4 \
 --conf spark.executor.instances=4 \

If i increase number of partitions from code and number of executors the app 
will finish faster, which it's ok. But if i increase only executor-cores the 
finish time is the same, and i don't understand why. I expect the time to be 
lower than initial time.

My second problem is if i launch twice above code i expect that both jobs to 
finish in 60 seconds, but this don't happen. Both jobs finish after 120 seconds 
and i don't understand why.

I run this code on AWS EMR, on 2 instances(4 cpu each, and each cpu has 2 
threads). From what i saw in default EMR configurations, yarn is set on 
FIFO(default) mode with CapacityScheduler.

What do you think about this problems?

Thanks,

Cosmin



--
Sincerely yours
Egor Pakhomov



--
Sincerely yours
Egor Pakhomov



RE: is dataframe thread safe?

2017-02-12 Thread Mendelson, Assaf
There are no threads within maps here. The idea is to have two jobs on two different threads which use the same dataframe (which is cached, btw).
This does not override Spark's parallel execution of transformations. The documentation (job scheduling) actually hints at this option but doesn't say specifically whether it is supported when the same dataframe is used.
As for configuring the scheduler, this would not work. First, it would mean that the same cached dataframe cannot be used; I would have to add some additional component such as Alluxio (and it would still have to serialize/deserialize) as opposed to using the cached data. Furthermore, multi-tenancy between applications is limited to either dividing the cluster between the applications or using dynamic allocation (which has its own overheads).

Therefore Sean's answer is what I was looking for (and hoping for…)
Assaf

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Sunday, February 12, 2017 2:46 PM
To: Sean Owen
Cc: Mendelson, Assaf; user
Subject: Re: is dataframe thread safe?

I did not doubt that the submission of several jobs of one application makes 
sense. However, he want to create threads within maps etc., which looks like 
calling for issues (not only for running the application itself, but also for 
operating it in production within a shared cluster). I would rely for parallel 
execution of the transformations on the out-of-the-box functionality within 
Spark.

For me he looks for a solution that can be achieved by a simple configuration 
of the scheduler in Spark, yarn or mesos. In this way the application would be 
more maintainable in production.

On 12 Feb 2017, at 11:45, Sean Owen <so...@cloudera.com> wrote:
No this use case is perfectly sensible. Yes it is thread safe.
On Sun, Feb 12, 2017, 10:30 Jörn Franke <jornfra...@gmail.com> wrote:
I think you should have a look at the spark documentation. It has something 
called scheduler who does exactly this. In more sophisticated environments yarn 
or mesos do this for you.

Using threads for transformations does not make sense.

On 12 Feb 2017, at 09:50, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
I know spark takes care of executing everything in a distributed manner, 
however, spark also supports having multiple threads on the same spark 
session/context and knows (Through fair scheduler) to distribute the tasks from 
them in a round robin.

The question is, can those two actions (with a different set of 
transformations) be applied to the SAME dataframe.

Let’s say I want to do something like:



val df = ???
df.cache()
df.count()

def f1(df: DataFrame): Unit = {
  val df1 = df.groupBy(something).agg(some aggs)
  df1.write.parquet("some path")
}

def f2(df: DataFrame): Unit = {
  val df2 = df.groupBy(something else).agg(some different aggs)
  df2.write.parquet("some path 2")
}

f1(df)
f2(df)

df.unpersist()

if the aggregations do not use the full cluster (e.g. because of data skewness, 
because there aren’t enough partitions or any other reason) then this would 
leave the cluster under utilized.

However, if I would call f1 and f2 on different threads, then df2 can use free 
resources f1 has not consumed and the overall utilization would improve.

Of course, I can do this only if the operations on the dataframe are thread 
safe. For example, if I would do a cache in f1 and an unpersist in f2 I would 
get an inconsistent result. So my question is, what, if any are the legal 
operations to use on a dataframe so I could do the above.

Thanks,
Assaf.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Sunday, February 12, 2017 10:39 AM
To: Mendelson, Assaf
Cc: user
Subject: Re: is dataframe thread safe?

I am not sure what you are trying to achieve here. Spark is taking care of 
executing the transformations in a distributed fashion. This means you must not 
use threads - it does not make sense. Hence, you do not find documentation 
about it.

On 12 Feb 2017, at 09:06, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
Hi,
I was wondering if dataframe is considered thread safe. I know the spark 
session and spark context are thread safe (and actually have tools to manage 
jobs from different threads) but the question is, can I use the same dataframe 
in both threads.
The idea would be to create a dataframe in the main thread and then in two sub 
threads do different transformations and actions on it.
I understand that some things might not be thread safe (e.g. if I unpersist in 
one thread it would affect the other. Checkpointing would cause similar 
issues), however, I can’t find any documentation as to what operations (if any) 
are thread safe.

Thanks,
Assaf.



RE: is dataframe thread safe?

2017-02-12 Thread Mendelson, Assaf
I know spark takes care of executing everything in a distributed manner, 
however, spark also supports having multiple threads on the same spark 
session/context and knows (Through fair scheduler) to distribute the tasks from 
them in a round robin.

The question is, can those two actions (with a different set of 
transformations) be applied to the SAME dataframe.

Let’s say I want to do something like:



val df = ???
df.cache()
df.count()

def f1(df: DataFrame): Unit = {
  val df1 = df.groupBy(something).agg(some aggs)
  df1.write.parquet("some path")
}

def f2(df: DataFrame): Unit = {
  val df2 = df.groupBy(something else).agg(some different aggs)
  df2.write.parquet("some path 2")
}

f1(df)
f2(df)

df.unpersist()

if the aggregations do not use the full cluster (e.g. because of data skewness, 
because there aren’t enough partitions or any other reason) then this would 
leave the cluster under utilized.

However, if I would call f1 and f2 on different threads, then df2 can use free 
resources f1 has not consumed and the overall utilization would improve.
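
A sketch of that two-thread variant using plain Scala Futures (assuming the fair scheduler is enabled so the two jobs share executors):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Each future submits its own Spark jobs against the same cached df.
val job1 = Future { f1(df) }
val job2 = Future { f2(df) }
Await.result(Future.sequence(Seq(job1, job2)), Duration.Inf)

df.unpersist()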

Of course, I can do this only if the operations on the dataframe are thread 
safe. For example, if I would do a cache in f1 and an unpersist in f2 I would 
get an inconsistent result. So my question is, what, if any are the legal 
operations to use on a dataframe so I could do the above.

Thanks,
Assaf.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Sunday, February 12, 2017 10:39 AM
To: Mendelson, Assaf
Cc: user
Subject: Re: is dataframe thread safe?

I am not sure what you are trying to achieve here. Spark is taking care of 
executing the transformations in a distributed fashion. This means you must not 
use threads - it does not make sense. Hence, you do not find documentation 
about it.

On 12 Feb 2017, at 09:06, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
Hi,
I was wondering if dataframe is considered thread safe. I know the spark 
session and spark context are thread safe (and actually have tools to manage 
jobs from different threads) but the question is, can I use the same dataframe 
in both threads.
The idea would be to create a dataframe in the main thread and then in two sub 
threads do different transformations and actions on it.
I understand that some things might not be thread safe (e.g. if I unpersist in 
one thread it would affect the other. Checkpointing would cause similar 
issues), however, I can’t find any documentation as to what operations (if any) 
are thread safe.

Thanks,
Assaf.



is dataframe thread safe?

2017-02-12 Thread Mendelson, Assaf
Hi,
I was wondering if dataframe is considered thread safe. I know the spark 
session and spark context are thread safe (and actually have tools to manage 
jobs from different threads) but the question is, can I use the same dataframe 
in both threads.
The idea would be to create a dataframe in the main thread and then in two sub 
threads do different transformations and actions on it.
I understand that some things might not be thread safe (e.g. if I unpersist in 
one thread it would affect the other. Checkpointing would cause similar 
issues), however, I can't find any documentation as to what operations (if any) 
are thread safe.

Thanks,
Assaf.


Updating variable in foreachRDD

2017-02-09 Thread Mendelson, Assaf
Hi,

I was wondering on how foreachRDD would run.
Specifically, let's say I do something like (nothing real, just for 
understanding):

var df = ???
var counter = 0

dstream.foreachRDD { rdd: RDD[Long] =>
  val df2 = rdd.toDF(...)
  df = df.union(df2)
  counter += 1
  if (counter > 100) {
    ssc.stop()
  }
}

Would this guarantee that df would be a union of the first 100 micro batches?
i.e. is foreachRDD guaranteed to run on the driver, updating everything locally 
(as opposed to lazily updating stuff or running on a worker)?
In simple tests this appears to work correctly but I am planning to do some 
more complex things (including using multiple dstreams)?

Thanks,
Assaf.


RE: using an alternative slf4j implementation

2017-02-06 Thread Mendelson, Assaf
Found some questions (without answers) and I found some jira 
(https://issues.apache.org/jira/browse/SPARK-4147 and 
https://issues.apache.org/jira/browse/SPARK-14703), however they do not solve 
the issue.
Nominally, a library should not explicitly set a binding; Spark, however, does so (I imagine this is so spark-submit can package everything and have a logger). If a dependency does this, the usual solution would be to exclude the binding (and maybe add a relevant bridge); however, since spark-submit adds the relevant jars itself, I don't see how to do this.

Is there any way of forcing logback as the binding?

-Original Message-
From: Jacek Laskowski [mailto:ja...@japila.pl] 
Sent: Monday, February 06, 2017 10:46 AM
To: Mendelson, Assaf
Cc: user
Subject: Re: using an alternative slf4j implementation

Hi,

Sounds like a quite involved development for me. I can't help here.
I'd suggest going through the dev and user mailing lists for the past year and 
JIRA issues regarding the issue as I vaguely remember some discussions about 
logging in Spark (that would merit to do the migration to logback eventually).

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Mon, Feb 6, 2017 at 9:06 AM, Mendelson, Assaf <assaf.mendel...@rsa.com> 
wrote:
> Shading doesn’t help (we already shaded everything).
>
> According to https://www.slf4j.org/codes.html#multiple_bindings only 
> one binding can be used. The problem is that once we link to spark 
> jars then we automatically inherit spark’s binding (for log4j).
>
> I would like to find a way to either send spark’s logs to log4j and my 
> logs to logback or send everything to logback.
>
> Assaf.
>
>
>
> From: Jacek Laskowski [mailto:ja...@japila.pl]
> Sent: Monday, February 06, 2017 12:47 AM
> To: Mendelson, Assaf
> Cc: user
> Subject: Re: using an alternative slf4j implementation
>
>
>
> Hi,
>
>
>
> Shading conflicting dependencies?
>
>
>
> Jacek
>
>
>
> On 5 Feb 2017 3:56 p.m., "Mendelson, Assaf" <assaf.mendel...@rsa.com> wrote:
>
> Hi,
>
> Spark seems to explicitly use log4j.
>
> This means that if I use an alternative backend for my application (e.g.
> ch.qos.logback) I have a conflict.
>
> Sure I can exclude logback but that means my application cannot use 
> our internal tools.
>
>
>
> Is there a way to use logback as a backend logging while using spark?
>
> Assaf.




RE: using an alternative slf4j implementation

2017-02-06 Thread Mendelson, Assaf
Shading doesn’t help (we already shaded everything).
According to https://www.slf4j.org/codes.html#multiple_bindings only one 
binding can be used. The problem is that once we link to spark jars then we 
automatically inherit spark’s binding (for log4j).
I would like to find a way to either send spark’s logs to log4j and my logs to 
logback or send everything to logback.
Assaf.

From: Jacek Laskowski [mailto:ja...@japila.pl]
Sent: Monday, February 06, 2017 12:47 AM
To: Mendelson, Assaf
Cc: user
Subject: Re: using an alternative slf4j implementation

Hi,

Shading conflicting dependencies?

Jacek

On 5 Feb 2017 3:56 p.m., "Mendelson, Assaf" 
<assaf.mendel...@rsa.com<mailto:assaf.mendel...@rsa.com>> wrote:
Hi,
Spark seems to explicitly use log4j.
This means that if I use an alternative backend for my application (e.g. 
ch.qos.logback) I have a conflict.
Sure I can exclude logback but that means my application cannot use our 
internal tools.

Is there a way to use logback as a backend logging while using spark?
Assaf.


using an alternative slf4j implementation

2017-02-05 Thread Mendelson, Assaf
Hi,
Spark seems to explicitly use log4j.
This means that if I use an alternative backend for my application (e.g. 
ch.qos.logback) I have a conflict.
Sure I can exclude logback but that means my application cannot use our 
internal tools.

Is there a way to use logback as a backend logging while using spark?
Assaf.


RE: forcing dataframe groupby partitioning

2017-01-29 Thread Mendelson, Assaf
Could you explain why this would work?
Assaf.

From: Haviv, Daniel [mailto:dha...@amazon.com]
Sent: Sunday, January 29, 2017 7:09 PM
To: Mendelson, Assaf
Cc: user@spark.apache.org
Subject: Re: forcing dataframe groupby partitioning

If there's no built in local groupBy, You could do something like that:
df.groupby(C1,C2).agg(...).flatmap(x=>x.groupBy(C1)).agg

Thank you.
Daniel

On 29 Jan 2017, at 18:33, Mendelson, Assaf 
<assaf.mendel...@rsa.com<mailto:assaf.mendel...@rsa.com>> wrote:
Hi,

Consider the following example:

df.groupby(C1,C2).agg(some agg).groupby(C1).agg(some more agg)

The default way spark would behave would be to shuffle according to a 
combination of C1 and C2 and then shuffle again by C1 only.
This behavior makes sense when one uses C2 to salt C1 for skewed data, however, 
when this is part of the logic the cost of doing two shuffles instead of one is 
not insignificant (even worse when multiple iterations are done).

Is there a way to force the first groupby to shuffle the data so that the 
second groupby would occur within the partition?

Thanks,
Assaf.


forcing dataframe groupby partitioning

2017-01-29 Thread Mendelson, Assaf
Hi,

Consider the following example:

df.groupby(C1,C2).agg(some agg).groupby(C1).agg(some more agg)

The default way spark would behave would be to shuffle according to a 
combination of C1 and C2 and then shuffle again by C1 only.
This behavior makes sense when one uses C2 to salt C1 for skewed data, however, 
when this is part of the logic the cost of doing two shuffles instead of one is 
not insignificant (even worse when multiple iterations are done).

Is there a way to force the first groupby to shuffle the data so that the 
second groupby would occur within the partition?
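
One candidate would be to repartition by C1 up front so that a hash 
partitioning on C1 is already in place before both aggregations (a sketch only, 
not verified here; the value column and the aggregations are illustrative, and 
whether the optimizer really avoids the second exchange should be checked with 
explain()):

import org.apache.spark.sql.functions.{col, sum}

val result = df.repartition(col("C1"))
  .groupBy("C1", "C2").agg(sum(col("value")).as("partial"))
  .groupBy("C1").agg(sum(col("partial")).as("total"))

result.explain(true)  // look for a single Exchange in the physical plan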

Thanks,
Assaf.


Weird performance on Windows laptop

2017-01-24 Thread Mendelson, Assaf
Hi,

I created a simple synthetic test which does a sample calculation twice, each 
time with different partitioning:
def time[R](block: => R): Long = {
  val t0 = System.currentTimeMillis()
  block // call-by-name
  val t1 = System.currentTimeMillis()
  t1 - t0
}

val base_df = spark.range(1001).withColumn("A", $"id" % 65)
val df1 = base_df.cache()
val df2 = base_df.repartition(201).cache()
df1.count()
df2.count()
val t1 = time { df1.groupBy("A").agg(sum($"id")).collect() }
val t2 = time { df2.groupBy("A").agg(sum($"id")).collect() }
println(s"first took $t1, second took $t2, ratio=${t2/t1}")

Now I ran this on three different platforms (all using local[2] master):

1.   On Windows on a laptop I get: first took 2454, second took 48089, ratio=19

2.   In a VM on the same laptop (under CentOS) I get: first took 2911, second took 8699, ratio=2

3.   On another VM with a lot more power (but still using local[2] as master) I get: first took 1483, second took 2354, ratio=1

I can't figure out these numbers. I expected the behavior I see on the strong VM, 
where the difference is relatively small, but why is the ratio so much larger on 
the local VM, and why is it 19 on Windows?

Can anyone help me figure out why this is?
Assaf.


RE: top-k function for Window

2017-01-03 Thread Mendelson, Assaf
Assume you have a UDAF which looks like this:

-  Input: The value

-  Buffer: K elements

-  Output: An array (which would have the K elements)

-  Init: Initialize all elements to some irrelevant value (e.g. Int.MinValue)

-  Update: Go over the buffer, find the first element that is smaller than the 
current value, push everything after it forward and put the value in (i.e. a 
sorted insert)

-  Merge: “merge sort” between the two buffers

-  Evaluate: turn the buffer to array
Then run the UDAF on the groupby.

The result would be an array of (up to) K elements per key. To turn it back 
into K lines all you need to do is explode it.

Assuming that K is small, the calculation of the UDAF would be much faster than 
the sorting (it only needs to do sortings on very small K).
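
A rough Scala sketch of such a UDAF (using the UserDefinedAggregateFunction 
API; for brevity the update step re-sorts the small buffer instead of doing an 
in-place sorted insert, and the dataframe/column names in the usage lines are 
illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types._

class TopK(k: Int) extends UserDefinedAggregateFunction {
  def inputSchema: StructType = new StructType().add("value", IntegerType)
  def bufferSchema: StructType = new StructType().add("top", ArrayType(IntegerType))
  def dataType: DataType = ArrayType(IntegerType)
  def deterministic: Boolean = true

  // initialize the K slots to an "irrelevant" sentinel value
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Seq.fill(k)(Int.MinValue)
  }

  // keep the K largest values seen so far (stored in descending order)
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      val current = buffer.getSeq[Int](0)
      val v = input.getInt(0)
      if (v > current.last)
        buffer(0) = (current :+ v).sorted(Ordering[Int].reverse).take(k)
    }
  }

  // "merge sort" of two buffers, again keeping only the K largest
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = (buffer1.getSeq[Int](0) ++ buffer2.getSeq[Int](0))
      .sorted(Ordering[Int].reverse).take(k)
  }

  // drop the unused sentinel slots and return the array
  def evaluate(buffer: Row): Any = buffer.getSeq[Int](0).filter(_ != Int.MinValue)
}

val top10 = new TopK(10)
df.groupBy("key")
  .agg(top10(col("value")).as("topk"))
  .select(col("key"), explode(col("topk")).as("value"))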

Assaf.
From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 8:03 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: top-k function for Window

> Furthermore, in your example you don’t even need a window function, you can 
> simply use groupby and explode

Can you clarify? You need to sort somehow (be it map-side sorting or 
reduce-side sorting).



---
Regards,
Andy

On Tue, Jan 3, 2017 at 2:07 PM, Mendelson, Assaf 
<assaf.mendel...@rsa.com<mailto:assaf.mendel...@rsa.com>> wrote:
You can write a UDAF in which the buffer contains the top K and manage it. This 
means you don’t need to sort at all. Furthermore, in your example you don’t 
even need a window function, you can simply use groupby and explode.
Of course, this is only relevant if k is small…

From: Andy Dang [mailto:nam...@gmail.com<mailto:nam...@gmail.com>]
Sent: Tuesday, January 03, 2017 3:07 PM
To: user
Subject: top-k function for Window

Hi all,

What's the best way to do top-k with Windowing in Dataset world?

I have a snippet of code that filters the data to the top-k, but with skewed 
keys:

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know that it can sort the data 
locally, get the local rank first. What it ends up doing is performing a sort 
by key using the skewed keys, and this blew up the cluster since the keys are 
heavily skewed.

In the RDD world we can do something like:
rdd.mapPartitions(iterator -> topK(iterator))
but I can't really think of an obvious way to do this in the Dataset API, 
especially with Window functions. I guess some UserDefinedAggregateFunction would do, 
but I wonder if there's an obvious way that I missed.

---
Regards,
Andy



RE: top-k function for Window

2017-01-03 Thread Mendelson, Assaf
You can write a UDAF in which the buffer contains the top K and manage it. This 
means you don’t need to sort at all. Furthermore, in your example you don’t 
even need a window function, you can simply use groupby and explode.
Of course, this is only relevant if k is small…

From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 3:07 PM
To: user
Subject: top-k function for Window

Hi all,

What's the best way to do top-k with Windowing in Dataset world?

I have a snippet of code that filters the data to the top-k, but with skewed 
keys:

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know that it can sort the data 
locally, get the local rank first. What it ends up doing is performing a sort 
by key using the skewed keys, and this blew up the cluster since the keys are 
heavily skewed.

In the RDD world we can do something like:
rdd.mapPartitions(iterator -> topK(iterator))
but I can't really think of an obvious way to do this in the Dataset API, 
especially with Window functions. I guess some UserDefinedAggregateFunction would do, 
but I wonder if there's an obvious way that I missed.

---
Regards,
Andy


RE: What is missing here to use sql in spark?

2017-01-02 Thread Mendelson, Assaf
sqlContext.sql("select distinct CARRIER from flight201601") defines a dataframe 
which is lazily evaluated.
This means that it returns a dataframe (which is what you got).
If you want to see the results do:
sqlContext.sql("select distinct CARRIER from flight201601").show()
or
df = sqlContext.sql("select distinct CARRIER from flight201601")
df.show()

Assaf


From: Raymond Xie [mailto:xie3208...@gmail.com]
Sent: Monday, January 02, 2017 6:23 AM
To: user
Subject: What is missing here to use sql in spark?

Happy new year!

Below is my script:

pyspark --packages com.databricks:spark-csv_2.10:1.4.0
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', 
inferschema='true').load('file:///root/Downloads/data/flight201601short2.csv')
df.show(5)
df.registerTempTable("flight201601")
sqlContext.sql("select distinct CARRIER from flight201601")

df.show(5) is below:

+----+-------+-----+------------+-----------+----------+--------------+----------+-------+--------+------+
|YEAR|QUARTER|MONTH|DAY_OF_MONTH|DAY_OF_WEEK|   FL_DATE|UNIQUE_CARRIER|AIRLINE_ID|CARRIER|TAIL_NUM|FL_NUM|
+----+-------+-----+------------+-----------+----------+--------------+----------+-------+--------+------+
|2016|      1|    1|           6|          3|2016-01-06|            AA|     19805|     AA|  N4YBAA|    43|
|2016|      1|    1|           7|          4|2016-01-07|            AA|     19805|     AA|  N434AA|    43|
|2016|      1|    1|           8|          5|2016-01-08|            AA|     19805|     AA|  N541AA|    43|
|2016|      1|    1|           9|          6|2016-01-09|            AA|     19805|     AA|  N489AA|    43|
|2016|      1|    1|          10|          7|2016-01-10|            AA|     19805|     AA|  N439AA|    43|
+----+-------+-----+------------+-----------+----------+--------------+----------+-------+--------+------+

The final result is NOT what I am expecting, it currently shows the following:

>>> sqlContext.sql("select distinct CARRIER from flight201601")
DataFrame[CARRIER: string]

I am expecting the distinct CARRIER will be created:

AA
BB
CC
...

flight201601short2.csv is attached here for your reference.


Thank you very much.




Sincerely yours,


Raymond



RE: Location for the additional jar files in Spark

2016-12-27 Thread Mendelson, Assaf
You should probably add --driver-class-path with the jar as well. In theory 
--jars should add it to the driver as well but in my experience it does not (I 
think there was a jira open on it). In any case you can find it in 
stackoverflow: See 
http://stackoverflow.com/questions/40995943/connect-to-oracle-db-using-pyspark/41000181#41000181.
 Another thing you might want to try is adding the driver option to the read. 
See 
http://stackoverflow.com/questions/36326066/working-with-jdbc-jar-in-pyspark/36328672#36328672.
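
For example, something along these lines (a sketch; the URL, table and driver 
class are illustrative for an Oracle source):

val df = spark.read.format("jdbc")
  .option("url", "jdbc:oracle:thin:@myhost:1521:mydb")
  .option("dbtable", "myschema.mytable")
  .option("user", "myuser")
  .option("password", "mypassword")
  .option("driver", "oracle.jdbc.OracleDriver")   // explicitly name the driver class
  .load()
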
Assaf

From: Léo Biscassi [mailto:leo.bisca...@gmail.com]
Sent: Tuesday, December 27, 2016 2:59 PM
To: Mich Talebzadeh; Deepak Sharma
Cc: user @spark
Subject: Re: Location for the additional jar files in Spark


Hi all,
I have the same problem with spark 2.0.2.

Best regards,

On Tue, Dec 27, 2016, 9:40 AM Mich Talebzadeh wrote:
Thanks Deppak

but get the same error unfortunately

ADD_JARS="/home/hduser/jars/ojdbc6.jar" spark-shell
Spark context Web UI available at http://50.140.197.217:4041
Spark context available as 'sc' (master = local[*], app id = 
local-1482842478988).

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
HiveContext: org.apache.spark.sql.hive.HiveContext = 
org.apache.spark.sql.hive.HiveContext@a323a5b
scala> //val sqlContext = new HiveContext(sc)
scala> println ("\nStarted at"); spark.sql("SELECT 
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
").collect.foreach(println)
Started at
[27/12/2016 12:41:43.43]
scala> //
scala> var _ORACLEserver= "jdbc:oracle:thin:@rhes564:1521:mydb12"
_ORACLEserver: String = jdbc:oracle:thin:@rhes564:1521:mydb12
scala> var _username = "scratchpad"
_username: String = scratchpad
scala> var _password = "oracle"
_password: String = oracle
scala> //
scala> val s = HiveContext.read.format("jdbc").options(
 | Map("url" -> _ORACLEserver,
 | "dbtable" -> "(SELECT ID, CLUSTERED, SCATTERED, RANDOMISED, 
RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
 | "partitionColumn" -> "ID",
 | "lowerBound" -> "1",
 | "upperBound" -> "1",
 | "numPartitions" -> "10",
 | "user" -> _username,
 | "password" -> _password)).load
java.sql.SQLException: No suitable driver
  at java.sql.DriverManager.getDriver(DriverManager.java:315)
  at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
  at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
  at scala.Option.getOrElse(Option.scala:121)
  at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:53)
  at 
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:123)
  at 
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:117)
  at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:53)
  at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:315)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)
  ... 56 elided


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 27 December 2016 at 11:37, Deepak Sharma wrote:
How about this:
ADD_JARS="/home/hduser/jars/ojdbc6.jar" spark-shell

Thanks
Deepak

On Tue, Dec 27, 2016 at 5:04 PM, Mich Talebzadeh wrote:
Ok I tried this but no luck

spark-shell --jars /home/hduser/jars/ojdbc6.jar
Spark context Web UI available at http://50.140.197.217:4041
Spark context available as 'sc' (master = local[*], app id = 
local-1482838526271).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/
Using Scala version 2.11.8 (Java 

RE: streaming performance

2016-12-25 Thread Mendelson, Assaf
Hi,
I may be missing something but let’s say we would have worked with a DStream 
window. If we have a sliding window of 5 minutes every 1 minute then an RDD 
would have been generated every minute, then the RDD for the last 5 minutes 
would have been joined and then I would have converted them to dataframe. So I 
simulated it by skipping the RDD and going directly to union of dataframes. I 
believed that since spark caches shuffle results it would cache the partial 
aggregation and I would get an incremental aggregate. Obviously this is not 
happening and I am aggregating the older dataframes again and again.

What am I missing here? Is there a way to do it?

As for structured streaming, I understand this is the future of spark but 
currently, there are too many important unsupported elements (e.g. no support 
for multiple aggregations, no support for distinct operations, no support for 
outer join etc.).

Assaf.


From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Friday, December 23, 2016 2:46 AM
To: Mendelson, Assaf
Cc: user
Subject: Re: streaming performance

From what I understand looking at the code in stackoverflow, I think you are 
"simulating" the streaming version of your calculation incorrectly. You are 
repeatedly unioning batch dataframes to simulate streaming and then applying 
aggregation on the unioned DF. That is not going to compute aggregates 
incrementally; it will just process the whole data every time. So the oldest 
batch DF is processed again and again, causing increasing resource usage. That's 
not how streaming works, so this is not simulating the right thing.

With Structured Streaming's streaming dataframes, it is actually done 
incrementally. The best way to try that is to generate a file per "bucket", and 
then create a streaming dataframe on the files such that they are picked up one 
by one. See this notebook for the maxFilesPerTrigger option in 
http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Scala%20DataFrames%20API.html
This would process each file one by one, maintain internal state to continuously 
update the aggregates, and never require reprocessing the old data.
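
For illustration, such a file-based streaming aggregation might look roughly 
like this (a sketch only, assuming Spark 2.x structured streaming; the path, 
schema and aggregation are illustrative):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("eventTime", TimestampType)
  .add("value", LongType)

val stream = spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", "1")   // pick up one file ("bucket") per trigger
  .json("/data/buckets")

val agg = stream
  .groupBy(window(col("eventTime"), "5 minutes", "1 minute"))
  .agg(sum(col("value")).as("total"))

val query = agg.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("agg")
  .start()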

Hope this helps



On Wed, Dec 21, 2016 at 7:58 AM, Mendelson, Assaf 
<assaf.mendel...@rsa.com<mailto:assaf.mendel...@rsa.com>> wrote:

I am having trouble with streaming performance. My main problem is how to do a 
sliding window calculation where the ratio between the window size and the step 
size is relatively large (hundreds) without recalculating everything all the 
time.

I created a simple example of what I am aiming at with what I have so far which 
is detailed in 
http://stackoverflow.com/questions/41266956/apache-spark-streaming-performance

I was hoping someone can point me to what I am doing wrong.

Thanks,

Assaf.



streaming performance

2016-12-21 Thread Mendelson, Assaf
I am having trouble with streaming performance. My main problem is how to do a 
sliding window calculation where the ratio between the window size and the step 
size is relatively large (hundreds) without recalculating everything all the 
time.

I created a simple example of what I am aiming at with what I have so far which 
is detailed in 
http://stackoverflow.com/questions/41266956/apache-spark-streaming-performance

I was hoping someone can point me to what I am doing wrong.

Thanks,

Assaf.


RE: About transformations

2016-12-09 Thread Mendelson, Assaf
This is a guess, but I would bet that most of the time went into loading the 
data. The second time around there are many places this could be cached (either 
by spark or even by the OS if you are reading from a file).
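
One way to make the caching explicit and separate the load cost from the 
per-action cost would be something like this (a sketch; the path is 
illustrative):

val df1 = spark.read.parquet("/path/to/data")   // LOAD
val df2 = df1.selectExpr("*")                   // SELECT ALL
df2.cache()
df2.count()   // first action: pays the load cost and materializes the cache
df2.count()   // second action: reads from the cache, so it is much cheaper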

-Original Message-
From: brccosta [mailto:brunocosta@gmail.com] 
Sent: Friday, December 09, 2016 1:24 PM
To: user@spark.apache.org
Subject: About transformations

Dear guys,

We're performing some tests to evaluate the behavior of transformations and 
actions in Spark with Spark SQL. In our tests, first we conceive a simple 
dataflow with 2 transformations and 1 action:

LOAD (result: df_1) > SELECT ALL FROM df_1 (result: df_2) > COUNT(df_2)

The execution time for this first dataflow was 10 seconds. Next, we added 
another action to our dataflow:

LOAD (result: df_1) > SELECT ALL FROM df_1 (result: df_2) > COUNT(df_2) >
COUNT(df_2)

Analyzing the second version of the dataflow, since all transformations are lazy 
and re-executed for each action (according to the documentation), when 
executing the second count, it should require the execution of the two previous 
transformations (LOAD and SELECT ALL). Thus, we expected that when executing 
this second version of our dataflow, the time would be around 20 seconds. 
However, the execution time was 11 seconds. Apparently, the results of the 
transformations required by the first count were cached by Spark for the second 
count.

Please, do you guys know what is happening? 







RE: few basic questions on structured streaming

2016-12-08 Thread Mendelson, Assaf
For watermarking you can read these excellent articles: part 1: 
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101, part 2: 
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102. They explain 
more than just watermarking, but they helped me understand a lot of the concepts 
in structured streaming.
In any case, watermarking is not implemented yet. I believe it is targeted at 
spark 2.1, which is supposed to come out soon.
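
For reference, the API that eventually landed looks roughly like this (a sketch 
assuming Spark 2.1+, where withWatermark() was added; events is assumed to be a 
streaming dataframe with an eventTime timestamp column, and the thresholds are 
illustrative):

import org.apache.spark.sql.functions.{col, window}

val counts = events
  .withWatermark("eventTime", "2 days")   // how late an event may arrive and still be counted
  .groupBy(window(col("eventTime"), "10 minutes", "5 minutes"))
  .count()
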
Assaf.

From: kant kodali [mailto:kanth...@gmail.com]
Sent: Thursday, December 08, 2016 1:50 PM
To: user @spark
Subject: few basic questions on structured streaming

Hi All,

I read the documentation on Structured Streaming based on event time and I have 
the following questions.

1. what happens if an event arrives few days late? Looks like we have an 
unbound table with sorted time intervals as keys but I assume spark doesn't 
keep several days worth of data in memory but rather it would checkpoint parts 
of the unbound table to a storage at a specified interval such that if an event 
comes few days late it would update the part of the table that is in memory 
plus the parts of the table that are in storage which contains the interval 
(Again this is just my assumption, I don't know what it really does). is this 
correct so far?

2.  Say I am running a Spark Structured streaming Job for 90 days with a window 
interval of 10 mins and a slide interval of 5 mins. Does the output of this Job 
always return the entire history in a table? other words the does the output on 
90th day contains a table of 10 minute time intervals from day 1 to day 90? If 
so, wouldn't that be too big to return as an output?

3. For Structured Streaming is it required to have a distributed storage such 
as HDFS? my guess would be yes (based on what I said in #1) but I would like to 
confirm.

4. I briefly heard about watermarking. Are there any pointers where I can know 
them more in detail? Specifically how watermarks could help in structured 
streaming and so on.

Thanks,
kant



RE: How to find unique values after groupBy() in spark dataframe ?

2016-12-08 Thread Mendelson, Assaf
Groupby is not an actual result but a construct to allow defining aggregations.

So you can do:


import org.apache.spark.sql.{functions => func}

val resDF = df.groupBy("client_id").agg(func.collect_set(df("Date")))


Note that collect_set can be a little heavy in terms of performance so if you 
just want to count, you should probably use approxCountDistinct
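
For example (a sketch; in Spark 2.0 the function is called approxCountDistinct):

val countsDF = df.groupBy("client_id")
  .agg(func.approxCountDistinct(df("Date")).as("distinct_dates"))
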
Assaf.

From: Devi P.V [mailto:devip2...@gmail.com]
Sent: Thursday, December 08, 2016 10:38 AM
To: user @spark
Subject: How to find unique values after groupBy() in spark dataframe ?

Hi all,

I have a dataframe like following,
+---------+----------+
|client_id|      Date|
+---------+----------+
|        a|2016-11-23|
|        b|2016-11-18|
|        a|2016-11-23|
|        a|2016-11-23|
|        a|2016-11-24|
+---------+----------+
I want to find unique dates of each client_id using spark dataframe.
expected output

a  (2016-11-23, 2016-11-24)
b   2016-11-18
I tried with df.groupBy("client_id").But I don't know how to find distinct 
values after groupBy().
How to do this?
Is any other efficient methods are available for doing this ?
I am using scala 2.11.8 & spark 2.0

Thanks


RE: filter RDD by variable

2016-12-08 Thread Mendelson, Assaf
Can you provide the sample code you are using?
In general, RDD filter receives a function as its input. The function’s input is 
a single record in the RDD and the output is a Boolean indicating whether or not 
to include it in the result. So you can create any function you want…
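
For example (a minimal sketch, assuming rdd is an RDD[String]):

val words = Array("Error", "Warning", "Info")
val word = words(1)                                    // any String variable, not a fixed literal
val filtered = rdd.filter(line => line.contains(word))
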
Assaf.

From: Soheila S. [mailto:soheila...@gmail.com]
Sent: Wednesday, December 07, 2016 6:23 PM
To: user@spark.apache.org
Subject: filter RDD by variable

Hi
I am new in Spark and have a question in first steps of Spark learning.

How can I filter an RDD using an String variable (for example words[i]) , 
instead of a fix one like "Error"?

Thanks a lot in advance.
Soheila


doing streaming efficiently

2016-12-06 Thread Mendelson, Assaf
Hi,
I have a system which does streaming doing analysis over a long period of time. 
For example a sliding window of 24 hours every 15 minutes.
I have a batch process I need to convert to this streaming.
I am wondering how to do so efficiently.

I am currently building the streaming process so I can use DStream or create 
dataframe for each time period manually.
I know that if I have a groupby, spark would cache the groupby and therefore 
only the new time period would be calculated.
My problem, however, is handling window functions.

Consider an example where I have a window function that counts the number of 
failed logins before a successful one in the last 2 hours. How would I convert 
it to streaming so it wouldn't be recalculated every 15 minutes from scratch?

Thanks,
Assaf.




RE: Writing DataFrame filter results to separate files

2016-12-05 Thread Mendelson, Assaf
If you write to parquet you can use the partitionBy option which would write 
under a directory for each value of the column (assuming you have a column with 
the month).
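
For example (a sketch; the column and path names are illustrative):

df.write.partitionBy("month").parquet("/output/by_month")
// produces one sub-directory per value, e.g. /output/by_month/month=201601/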

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Tuesday, December 06, 2016 3:33 AM
To: Everett Anderson
Cc: user
Subject: Re: Writing DataFrame filter results to separate files

1. In my case, I'd need to first explode my data by ~12x to assign each record 
to multiple 12-month rolling output windows. I'm not sure Spark SQL would be 
able to optimize this away, combining it with the output writing to do it 
incrementally.

You are right, but I wouldn't worry about the RAM use. If implemented properly 
(or if you just use the builtin window function), it should all be pipelined.

2. Wouldn't each partition -- window in my case -- be shuffled to a single 
machine and then written together as one output shard? For a large amount of 
data per window, that seems less than ideal.

Oh sorry, I thought you wanted one file per value.  If you drop the repartition 
then it won't shuffle, but will just write in parallel on each machine.


RE: Creating schema from json representation

2016-12-04 Thread Mendelson, Assaf
Answering my own question (for those who are interested):

import org.apache.spark.sql.types.{DataType, StructType}

val schema = df.schema
val jsonString = schema.json
val backToSchema = DataType.fromJson(jsonString).asInstanceOf[StructType]




From: Mendelson, Assaf [mailto:assaf.mendel...@rsa.com]
Sent: Sunday, December 04, 2016 11:11 AM
To: user
Subject: Creating schema from json representation

Hi,

I am trying to save a spark dataframe schema in scala.
I can do df.schema.json to get the json string representation.
Now I want to get the schema back from the json.
However, it seems I need to parse the json string myself, get its fields object 
and generate the fields manually.
Is there a better way?
Assaf.


Creating schema from json representation

2016-12-04 Thread Mendelson, Assaf
Hi,

I am trying to save a spark dataframe schema in scala.
I can do df.schema.json to get the json string representation.
Now I want to get the schema back from the json.
However, it seems I need to parse the json string myself, get its fields object 
and generate the fields manually.
Is there a better way?
Assaf.


how does create dataframe from scala collection handle executor failure?

2016-11-22 Thread Mendelson, Assaf
Let's say I have a loop that reads some data from somewhere, stores it in a 
collection and creates a dataframe from it. Then an executor containing part of 
the dataframe dies. How does spark handle it?

For example:
val dfSeq = for {
  I <- 0 to 1000
 V <- 0 to 100
 } yield sc.parallelize(V).toDF

Then I would do something with the dataframes (e.g. union them and do some 
calculation).

What would happen if an executor, holding one of the partitions for one of the 
dataframes crashes?
Does this mean I would lose the data? Or would spark save the original data to 
recreate it? If it saves the original data, where would it save it (the whole 
data could be very large, larger than driver memory).

If it loses the data, is there a way to give it a function or something to 
recreate it (e.g. V is read from somewhere and I can reread it if I just know 
what to read).

Thanks,
Assaf.



RE: DataFrame select non-existing column

2016-11-20 Thread Mendelson, Assaf
Nested columns are in fact syntactic sugar.
You basically have a column called pass. The type of this column is a struct 
which has a field called mobile.
After you read the parquet file you can check the schema (df.schema) and look 
at what it has. Basically loop through the types and see if you have a pass 
column. Then take the type of that and check if it has a mobile element. This 
would be something like the following (written without testing, so it probably 
has small mistakes):

df_schema = df.schema
found_pass = False
found_mobile = False
for f in df_schema:
    if f.name == 'pass':
        found_pass = True
        for g in f.dataType:
            if g.name == 'mobile':
                found_mobile = True
                break

Assaf
-Original Message-
From: Kristoffer Sjögren [mailto:sto...@gmail.com] 
Sent: Sunday, November 20, 2016 4:13 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: DataFrame select non-existing column

The problem is that I do not know which data frames have the pass.mobile column. 
I just list an HDFS directory which contains the parquet files, and some files 
have the column and some don't. I really don't want to have conditional logic 
that inspects the schema. But maybe that's the only option?

Maybe I misunderstand you, but the following code fails with the same error as 
before.

DataFrame dataFrame = ctx.read().parquet(localPath)
.select("pass")
.withColumn("mobile", col("pass.mobile"));


The flatten option works for my use case. But the problem is that there seems 
to be no way of dropping nested columns, i.e.
drop("pass.auction")


On Sun, Nov 20, 2016 at 10:55 AM, Mendelson, Assaf <assaf.mendel...@rsa.com> 
wrote:
> The issue is that you already have a struct called pass. What you did was add 
> a new column called "pass.mobile" instead of adding the element to pass - the 
> schema for the pass element is the same as before.
> When you do select pass.mobile, it finds the pass structure and checks for 
> mobile in it.
>
> You can do it the other way around: set the name to be pass_mobile. Add it 
> as before with lit(0) for those dataframes that do not have the mobile 
> field and do something like withColumn("pass_mobile", df["pass.mobile"]) for 
> those that do.
> Another option is to do something like df.select("pass.*") to flatten the 
> pass structure and work on that (then you can do withColumn("mobile",...) 
> instead of "pass.mobile") but this would change the schema.
>
>
> -Original Message-
> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
> Sent: Saturday, November 19, 2016 4:57 PM
> To: Mendelson, Assaf
> Cc: user
> Subject: Re: DataFrame select non-existing column
>
> Thanks. Here's my code example [1] and the printSchema() output [2].
>
> This code still fails with the following message: "No such struct field 
> mobile in auction, geo"
>
> By looking at the schema, it seems that pass.mobile did not get nested, which 
> is the way it needs to be for my use case. Is nested columns not supported by 
> withColumn()?
>
> [1]
>
> DataFrame df = ctx.read().parquet(localPath).withColumn("pass.mobile", 
> lit(0L)); dataFrame.printSchema(); dataFrame.select("pass.mobile");
>
> [2]
>
> root
>  |-- pass: struct (nullable = true)
>  ||-- auction: struct (nullable = true)
>  |||-- id: integer (nullable = true)
>  ||-- geo: struct (nullable = true)
>  |||-- postalCode: string (nullable = true)
>  |-- pass.mobile: long (nullable = false)
>
> On Sat, Nov 19, 2016 at 7:45 AM, Mendelson, Assaf <assaf.mendel...@rsa.com> 
> wrote:
>> In pyspark for example you would do something like:
>>
>> df.withColumn("newColName",pyspark.sql.functions.lit(None))
>>
>> Assaf.
>> -Original Message-
>> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
>> Sent: Friday, November 18, 2016 9:19 PM
>> To: Mendelson, Assaf
>> Cc: user
>> Subject: Re: DataFrame select non-existing column
>>
>> Thanks for your answer. I have been searching the API for doing that but I 
>> could not find how to do it?
>>
>> Could you give me a code snippet?
>>
>> On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf <assaf.mendel...@rsa.com> 
>> wrote:
>>> You can always add the columns to old dataframes giving them null (or some 
>>> literal) as a preprocessing.
>>>
>>> -Original Message-
>>> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
>>> Sent: Friday, November 18, 2016 4:32 PM
>>> To: user
>>> Subject: DataFrame select non-existing column
>>>
>>>

RE: DataFrame select non-existing column

2016-11-20 Thread Mendelson, Assaf
The issue is that you already have a struct called pass. What you did was add a 
new column called "pass.mobile" instead of adding the element to pass - the 
schema for the pass element is the same as before.
When you do select pass.mobile, it finds the pass structure and checks for 
mobile in it.

You can do it the other way around: set the name to be pass_mobile. Add it as 
before with lit(0) for those dataframes that do not have the mobile field and 
do something like withColumn("pass_mobile", df["pass.mobile"]) for those that 
do.
Another option is to do something like df.select("pass.*") to flatten the pass 
structure and work on that (then you can do withColumn("mobile",...) instead of 
"pass.mobile") but this would change the schema.


-Original Message-
From: Kristoffer Sjögren [mailto:sto...@gmail.com] 
Sent: Saturday, November 19, 2016 4:57 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: DataFrame select non-existing column

Thanks. Here's my code example [1] and the printSchema() output [2].

This code still fails with the following message: "No such struct field mobile 
in auction, geo"

By looking at the schema, it seems that pass.mobile did not get nested, which 
is the way it needs to be for my use case. Is nested columns not supported by 
withColumn()?

[1]

DataFrame df = ctx.read().parquet(localPath).withColumn("pass.mobile", 
lit(0L)); dataFrame.printSchema(); dataFrame.select("pass.mobile");

[2]

root
 |-- pass: struct (nullable = true)
 ||-- auction: struct (nullable = true)
 |||-- id: integer (nullable = true)
 ||-- geo: struct (nullable = true)
 |||-- postalCode: string (nullable = true)
 |-- pass.mobile: long (nullable = false)

On Sat, Nov 19, 2016 at 7:45 AM, Mendelson, Assaf <assaf.mendel...@rsa.com> 
wrote:
> In pyspark for example you would do something like:
>
> df.withColumn("newColName",pyspark.sql.functions.lit(None))
>
> Assaf.
> -Original Message-
> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
> Sent: Friday, November 18, 2016 9:19 PM
> To: Mendelson, Assaf
> Cc: user
> Subject: Re: DataFrame select non-existing column
>
> Thanks for your answer. I have been searching the API for doing that but I 
> could not find how to do it?
>
> Could you give me a code snippet?
>
> On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf <assaf.mendel...@rsa.com> 
> wrote:
>> You can always add the columns to old dataframes giving them null (or some 
>> literal) as a preprocessing.
>>
>> -Original Message-
>> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
>> Sent: Friday, November 18, 2016 4:32 PM
>> To: user
>> Subject: DataFrame select non-existing column
>>
>> Hi
>>
>> We have evolved a DataFrame by adding a few columns but cannot write select 
>> statements on these columns for older data that doesn't have them since they 
>> fail with a AnalysisException with message "No such struct field".
>>
>> We also tried dropping columns but this doesn't work for nested columns.
>>
>> Any non-hacky ways to get around this?
>>
>> Cheers,
>> -Kristoffer
>>
>>


RE: DataFrame select non-existing column

2016-11-18 Thread Mendelson, Assaf
In pyspark for example you would do something like:

df.withColumn("newColName",pyspark.sql.functions.lit(None))

Assaf.
-Original Message-
From: Kristoffer Sjögren [mailto:sto...@gmail.com] 
Sent: Friday, November 18, 2016 9:19 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: DataFrame select non-existing column

Thanks for your answer. I have been searching the API for doing that but I 
could not find how to do it?

Could you give me a code snippet?

On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf <assaf.mendel...@rsa.com> 
wrote:
> You can always add the columns to old dataframes giving them null (or some 
> literal) as a preprocessing.
>
> -Original Message-
> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
> Sent: Friday, November 18, 2016 4:32 PM
> To: user
> Subject: DataFrame select non-existing column
>
> Hi
>
> We have evolved a DataFrame by adding a few columns but cannot write select 
> statements on these columns for older data that doesn't have them since they 
> fail with a AnalysisException with message "No such struct field".
>
> We also tried dropping columns but this doesn't work for nested columns.
>
> Any non-hacky ways to get around this?
>
> Cheers,
> -Kristoffer
>
>


RE: DataFrame select non-existing column

2016-11-18 Thread Mendelson, Assaf
You can always add the columns to old dataframes giving them null (or some 
literal) as a preprocessing.

-Original Message-
From: Kristoffer Sjögren [mailto:sto...@gmail.com] 
Sent: Friday, November 18, 2016 4:32 PM
To: user
Subject: DataFrame select non-existing column

Hi

We have evolved a DataFrame by adding a few columns but cannot write select 
statements on these columns for older data that doesn't have them since they 
fail with a AnalysisException with message "No such struct field".

We also tried dropping columns but this doesn't work for nested columns.

Any non-hacky ways to get around this?

Cheers,
-Kristoffer




RE: Nested UDFs

2016-11-17 Thread Mendelson, Assaf
Then you probably want to create a normal function as opposed to a UDF.
A UDF takes your function and applies it on each element in the column, one 
after the other. You can think of it as working on the result of a loop 
iterating over the column.

pyspark.sql.functions.regexp_replace receives a column and applies the regex on 
each element to create a new column.
You can do it in one of two ways:
The first is using a UDF, in which case you shouldn't use 
pyspark.sql.functions.regexp_replace but instead use standard Python regex (the 
re module).
The second is to simply apply the column changes one after the other in a 
function. This should be something like:

def my_f(target_col):
    for match, repl in regexp_list:
        target_col = regexp_replace(target_col, match, repl)
    return target_col

and then use it with:

    test_data.select(my_f(test_data.name))

The second option is the more correct one and should provide better performance.

From: Perttu Ranta-aho [mailto:ranta...@iki.fi]
Sent: Thursday, November 17, 2016 1:50 PM
To: user@spark.apache.org
Subject: Re: Nested UDFs

Hi,

My example was little bogus, my real use case is to do multiple regexp 
replacements so something like:

def my_f(data):
for match, repl in regexp_list:
   data = regexp_replace(match, repl, data)
return data

I could achieve my goal by mutiple .select(regexp_replace()) lines, but one UDF 
would be nicer.

-Perttu

to 17. marraskuuta 2016 klo 9.42 Mendelson, Assaf 
<assaf.mendel...@rsa.com<mailto:assaf.mendel...@rsa.com>> kirjoitti:
Regexp_replace is supposed to receive a column, you don’t need to write a UDF 
for it.
Instead try:
test_data.select(regexp_replace(test_data.name, 'a', 'X'))

You would need a UDF if you wanted to do something on the string value of a 
single row (e.g. return data + “bla”).

Assaf.

From: Perttu Ranta-aho [mailto:ranta...@iki.fi<mailto:ranta...@iki.fi>]
Sent: Thursday, November 17, 2016 9:15 AM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Nested UDFs

Hi,

Shouldn't this work?

from pyspark.sql.functions import regexp_replace, udf

def my_f(data):
return regexp_replace(data, 'a', 'X')
my_udf = udf(my_f)

test_data = sqlContext.createDataFrame([('a',), ('b',), ('c',)], ('name',))
test_data.select(my_udf(test_data.name)).show()

But instead of 'a' being replaced with 'X' I get exception:
  File 
".../spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/functions.py",
 line 1471, in regexp_replace
jc = sc._jvm.functions.regexp_replace(_to_java_column(str), pattern, 
replacement)
AttributeError: 'NoneType' object has no attribute '_jvm'

???

-Perttu



RE: How does predicate push down really help?

2016-11-16 Thread Mendelson, Assaf
In the first example, you define the table to be table users from some SQL 
server. Then you perform a filter.
Without predicate pushdown (or any optimization) spark basically understands 
this as follows:
“grab the data from the source described” (which in this case means get all of 
the table from the external sql server to spark memory)
“do the operations I asked for” (in this case filtering).
What predicate pushdown means in this case is that since spark knows an 
external SQL server can actually understand and benefit from the filter command 
it can actually send the filter as part of the query and then once the data 
arrives in spark, it is already filtered.

In the second example we have two tables A and B. What you ask in the command 
is:
“Read A”
“Read B”
“Perform the join” (which is a heavy operation)
“Perform the filtering on the result”

What predicate pushdown would do instead is translate it to:
“Read A”
“Perform filtering on A”
“Read B”
“Perform filtering on B”
“perform the join on the filtered A and B”
Now the join is being made on smaller data (after the filtering) and therefore 
takes less time. The heuristic is that in most cases the time saved on the join 
would be much more than any extra time taken by the filter itself.

BTW. You can see the differences between the original plan and the optimized 
plan by calling explain(true) on the dataframe.  This would show you what was 
parsed, how the optimization worked and what was physically run.
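
For example (a sketch against a JDBC-backed table; the connection options are 
illustrative and the exact plan text varies by version):

import org.apache.spark.sql.functions.col

val users = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/mydb")
  .option("dbtable", "users")
  .option("user", "u").option("password", "p")
  .load()

users.filter(col("age") > 30).explain(true)
// in the physical plan, the age predicate should appear as a filter pushed
// into the JDBC scan rather than as a separate Filter step inside spark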

Assaf.

From: kant kodali [mailto:kanth...@gmail.com]
Sent: Thursday, November 17, 2016 9:50 AM
To: Mendelson, Assaf
Cc: user @spark
Subject: Re: How does predicate push down really help?

Hi Assaf,

I am still trying to understand the merits of predicate push down from the 
examples you pointed out.

Example 1: Say we don't have a predicate push down feature; why does spark need 
to pull all the rows and filter them in memory? Why not simply issue a select 
statement with a "where" clause to do the filtering via JDBC or something?

Example 2: Same argument as Example 1, except that when we don't have a 
predicate push down feature we could simply do it using JOIN and where 
operators in the SQL statement, right?

I feel like I am missing something to understand the merits of predicate push 
down.

Thanks,
kant




On Wed, Nov 16, 2016 at 11:34 PM, Mendelson, Assaf 
<assaf.mendel...@rsa.com<mailto:assaf.mendel...@rsa.com>> wrote:
Actually, both of your examples translate to the same plan.
When you do sql(“some code”) or filter, it doesn’t actually do the query. 
Instead it is translated to a plan (parsed plan) which transform everything 
into standard spark expressions. Then spark analyzes it to fill in the blanks 
(what is users table for example) and attempts to optimize it. Predicate 
pushdown happens in the optimization portion.
For example, let’s say that users would actually be backed by a table on an sql 
query in mysql.
Without predicate pushdown spark would first pull the entire users table from 
mysql and only then do the filtering. Predicate pushdown would mean the 
filtering would be done as part of the original sql query.

Another (probably better) example would be something like having two table A 
and B which are joined by some common key. Then a filtering is done on the key. 
Moving the filter to be before the join would probably make everything faster 
as filter is a faster operation than a join.

Assaf.

From: kant kodali [mailto:kanth...@gmail.com<mailto:kanth...@gmail.com>]
Sent: Thursday, November 17, 2016 8:03 AM
To: user @spark
Subject: How does predicate push down really help?

How does predicate push down really help? in the following cases

val df1 = spark.sql("select * from users where age > 30")

 vs

val df1 = spark.sql("select * from users")
df.filter("age > 30")





RE: Nested UDFs

2016-11-16 Thread Mendelson, Assaf
Regexp_replace is supposed to receive a column, you don’t need to write a UDF 
for it.
Instead try:
Test_data.select(regexp_Replace(test_data.name, ‘a’, ‘X’)

You would need a Udf if you would wanted to do something on the string value of 
a single row (e.g. return data + “bla”)

Assaf.

From: Perttu Ranta-aho [mailto:ranta...@iki.fi]
Sent: Thursday, November 17, 2016 9:15 AM
To: user@spark.apache.org
Subject: Nested UDFs

Hi,

Shouldn't this work?

from pyspark.sql.functions import regexp_replace, udf

def my_f(data):
return regexp_replace(data, 'a', 'X')
my_udf = udf(my_f)

test_data = sqlContext.createDataFrame([('a',), ('b',), ('c',)], ('name',))
test_data.select(my_udf(test_data.name)).show()

But instead of 'a' being replaced with 'X' I get exception:
  File 
".../spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/functions.py",
 line 1471, in regexp_replace
jc = sc._jvm.functions.regexp_replace(_to_java_column(str), pattern, 
replacement)
AttributeError: 'NoneType' object has no attribute '_jvm'

???

-Perttu



RE: How does predicate push down really help?

2016-11-16 Thread Mendelson, Assaf
Actually, both of your examples translate to the same plan.
When you do sql(“some code”) or filter, it doesn’t actually do the query. 
Instead it is translated to a plan (parsed plan) which transform everything 
into standard spark expressions. Then spark analyzes it to fill in the blanks 
(what is users table for example) and attempts to optimize it. Predicate 
pushdown happens in the optimization portion.
For example, let’s say that users would actually be backed by a table on an sql 
query in mysql.
Without predicate pushdown spark would first pull the entire users table from 
mysql and only then do the filtering. Predicate pushdown would mean the 
filtering would be done as part of the original sql query.

Another (probably better) example would be something like having two table A 
and B which are joined by some common key. Then a filtering is done on the key. 
Moving the filter to be before the join would probably make everything faster 
as filter is a faster operation than a join.

Assaf.

From: kant kodali [mailto:kanth...@gmail.com]
Sent: Thursday, November 17, 2016 8:03 AM
To: user @spark
Subject: How does predicate push down really help?

How does predicate push down really help? in the following cases

val df1 = spark.sql("select * from users where age > 30")

 vs

val df1 = spark.sql("select * from users")
df.filter("age > 30")




RE: Re:RE: how to merge dataframe write output files

2016-11-10 Thread Mendelson, Assaf
As people stated, when you coalesce to 1 partition you basically lose all 
parallelism; however, you can coalesce to a different value.
If for example you coalesce to 20 then you can parallelize up to 20 different 
tasks.
You have a total of 4 executors, with 2 cores each. This means that you 
basically have a core parallelism of 8. In general it is best to have a number 
of tasks which is 2-3 times that number for better distribution. So in general 
~20 tasks would be a good idea. Looking at your output I see part 00176, which I 
guess means you have on the order of 200 tasks (which is the default 
parallelism when you have a shuffle, for example).
Coalescing to 20 would still give you enough parallelism to use your cluster 
and would give you fewer, larger files.
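
In the loop from the quoted code further down, that would mean something like 
this (a sketch):

dft.coalesce(20).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
// 20 tasks spread over the 8 cores, but far fewer (and larger) output files
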
Assaf.


From: Shreya Agarwal [mailto:shrey...@microsoft.com]
Sent: Thursday, November 10, 2016 10:28 AM
To: lk_spark
Cc: user.spark
Subject: RE: Re:RE: how to merge dataframe write output files

Your coalesce should technically work - One thing to check would be overhead 
memory. You should configure it as 10% of executor memory.  Also, you might 
need to increase maxResultSize. Also, the data looks fine for the cluster 
unless your join yields >6G worth of data. Few things to try -

  1.  Can you do a cache on both the DFs and try?
  2.  Can you do a cache on both, then use scala join on DFs instead of loading 
to sql ?
  3.  Can you try dataset instead of dataframe? (assuming you are on Spark 2.0)
  4.  If you still see errors, can you check YARN logs, or post here?

I am sorry I don't know the answer to this,  but pretty sure there should be a 
way to work with fragmented files too.

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Thursday, November 10, 2016 12:20 AM
To: Shreya Agarwal
Cc: user.spark
Subject: Re:RE: how to merge dataframe write output files

thank you for reply,Shreya:
It's because the files is too small and hdfs dosen't like small file .
for your question. yes I want to create ExternalTable on the parquetfile 
floder. And how to use fragmented files as you mention?

the tests case as below:
bin/spark-shell --master yarn --deploy-mode client --driver-memory 6g 
--executor-memory 8g --executor-cores 2 --num-executors 4
val df = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/")
df.createOrReplaceTempView("biztag") #almost 70M   3673411 rows
val df2 = spark.read.parquet("/parquetdata/weixin/biz/month=201608")
df2.createOrReplaceTempView("biz1608")#almost 90M   381700  rows
for(i <- 1 to 61) {
  val dft = spark.sql(s"select biz1608.*,biztag.tag_id from biz1608 left 
join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}")
  dft.coalesce(1).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
 }



At 2016-11-10 15:47:02, "Shreya Agarwal" wrote:
Is there a reason you want to merge the files? The reason you are getting 
errors (afaik) is because when you try to coalesce and then write, you are 
forcing all the content to reside on one executor, and the size of data is 
exceeding the memory you have for storage in your executor, hence causing the 
container to be killed. We can confirm this if you provide the specs of your 
cluster. The whole purpose of multiple files is so that each executor can write 
its partition out in parallel, without having to collect the data in one place.

Not to mention that it'll make your write incredibly slow and also it'll take 
away all the speed of reading in the data from a parquet as there won't be any 
parallelism at the time of input (if you try to input this parquet).

Again, the important question is - Why do you need it to be one file? Are you 
planning to use it externally? If yes, can you not use fragmented files there? 
If the data is too big for the Spark executor, it'll most certainly be too much 
for JRE or any other runtime  to load in memory on a single box.

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Wednesday, November 9, 2016 11:29 PM
To: user.spark
Subject: how to merge dataframe write output files

hi, all:
When I call df.write.parquet there are a lot of small files: how can I merge 
them into one file? I tried df.coalesce(1).write.parquet, but it sometimes 
fails with an error

Container exited with a non-zero exit code 143
more an more...
-rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 

RE: Quirk in how Spark DF handles JSON input records?

2016-11-03 Thread Mendelson, Assaf
I agree this can be a little annoying. The reason this is done this way is to 
enable cases where the json file is huge. To allow splitting it, a separator is 
needed and newline is the separator used (as is done in all text files in 
Hadoop and spark).
I always wondered why support has not been implemented for cases where each 
file is small (e.g. has one object), but the implementation currently assumes 
each line contains a legal json object.

What I do to overcome this is use RDDs (using pyspark):

# get an RDD of the text content. The map is used because wholeTextFiles
# returns a tuple of (filename, file content)
jsonRDD = sc.wholeTextFiles(filename).map(lambda x: x[1])

# remove whitespace. This can actually be too aggressive, as it also strips
# whitespace inside string values, so you may prefer to remove just the
# end-of-line characters (e.g. \r, \n)
import re
js = jsonRDD.map(lambda x: re.sub(r"\s+", "", x, flags=re.UNICODE))

# convert the rdd to a dataframe. If you have your own schema, this is where
# you should add it.
df = spark.read.json(js)

Assaf.

From: Michael Segel [mailto:msegel_had...@hotmail.com]
Sent: Wednesday, November 02, 2016 9:39 PM
To: Daniel Siegmann
Cc: user @spark
Subject: Re: Quirk in how Spark DF handles JSON input records?


On Nov 2, 2016, at 2:22 PM, Daniel Siegmann wrote:

Yes, it needs to be on a single line. Spark (or Hadoop really) treats newlines 
as a record separator by default. While it is possible to use a different 
string as a record separator, what would you use in the case of JSON?
If you do some Googling I suspect you'll find some possible solutions. 
Personally, I would just use a separate JSON library (e.g. json4s) to parse 
this metadata into an object, rather than trying to read it in through Spark.


Yeah, that’s the basic idea.

This JSON is metadata to help drive the process not row records… although the 
column descriptors are row records so in the short term I could cheat and just 
store those in a file.

:-(


--
Daniel Siegmann
Senior Software Engineer
SecurityScorecard Inc.
214 W 29th Street, 5th Floor
New York, NY 10001



RE: Use a specific partition of dataframe

2016-11-03 Thread Mendelson, Assaf
There are a couple of tools you can use. Take a look at the various functions.
Specifically, limit might be useful for you and sample/sampleBy functions can 
make your data smaller.
Actually, when using CreateDataframe you can sample the data to begin with.

Specifically working by partitions can be done by moving through the RDD 
interface but I am not sure this is what you want. Actually working through a 
specific partition might mean seeing skewed data because of the hashing method 
used to partition (this would of course depend on how your dataframe was 
created).

Just to get smaller data, sample/sampleBy seems like the best solution to me.
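
For example (a sketch using the tmp dataframe from the question below):

val approx = tmp.sample(withReplacement = false, fraction = 0.01)  // ~1% of the data
approx.groupBy("var").count().show()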

Assaf.

From: Yanwei Zhang [mailto:actuary_zh...@hotmail.com]
Sent: Wednesday, November 02, 2016 6:29 PM
To: user
Subject: Use a specific partition of dataframe

Is it possible to retrieve a specific partition  (e.g., the first partition) of 
a DataFrame and apply some function there? My data is too large, and I just 
want to get some approximate measures using the first few partitions in the 
data. I'll illustrate what I want to accomplish using the example below:

// create data
val tmp = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 1),
  ("b", 2), ("a", 1), ("b", 2)), 2).toDF("var", "value")
// I want to get the first partition only, and do some calculation, for 
example, count by the value of "var"
tmp1 = tmp.getPartition(0)
tmp1.groupBy("var").count()

The idea is not to go through all the data to save computational time. So I am 
not sure whether mapPartitionsWithIndex is helpful in this case, since it still 
maps all data.

Regards,
Wayne




RE: Spark security

2016-10-27 Thread Mendelson, Assaf
Can anyone assist with this?

From: Mendelson, Assaf [mailto:assaf.mendel...@rsa.com]
Sent: Thursday, October 13, 2016 3:41 PM
To: user@spark.apache.org
Subject: Spark security

Hi,
We have a spark cluster and we wanted to add some security for it. I was 
looking at the documentation (in  
http://spark.apache.org/docs/latest/security.html) and had some questions.

1.   Do all executors listen on the same blockManager port? For example, in 
yarn there are multiple executors per node; do they all listen on the same port?

2.   Are the ports that were defined in earlier versions (e.g. 
http://spark.apache.org/docs/1.6.1/security.html) but removed in the latest 
(such as spark.executor.port and spark.fileserver.port) gone, so they can be blocked?

3.   If I define multiple workers per node in spark standalone mode, how do 
I set a different port for each worker? There is only one 
spark.worker.ui.port / SPARK_WORKER_WEBUI_PORT definition. Do I have to start 
each worker separately to configure a port? The same is true for the worker 
port (SPARK_WORKER_PORT).

4.   Is it possible to encrypt the logs instead of just restricting the log 
directory with permissions?

5.   Is the communication between the servers encrypted (e.g. using SSH)?

6.   Are there any additional best practices beyond what is written in the 
documentation?
Thanks,
Assaf.




RE: Spark 2.0 - DataFrames vs Dataset performance

2016-10-24 Thread Mendelson, Assaf
Hi,
I believe that the UDF is only a small part of the problem. You can easily test 
this by using a UDF with the dataframe too.
In my testing I saw that using datasets can be considerably slower than 
dataframes. I can make a guess as to why this happens.
Basically, what you are doing with a dataframe is reading the data, applying a 
function to a single column and counting the results. In practice this means 
the data is read into the Tungsten unsafe data structure, only the single 
column event_type is evaluated for the filtering, and then the result is counted.
The dataset, on the other hand, reads each element and converts it to a case 
class before doing the calculation. This means two things: first, we need to 
read all the columns in the case class, and second, we need to generate the 
case class object itself.
So basically the dataset option reads a lot more (all columns defined in the 
case class) and copies them (when generating the case class object used for 
the filtering).
In a more general case (i.e. when more complicated behavior is needed), we 
would lose even more in terms of performance, as catalyst and codegen would 
not take effect. Try, for example, to filter on a numeric value: you would see 
an even bigger difference, as predicate pushdown to parquet would lower the 
dataframe's time but not change much for the dataset.
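
As a rough illustration, a similar effect can be seen from pyspark by comparing a 
builtin column predicate (which can be pushed down to the parquet reader) with an 
opaque UDF predicate (which cannot). The path and the "time" column below are taken 
from the example in this thread; everything else is illustrative:

from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

data = spark.read.parquet("events_data/")

# builtin predicate: eligible for predicate pushdown and codegen
fast_count = data.filter(F.col("time") > 1000).count()

# UDF predicate: every row has to be deserialized and handed to Python
is_late = F.udf(lambda t: t is not None and t > 1000, BooleanType())
slow_count = data.filter(is_late(F.col("time"))).count()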

If your goal is pure performance, then a dataframe solution would probably be 
better than a dataset in most cases. Datasets provide the advantage of type 
safety, and if you have very complex logic where you would need to do multiple 
UDFs with many columns, then going directly to datasets would simplify the 
development.



From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]
Sent: Monday, October 24, 2016 7:52 PM
To: Antoaneta Marinova
Cc: user
Subject: Re: Spark 2.0 - DataFrames vs Dataset performance

Hi Antoaneta,
I believe the difference is not due to Datasets being slower (DataFrames are 
just an alias to Datasets now), but rather using a user defined function for 
filtering vs using Spark builtins. The builtin can use tricks from Project 
Tungsten, such as only deserializing the "event_type" column. The user-defined 
function on the other hand has to be called with a full case class, so the 
whole object needs to be deserialized for each row.

Well, that is my understanding from reading some slides. Hopefully someone with 
a more direct knowledge of the code will correct me if I'm wrong.

On Mon, Oct 24, 2016 at 2:50 PM, Antoaneta Marinova wrote:

Hello,


I am using Spark 2.0 for performing filtering, grouping and counting operations 
on events data saved in parquet files. As the events schema has very nested 
structure I wanted to read them as scala beans to simplify the code but I 
noticed a severe performance degradation. Below you can find simple comparison 
of the same operation using DataFrame and Dataset.


val data = session.read.parquet("events_data/")

// Using Datasets with a custom class

// Case class matching the events schema
case class CustomEvent(event_id: Option[String],
                       event_type: Option[String],
                       context: Option[Context],
                       ….
                       time: Option[BigInt]) extends Serializable {}

scala> val start = System.currentTimeMillis ;
       val count = data.as[CustomEvent].filter(event => eventNames.contains(event.event_type.get)).count ;
       val time = System.currentTimeMillis - start

count: Long = 5545
time: Long = 11262

// Using DataFrames

scala> val start = System.currentTimeMillis ;
       val count = data.filter(col("event_type").isin(eventNames:_*)).count ;
       val time = System.currentTimeMillis - start

count: Long = 5545
time: Long = 147


The schema of the events is something like this:


//events schema

root
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- context: struct (nullable = true)
 |    |-- environment_1: struct (nullable = true)
 |    |    |-- filed1: integer (nullable = true)
 |    |    |-- filed2: integer (nullable = true)
 |    |    |-- filed3: integer (nullable = true)
 |    |-- environment_2: struct (nullable = true)
 |    |    |-- filed_1: string (nullable = true)

 |    |    |-- filed_n: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- config: string (nullable = true)
 |    |    |    |-- tree: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- path: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |    |    |-- level: 

RE: pyspark dataframe codes for lead lag to column

2016-10-20 Thread Mendelson, Assaf
Depending on your use case, you may want to take a look at window functions 
(lead and lag are available in pyspark.sql.functions); see the sketch below.
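
A minimal pyspark sketch (this assumes a DataFrame df with a single numeric 
column "value" as in your example; -1 is used as the default for the missing 
first/last rows):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# a single global window; for real data add partitionBy(...) to avoid
# pulling everything into one partition
w = Window.orderBy("value")

result = (df
          .withColumn("lag", F.lag("value", 1, -1).over(w))
          .withColumn("lead", F.lead("value", 1, -1).over(w)))
result.show()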

From: muhammet pakyürek [mailto:mpa...@hotmail.com]
Sent: Thursday, October 20, 2016 11:36 AM
To: user@spark.apache.org
Subject: pyspark dataframe codes for lead lag to column





Is there pyspark dataframe code for adding lead/lag columns?

A lead/lag column is something like:

value  lag  lead
1      -1   2
2       1   3
3       2   4
4       3   5
5       4   -1


RE: Aggregate UDF (UDAF) in Python

2016-10-18 Thread Mendelson, Assaf
A simple example:

We have a scala file:


package com.myorg.example

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.{rand, sum}
import org.apache.spark.sql.types.{DataType, DoubleType, StructField, StructType}

class PerformSumUDAF() extends UserDefinedAggregateFunction {

  def inputSchema: StructType = StructType(Array(StructField("item", DoubleType)))

  def bufferSchema: StructType = StructType(Array(StructField("sum", DoubleType)))

  def dataType: DataType = DoubleType

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.toDouble
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getDouble(0) + input.getDouble(0)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
  }

  def evaluate(buffer: Row): Any = {
    buffer.getDouble(0)
  }
}


We place the file under 
myroot/src/main/scala/com/myorg/example/ExampleUDAF.scala
Under myroot we create a pom file (sorry for not cleaning it up, it includes 
some stuff you probably don't need, like guava and avro):


<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <name>example packages</name>
  <packaging>jar</packaging>
  <version>1.0</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>19.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>9.4.1208</version>
    </dependency>
    <dependency>
      <groupId>com.databricks</groupId>
      <artifactId>spark-avro_2.11</artifactId>
      <version>3.0.0-preview2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.0.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <relocations>
                <relocation>
                  <pattern>com.google.common</pattern>
                  <shadedPattern>com.myorg.shaded.com.google.common</shadedPattern>
                </relocation>
              </relocations>
              <finalName>simple-project-1.0-jar-with-dependencies</finalName>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>


Now you can compile the Scala code like so: mvn clean install (I assume you have 
Maven installed).

Now we want to call this from python (assuming spark is your spark session):
# get a reference dataframe to do the example on:
df = spark.range(20)

# get the jvm pointer
jvm = spark.sparkContext._gateway.jvm
# import the class
from py4j.java_gateway import java_import
java_import(jvm, "com.myorg.example.PerformSumUDAF")

#create an object from the class:
udafObj = jvm.com.myorg.example.PerformSumUDAF()
# define a python function to do the aggregation.
from pyspark.sql.column import Column, _to_java_column, _to_seq
def pythonudaf(c):
# the _to_seq portion is because we need to convert this to a sequence of
# input columns the way scala (java) expects them. The returned
# value must then be converted to a pyspark Column
return Column(udafObj.apply(_to_seq(spark.sparkContext, [c], 
_to_java_column)))

# now lets use the function
df.agg(pythonudaf(df.id)).show()
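# for spark.range(20) this should show a single row containing 190.0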

Lastly, when you run, make sure to use both --jars and --driver-class-path with 
the jar created from scala to make sure it is available on all nodes.



From: Tobi Bosede [mailto:ani.to...@gmail.com]
Sent: Monday, October 17, 2016 10:15 PM
To: Mendelson, Assaf
Cc: Holden Karau; user
Subject: Re: Aggregate UDF (UDAF) in Python

Thanks Assaf. Yes please provide an example of how to wrap code for python. I 
am leaning towards scala.

On Mon, Oct 17, 2016 at 1:50 PM, Mendelson, Assaf 
<assaf.mendel...@rsa.com<mailto:assaf.mendel...@rsa.com>> wrote:
A possible (bad) workaround would be to use the collect_list function. This 
will give you all the values in an array (list) and you can then create a UDF 
to do the aggregation yourself. This would be very slow and cost a lot of 
memory but it would work if your cluster can handle it.
This is the only workaround I can think of, otherwise you  will need to write 
the UDAF in java/scala and wrap it for python use. If you need an example on 
how to do so I can provide one.
Assaf.

From: Tobi Bosede [mailto:ani.to...@gmail.com<mailto:ani.to...@gmail.com>]
Sent: Sunday, October 16, 2016 7:49 PM
To: Holden Karau
Cc: user
Subject: Re: Aggregate UDF (UDAF) in Python

OK, I misread the year on the dev list. Can you comment on work arounds? (I.e. 
question about if scala/java are the only option.)

RE: Aggregate UDF (UDAF) in Python

2016-10-17 Thread Mendelson, Assaf
A possible (bad) workaround would be to use the collect_list function. This 
will give you all the values in an array (list) and you can then create a UDF 
to do the aggregation yourself. This would be very slow and cost a lot of 
memory but it would work if your cluster can handle it.
This is the only workaround I can think of, otherwise you  will need to write 
the UDAF in java/scala and wrap it for python use. If you need an example on 
how to do so I can provide one.
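
For reference, a rough pyspark sketch of the collect_list workaround for an 
inter-quartile range aggregation (the "key" and "value" column names are 
illustrative, and numpy is assumed to be available on the executors):

import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

def iqr(values):
    # values is the python list produced by collect_list
    q75, q25 = np.percentile(values, [75, 25])
    return float(q75 - q25)

iqr_udf = F.udf(iqr, DoubleType())

result = (df.groupBy("key")
            .agg(F.collect_list("value").alias("values"))
            .withColumn("iqr", iqr_udf("values")))
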
Assaf.

From: Tobi Bosede [mailto:ani.to...@gmail.com]
Sent: Sunday, October 16, 2016 7:49 PM
To: Holden Karau
Cc: user
Subject: Re: Aggregate UDF (UDAF) in Python

OK, I misread the year on the dev list. Can you comment on work arounds? (I.e. 
question about if scala/java are the only option.)

On Sun, Oct 16, 2016 at 12:09 PM, Holden Karau wrote:
The comment on the developer list is from earlier this week. I'm not sure why 
UDAF support hasn't made the hop to Python - while I work a fair amount on 
PySpark it's mostly in core & ML and not a lot with SQL so there could be good 
reasons I'm just not familiar with. We can try pinging Davies or Michael on the 
JIRA to see what their thoughts are.

On Sunday, October 16, 2016, Tobi Bosede wrote:
Thanks for the info Holden.

So it seems both the jira and the comment on the developer list are over a year 
old. More surprising, the jira has no assignee. Any particular reason for the 
lack of activity in this area?

Is writing scala/java the only work around for this? I hear a lot of people say 
python is the gateway language to scala. It is because of issues like this that 
people use scala for Spark rather than python or eventually abandon python for 
scala. It just takes too long for features to get ported over from scala/java.


On Sun, Oct 16, 2016 at 8:42 AM, Holden Karau wrote:
I don't believe UDAFs are available in PySpark as this came up on the developer 
list while I was asking for what features people were missing in PySpark - see 
http://apache-spark-developers-list.1001551.n3.nabble.com/Python-Spark-Improvements-forked-from-Spark-Improvement-Proposals-td19422.html
 . The JIRA for tacking this issue is at 
https://issues.apache.org/jira/browse/SPARK-10915

On Sat, Oct 15, 2016 at 7:20 PM, Tobi Bosede wrote:
Hello,

I am trying to use a UDF that calculates inter-quartile (IQR) range for pivot() 
and SQL in pyspark and got the error that my function wasn't an aggregate 
function in both scenarios. Does anyone know if UDAF functionality is available 
in python? If not, what can I do as a work around?

Thanks,
Tobi



--
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau



--
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau




Spark security

2016-10-13 Thread Mendelson, Assaf
Hi,
We have a spark cluster and we wanted to add some security for it. I was 
looking at the documentation (in  
http://spark.apache.org/docs/latest/security.html) and had some questions.

1.   Do all executors listen on the same blockManager port? For example, in 
yarn there are multiple executors per node; do they all listen on the same port?

2.   Are the ports that were defined in earlier versions (e.g. 
http://spark.apache.org/docs/1.6.1/security.html) but removed in the latest 
(such as spark.executor.port and spark.fileserver.port) gone, so they can be blocked?

3.   If I define multiple workers per node in spark standalone mode, how do 
I set a different port for each worker? There is only one 
spark.worker.ui.port / SPARK_WORKER_WEBUI_PORT definition. Do I have to start 
each worker separately to configure a port? The same is true for the worker 
port (SPARK_WORKER_PORT).

4.   Is it possible to encrypt the logs instead of just restricting the log 
directory with permissions?

5.   Is the communication between the servers encrypted (e.g. using SSH)?

6.   Are there any additional best practices beyond what is written in the 
documentation?
Thanks,
Assaf.




RE: udf of aggregation in pyspark dataframe ?

2016-09-30 Thread Mendelson, Assaf
I may be missing something here, but it seems to me you can do it like this:
df.groupBy('a').agg(F.collect_list('c').alias("c_list"), F.collect_list('d').alias("d_list")) \
    .withColumn('named_list', my_zip(F.col("c_list"), F.col("d_list")))
without needing to write a new aggregation function.

-Original Message-
From: peng yu [mailto:yupb...@gmail.com] 
Sent: Thursday, September 29, 2016 8:35 PM
To: user@spark.apache.org
Subject: Re: udf of aggregation in pyspark dataframe ?

df:
-
a|b|c
---
1|m|n
1|x|j
2|m|x
...


import pyspark.sql.functions as F
from pyspark.sql.types import MapType, StringType

def _my_zip(c, d):
    return dict(zip(c, d))

my_zip = F.udf(_my_zip, MapType(StringType(), StringType(), True))

df.groupBy('a').agg(my_zip(F.collect_list('c'),
                           F.collect_list('d')).alias('named_list'))



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811p27814.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: building runnable distribution from source

2016-09-29 Thread Mendelson, Assaf
Thanks, that solved it.
If there is a developer here, it would be useful if this message were logged as 
an error instead of INFO (especially since it causes core to fail rather than 
just the R package).
Thanks,
Assaf.

-Original Message-
From: Ding Fei [mailto:ding...@stars.org.cn] 
Sent: Thursday, September 29, 2016 1:20 PM
To: Mendelson, Assaf
Cc: user@spark.apache.org
Subject: Re: building runnable distribution from source

Check that your R is properly installed:

>Cannot find 'R_HOME'. Please specify 'R_HOME' or make sure R is
properly 
>installed.



On Thu, 2016-09-29 at 01:08 -0700, AssafMendelson wrote:
> Hi,
> 
> I am trying to compile the latest branch of spark in order to try out 
> some code I wanted to contribute.
> 
> 
> I was looking at the instructions to build from 
> http://spark.apache.org/docs/latest/building-spark.html
> 
> So at first I did:
> 
> ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests 
> clean package
> 
> This worked without a problem and compiled.
> 
>  
> 
> I then did
> 
> ./dev/make-distribution.sh --name custom-spark --tgz -e -Psparkr
> -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn
> 
> Which failed.
> 
> (I added the –e because the first run, without it suggested adding 
> this to get more information).
> 
> If I look at the compilation itself, It provides no messages for spark 
> project core:
> 
>  
> 
> [INFO] Building Spark Project Core 2.1.0-SNAPSHOT
> 
> [INFO]
> --
> --
> 
> [INFO]
>  
> 
> [INFO]
> --
> --
> 
> [INFO] Building Spark Project YARN Shuffle Service 2.1.0-SNAPSHOT
> 
> [INFO]
> --
> -
> 
>  
> 
> However, when I reach the summary I find that core has failed to 
> compile.
> 
> Below is the messages from the end of the compilation but I can’t find 
> any direct error.
> 
> I tried to google this but found no solution. Could anyone point me to 
> how to fix this?
> 
>  
> 
>  
> 
> [INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @
> spark-core_2.11 ---
> 
> [INFO] Changes detected - recompiling the module!
> 
> [INFO] Compiling 74 source files
> to /home/mendea3/git/spark/core/target/scala-2.11/classes
> 
> [INFO]
> 
> [INFO] --- exec-maven-plugin:1.4.0:exec (sparkr-pkg) @ spark-core_2.11
> ---
> 
> Cannot find 'R_HOME'. Please specify 'R_HOME' or make sure R is 
> properly installed.
> 
> [INFO]
> --
> --
> 
> [INFO] Reactor Summary:
> 
> [INFO]
> 
> [INFO] Spark Project Parent POM ... SUCCESS [
> 4.165 s]
> 
> [INFO] Spark Project Tags . SUCCESS [
> 5.163 s]
> 
> [INFO] Spark Project Sketch ... SUCCESS [
> 7.393 s]
> 
> [INFO] Spark Project Networking ... SUCCESS [ 
> 18.929 s]
> 
> [INFO] Spark Project Shuffle Streaming Service  SUCCESS [ 
> 10.528 s]
> 
> [INFO] Spark Project Unsafe ... SUCCESS [ 
> 14.453 s]
> 
> [INFO] Spark Project Launcher . SUCCESS [ 
> 15.198 s]
> 
> [INFO] Spark Project Core . FAILURE [ 
> 57.641 s]
> 
> [INFO] Spark Project ML Local Library . SUCCESS [ 
> 10.561 s]
> 
> [INFO] Spark Project GraphX ... SKIPPED
> 
> [INFO] Spark Project Streaming  SKIPPED
> 
> [INFO] Spark Project Catalyst . SKIPPED
> 
> [INFO] Spark Project SQL .. SKIPPED
> 
> [INFO] Spark Project ML Library ... SKIPPED
> 
> [INFO] Spark Project Tools  SUCCESS [
> 4.188 s]
> 
> [INFO] Spark Project Hive . SKIPPED
> 
> [INFO] Spark Project REPL . SKIPPED
> 
> [INFO] Spark Project YARN Shuffle Service . SUCCESS [ 
> 16.128 s]
> 
> [INFO] Spark Project YARN . SKIPPED
> 
> [INFO] Spark Project Hive Thrift Server ... SKIPPED
> 
> [INFO] Spark Project Assembly . SKIPPED
> 
> [INFO] Spark Project External Flume Sink .. SUCCESS [
> 9.855 s]
> 
> [INFO] Spark Project Extern

RE: classpath conflict with spark internal libraries and the spark shell.

2016-09-11 Thread Mendelson, Assaf
You can try shading the jar. Look at maven shade plugin


From: Benyi Wang [mailto:bewang.t...@gmail.com]
Sent: Saturday, September 10, 2016 1:35 AM
To: Colin Kincaid Williams
Cc: user@spark.apache.org
Subject: Re: classpath conflict with spark internal libraries and the spark 
shell.

I had a problem when I used "spark.executor.userClassPathFirst" before. I don't 
remember what the problem is.

Alternatively, you can use --driver-class-path and "--conf 
spark.executor.extraClassPath".  Unfortunately you may feel frustrated like me 
when trying to make it work.

Depends on how you run spark:
- standalone or yarn,
- run as Application or in spark-shell
The configuration will be different. It is hard to explain in a short reply, so 
I wrote two blog posts about it.
http://ben-tech.blogspot.com/2016/05/how-to-resolve-spark-cassandra.html
http://ben-tech.blogspot.com/2016/04/how-to-resolve-spark-cassandra.html

Hope those blogs help.

If you still have class conflict problem, you can consider to load the external 
library and its dependencies using a special classloader just like spark-hive, 
which can load the specified version of hive jars.

On Fri, Sep 9, 2016 at 2:53 PM, Colin Kincaid Williams wrote:
My bad, gothos on IRC pointed me to the docs:

http://jhz.name/2016/01/10/spark-classpath.html

Thanks Gothos!

On Fri, Sep 9, 2016 at 9:23 PM, Colin Kincaid Williams wrote:
> I'm using the spark shell v1.6.1. I have a classpath conflict, where I
> have an external library ( not OSS either :( , can't rebuild it.)
> using httpclient-4.5.2.jar. I use spark-shell --jars
> file:/path/to/httpclient-4.5.2.jar
>
> However spark is using httpclient-4.3 internally. Then when I try to
> use the external library I get
>
> getClass.getResource("/org/apache/http/conn/ssl/SSLConnectionSocketFactory.class");
>
> res5: java.net.URL =
> jar:file:/opt/spark-1.6.1-bin-hadoop2.4/lib/spark-assembly-1.6.1-hadoop2.4.0.jar!/org/apache/http/conn/ssl/SSLConnectionSocketFactory.class
>
> How do I get spark-shell on 1.6.1 to allow me to use the external
> httpclient-4.5.2.jar for my application,and ignore it's internal
> library. Or is this not possible?

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org



RE: Selecting the top 100 records per group by?

2016-09-11 Thread Mendelson, Assaf

You can also create a custom aggregation function. It might provide better 
performance than dense_rank.

Consider the following example to collect everything as a list:

import scala.collection.mutable

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

class CollectListFunction[T](val colType: DataType) extends UserDefinedAggregateFunction {

  def inputSchema: StructType =
new StructType().add("inputCol", colType)

  def bufferSchema: StructType =
new StructType().add("outputCol", ArrayType(colType))

  def dataType: DataType = ArrayType(colType)

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0, new mutable.ArrayBuffer[T])
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val list = buffer.getSeq[T](0)
if (!input.isNullAt(0)) {
  val sales = input.getAs[T](0)
  buffer.update(0, list:+sales)
}
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1.update(0, buffer1.getSeq[T](0) ++ buffer2.getSeq[T](0))
  }

  def evaluate(buffer: Row): Any = {
buffer.getSeq[T](0)
  }
}

All you would need to do is modify it to contain only the top 100…

From: burtonator2...@gmail.com 
[mailto:burtonator2...@gmail.com] On Behalf Of Kevin Burton
Sent: Sunday, September 11, 2016 6:33 AM
To: Karl Higley
Cc: user@spark.apache.org
Subject: Re: Selecting the top 100 records per group by?

Looks like you can do it with dense_rank functions.

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

I set up some basic records and it seems like it did the right thing.
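
A minimal pyspark sketch of the window-function approach (the posts/user_id names 
come from the original question; the created_at ordering column is an assumption, 
and row_number can be used instead of dense_rank if ties should not push a group 
past 100 rows):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("user_id").orderBy(F.col("created_at").desc())

top100 = (posts
          .withColumn("rank", F.dense_rank().over(w))
          .filter(F.col("rank") <= 100)
          .drop("rank"))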

Now time to throw 50TB and 100 spark nodes at this problem and see what happens 
:)

On Sat, Sep 10, 2016 at 7:42 PM, Kevin Burton wrote:
Ah.. might actually. I'll have to mess around with that.

On Sat, Sep 10, 2016 at 6:06 PM, Karl Higley wrote:
Would `topByKey` help?

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42

Best,
Karl

On Sat, Sep 10, 2016 at 9:04 PM Kevin Burton wrote:
I'm trying to figure out a way to group by and return the top 100 records in 
that group.

Something like:

SELECT TOP(100, user_id) FROM posts GROUP BY user_id;

But I can't really figure out the best way to do this...

There is a FIRST and LAST aggregate function but this only returns one column.

I could do something like:

SELECT * FROM posts WHERE user_id IN ( /* select top users here */ ) LIMIT 100;

But that limit is applied for ALL the records. Not each individual user.

The only other thing I can think of is to do a manual map reduce and then have 
the reducer only return the top 100 each time...

Would LOVE some advice here...

--
We’re hiring if you know of any awesome Java Devops or Linux Operations 
Engineers!

Founder/CEO Spinn3r.com
Location: San Francisco, CA
blog: http://burtonator.wordpress.com
… or check out my Google+ 
profile




--
We’re hiring if you know of any awesome Java Devops or Linux Operations 
Engineers!

Founder/CEO Spinn3r.com
Location: San Francisco, CA
blog: http://burtonator.wordpress.com
… or check out my Google+ 
profile




--
We’re hiring if you know of any awesome Java Devops or Linux Operations 
Engineers!

Founder/CEO Spinn3r.com
Location: San Francisco, CA
blog: http://burtonator.wordpress.com
… or check out my Google+ 
profile



RE: add jars like spark-csv to ipython notebook with pyspakr

2016-09-11 Thread Mendelson, Assaf
In my case I do the following:
export PYSPARK_DRIVER_PYTHON_OPTS="notebook  --no-browser"
pyspark --jars myjar.jar --driver-class-path myjar.jar
hope this helps…

From: pseudo oduesp [mailto:pseudo20...@gmail.com]
Sent: Friday, September 09, 2016 3:55 PM
To: user@spark.apache.org
Subject: add jars like spark-csv to ipython notebook with pyspakr

Hi,
How can I add a jar to the IPython notebook?
I tried PYSPARK_SUBMIT_ARGS without success.
Thanks