Re: What could be the cause of an execution freeze on Hadoop for small datasets?

2023-03-11 Thread sam smith
" In this case your program may work because effectively you are not using
the spark in yarn on the hadoop cluster  " I am actually using Yarn as
mentioned (client mode)
I already know that, but it is not just about collectAsList, the execution
freezes also for example when using save() on the dataset (after the
transformations, before them it is ok to perform save() on the dataset).

I hope the question is clearer (for anybody who's reading) now.

Le sam. 11 mars 2023 à 20:15, Mich Talebzadeh  a
écrit :

> collectAsList brings all the data into the driver which is a single JVM
> on a single node. In this case your program may work because effectively
> you are not using the spark in yarn on the hadoop cluster. The benefit of
> Spark is that you can process a large amount of data using the memory and
> processors across multiple executors on multiple nodes.
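
A minimal Java sketch of the contrast being described, assuming an
already-transformed Dataset<Row> named ds (the variable name and output path
are illustrative, not from the thread):

import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// collectAsList() pulls every row into the single driver JVM: fine for tiny
// results, a bottleneck (or an out-of-memory) for anything larger
List<Row> rows = ds.collectAsList();

// the distributed alternative: each executor writes its own partitions,
// so no single JVM ever has to hold the whole result
ds.write().mode("overwrite").parquet("hdfs:///tmp/output");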
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 11 Mar 2023 at 19:01, sam smith 
> wrote:
>
>> not sure what you mean by your question, but it is not helping in any case
>>
>>
>> Le sam. 11 mars 2023 à 19:54, Mich Talebzadeh 
>> a écrit :
>>
>>>
>>>
>>> ... To note that if I execute collectAsList on the dataset at the
>>> beginning of the program
>>>
>>> What do you think  collectAsList does?
>>>
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sat, 11 Mar 2023 at 18:29, sam smith 
>>> wrote:
>>>
>>>> Hello guys,
>>>>
>>>> I am launching through code (client mode) a Spark program to run in
>>>> Hadoop. If I execute on the dataset methods of the likes of show() and
>>>> count() or collectAsList() (that are displayed in the Spark UI) after
>>>> performing heavy transformations on the columns then the mentioned methods
>>>> will cause the execution to freeze on Hadoop and that independently of the
>>>> dataset size (intriguing issue for small size datasets!).
>>>> Any idea what could be causing this type of issue?
>>>> To note that if I execute collectAsList on the dataset at the beginning
>>>> of the program (before performing the transformations on the columns) then
>>>> the method yields results correctly.
>>>>
>>>> Thanks.
>>>> Regards
>>>>
>>>>


Re: What could be the cause of an execution freeze on Hadoop for small datasets?

2023-03-11 Thread sam smith
not sure what you mean by your question, but it is not helping in any case


Le sam. 11 mars 2023 à 19:54, Mich Talebzadeh  a
écrit :

>
>
> ... To note that if I execute collectAsList on the dataset at the
> beginning of the program
>
> What do you think  collectAsList does?
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 11 Mar 2023 at 18:29, sam smith 
> wrote:
>
>> Hello guys,
>>
>> I am launching through code (client mode) a Spark program to run in
>> Hadoop. If I execute on the dataset methods of the likes of show() and
>> count() or collectAsList() (that are displayed in the Spark UI) after
>> performing heavy transformations on the columns then the mentioned methods
>> will cause the execution to freeze on Hadoop and that independently of the
>> dataset size (intriguing issue for small size datasets!).
>> Any idea what could be causing this type of issue?
>> To note that if I execute collectAsList on the dataset at the beginning
>> of the program (before performing the transformations on the columns) then
>> the method yields results correctly.
>>
>> Thanks.
>> Regards
>>
>>


What could be the cause of an execution freeze on Hadoop for small datasets?

2023-03-11 Thread sam smith
Hello guys,

I am launching, through code (client mode), a Spark program to run on Hadoop.
If I call methods like show(), count() or collectAsList() on the dataset
(actions that show up in the Spark UI) after performing heavy transformations
on the columns, then those methods cause the execution to freeze on Hadoop,
and that independently of the dataset size (an intriguing issue for small
datasets!).
Any idea what could be causing this type of issue?
Note that if I call collectAsList on the dataset at the beginning of the
program (before performing the transformations on the columns), the method
yields results correctly.

Thanks.
Regards


How to allocate vcores to driver (client mode)

2023-03-10 Thread sam smith
Hi,

I am launching, through code (client mode), a Spark program to run on Hadoop.
Whenever I check the Executors tab of the Spark UI I always get 0 as the
number of vcores for the driver. I tried to change that using
*spark.driver.cores*, and also *spark.yarn.am.cores*, in the SparkSession
configuration, but in vain. I also tried to set those parameters in
spark-defaults but, again, with no success.
Note that in the Environment tab, the right config is displayed.

Could this be the reason for a *CollectAsList *to freeze the execution (not
having enough CPU)?
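
A hedged sketch of where such settings are normally passed. In YARN client
mode the driver runs inside the JVM that launches the job, so setting
spark.driver.cores in the SparkSession builder comes too late, and the YARN
docs describe driver-core settings as mainly relevant in cluster mode;
spark.yarn.am.cores applies to the application master, which in client mode
is a separate process from the driver. Class and jar names are placeholders:

spark-submit --master yarn --deploy-mode client \
  --conf spark.driver.cores=2 \
  --conf spark.yarn.am.cores=2 \
  --class com.example.MyApp myApp.jar

The 0 vcores shown for the driver in the Executors tab under client mode is
frequently reported and may be a reporting quirk rather than an actual lack
of CPU for the driver.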


How to share a dataset file across nodes

2023-03-09 Thread sam smith
Hello,

I use Yarn client mode to submit my driver program to Hadoop. The dataset I
load is on the local file system; when I invoke load("file://path"), Spark
complains that the csv file is not found, which I totally understand,
since the dataset is not present on any of the workers or on the
applicationMaster, but only where the driver program resides.
I tried to share the file using the configurations:

> *spark.yarn.dist.files* OR *spark.files *

but neither is working.
My question is: how can I share the csv dataset across the nodes at the
specified path?

Thanks.
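
Two hedged sketches of the usual workarounds, with placeholder paths and a
SparkSession named spark assumed. The simplest route is to put the file on
storage every node can reach (HDFS, S3, NFS); the spark.files / --files
mechanism ships a copy into each executor's work directory, which is resolved
through SparkFiles rather than the original local path:

import org.apache.spark.SparkFiles;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Option 1: copy the csv to HDFS first (hdfs dfs -put data.csv /data/),
// then every executor can read it through the shared filesystem
Dataset<Row> df = spark.read()
    .option("header", "true")
    .csv("hdfs:///data/data.csv");

// Option 2: ship the driver-local file with the job (same mechanism as
// --files / spark.files) and resolve it via SparkFiles inside tasks
spark.sparkContext().addFile("/local/path/data.csv");
String shippedPath = SparkFiles.get("data.csv");

Files shipped with option 2 land in per-executor scratch directories, so they
are usually read with plain (non-Spark) I/O inside tasks; for spark.read().csv
the shared-filesystem route is the more reliable one.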


Re: How to explode array columns of a dataframe having the same length

2023-02-16 Thread sam smith
@Enrico Minack  I used arrays_zip to merge values
into one row, and then used toJSON() to export the data.
@Bjørn explode_outer didn't yield the expected results.

Thanks anyway.
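
A rough Java sketch of the arrays_zip route described above (column names
follow the example in this thread; df is the original dataframe, and the
snippet assumes Spark 3.x, where the zipped struct fields keep the input
column names):

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// zip the array columns element-wise, then emit one row per zipped element
Dataset<Row> exploded = df
    .withColumn("zipped", explode(arrays_zip(col("col1"), col("col2"), col("col3"))))
    .select(col("zipped.col1").as("col1"),
            col("zipped.col2").as("col2"),
            col("zipped.col3").as("col3"));

// export each row as a JSON string
Dataset<String> json = exploded.toJSON();
json.show();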

Le jeu. 16 févr. 2023 à 09:06, Enrico Minack  a
écrit :

> You have to take each row and zip the lists, each element of the result
> becomes one new row.
>
> So turn write a method that turns
>   Row(List("A","B","null"), List("C","D","null"), List("E","null","null"))
> into
>   List(List("A","C","E"), List("B","D","null"), List("null","null","null"))
> and use flatmap with that method.
>
> In Scala, this would read:
>
> df.flatMap { row => (row.getSeq[String](0), row.getSeq[String](1),
> row.getSeq[String](2)).zipped.toIterable }.show()
>
> Enrico
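
A rough Java counterpart of the Scala snippet above, using a typed tuple
encoder (an untested sketch; it assumes the three array<string> columns of
the example, that all three arrays in a row have the same length, and a
dataframe named df):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import scala.Tuple3;

Dataset<Tuple3<String, String, String>> zipped = df.flatMap(
    (FlatMapFunction<Row, Tuple3<String, String, String>>) row -> {
        List<String> c1 = row.getList(0), c2 = row.getList(1), c3 = row.getList(2);
        List<Tuple3<String, String, String>> out = new ArrayList<>();
        for (int i = 0; i < c1.size(); i++) {
            out.add(new Tuple3<>(c1.get(i), c2.get(i), c3.get(i)));
        }
        return out.iterator();   // one output row per zipped position
    },
    Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()));

zipped.toDF("col1", "col2", "col3").show();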
>
>
> Am 14.02.23 um 22:54 schrieb sam smith:
>
> Hello guys,
>
> I have the following dataframe:
>
> *col1*
>
> *col2*
>
> *col3*
>
> ["A","B","null"]
>
> ["C","D","null"]
>
> ["E","null","null"]
>
>
> I want to explode it to the following dataframe:
>
> *col1*
>
> *col2*
>
> *col3*
>
> "A"
>
> "C"
>
> "E"
>
> "B"
>
> "D"
>
> "null"
>
> "null"
>
> "null"
>
> "null"
>
> How to do that (preferably in Java) using the explode() method ? knowing
> that something like the following won't yield correct output:
>
> for (String colName: dataset.columns())
> dataset=dataset.withColumn(colName,explode(dataset.col(colName)));
>
>
>
>


How to explode array columns of a dataframe having the same length

2023-02-14 Thread sam smith
Hello guys,

I have the following dataframe:

*col1*

*col2*

*col3*

["A","B","null"]

["C","D","null"]

["E","null","null"]
I want to explode it to the following dataframe:

*col1*

*col2*

*col3*

"A"

"C"

"E"

"B"

"D"

"null"

"null"

"null"

"null"

How to do that (preferably in Java) using the explode() method ? knowing
that something like the following won't yield correct output:

for (String colName: dataset.columns())
dataset=dataset.withColumn(colName,explode(dataset.col(colName)));


Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-13 Thread sam smith
Alright, this is the working Java version of it:

List<Column> listCols = new ArrayList<>();
> Arrays.asList(dataset.columns()).forEach(column -> {
> listCols.add(org.apache.spark.sql.functions.collect_set(column)); });
> Column[] arrCols = listCols.toArray(new Column[listCols.size()]);
> dataset = dataset.select(arrCols);


But then I tried to explode the sets of values into rows through explode(),
and the column values repeat to fill the size of the largest column.

How can I set the repeated values to null instead (thus keeping only one
exploded set of column values in each column)?

Thanks.

Le dim. 12 févr. 2023 à 22:43, Enrico Minack  a
écrit :

> @Sean: This aggregate function does work without an explicit groupBy():
>
> ./spark-3.3.1-bin-hadoop2/bin/spark-shell
> Spark context Web UI available at http://*:4040
> Spark context available as 'sc' (master = local[*], app id =
> local-1676237726079).
> Spark session available as 'spark'.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.3.1
>       /_/
>
> Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4,
> 10, "one")).toDF("a", "b", "c")
> scala> df.select(df.columns.map(column =>
> collect_set(col(column)).as(column)): _*).show()
> +------------+--------+----------+
> |           a|       b|         c|
> +------------+--------+----------+
> |[1, 2, 3, 4]|[20, 10]|[one, two]|
> +------------+--------+----------+
>
> @Sam: I haven't tested the Java code, sorry. I presume you can work it out
> from the working Scala code.
>
> Enrico
>
>
> Am 12.02.23 um 21:32 schrieb Sean Owen:
>
> It doesn't work because it's an aggregate function. You have to groupBy()
> (group by nothing) to make that work, but, you can't assign that as a
> column. Folks those approaches don't make sense semantically in SQL or
> Spark or anything.
> They just mean use threads to collect() distinct values for each col in
> parallel using threads in your program. You don't have to but you could.
> What else are we looking for here, the answer has been given a number of
> times I think.
>
>
> On Sun, Feb 12, 2023 at 2:28 PM sam smith 
> wrote:
>
>> OK, what do you mean by " do your outer for loop in parallel "?
>> btw this didn't work:
>> for (String columnName : df.columns()) {
>> df= df.withColumn(columnName,
>> collect_set(col(columnName)).as(columnName));
>> }
>>
>>
>> Le dim. 12 févr. 2023 à 20:36, Enrico Minack  a
>> écrit :
>>
>>> That is unfortunate, but 3.4.0 is around the corner, really!
>>>
>>> Well, then based on your code, I'd suggest two improvements:
>>> - cache your dataframe after reading, this way, you don't read the
>>> entire file for each column
>>> - do your outer for loop in parallel, then you have N parallel Spark
>>> jobs (only helps if your Spark cluster is not fully occupied by a single
>>> column)
>>>
>>> Your withColumn-approach does not work because withColumn expects a
>>> column as the second argument, but df.select(columnName).distinct() is a
>>> DataFrame and .col is a column in *that* DataFrame, it is not a column
>>> of the dataframe that you call withColumn on.
>>>
>>> It should read:
>>>
>>> Scala:
>>> df.select(df.columns.map(column => collect_set(col(column)).as(column)):
>>> _*).show()
>>>
>>> Java:
>>> for (String columnName : df.columns()) {
>>> df= df.withColumn(columnName,
>>> collect_set(col(columnName)).as(columnName));
>>> }
>>>
>>> Then you have a single DataFrame that computes all columns in a single
>>> Spark job.
>>>
>>> But this reads all distinct values into a single partition, which has
>>> the same downside as collect, so this is as bad as using collect.
>>>
>>> Cheers,
>>> Enrico
>>>
>>>
>>> Am 12.02.23 um 18:05 schrieb sam smith:
>>>
>>> @Enrico Minack  Thanks for "unpivot" but I am
>>> using version 3.3.0 (you are taking it way too far as usual :) )
>>> @Sean Owen  Pls then show me how it can be improved
>>> by code.
>>>
>>> Also, why such an approach (using withColumn() ) doesn't work:
>>>
>>> for (String columnName : df.columns()) {
>>> df= df.withColumn(columnName,
>>> df.select(columnName).distinct().col(columnName));
>>> }

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread sam smith
OK, what do you mean by " do your outer for loop in parallel "?
btw this didn't work:
for (String columnName : df.columns()) {
df= df.withColumn(columnName,
collect_set(col(columnName)).as(columnName));
}


Le dim. 12 févr. 2023 à 20:36, Enrico Minack  a
écrit :

> That is unfortunate, but 3.4.0 is around the corner, really!
>
> Well, then based on your code, I'd suggest two improvements:
> - cache your dataframe after reading, this way, you don't read the entire
> file for each column
> - do your outer for loop in parallel, then you have N parallel Spark jobs
> (only helps if your Spark cluster is not fully occupied by a single column);
> see the sketch after this list
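
A minimal Java sketch of those two suggestions combined: cache the input
once, then launch one Spark job per column from a small thread pool. The
pool size and the use of collectAsList at the end are illustrative choices,
not part of the original advice:

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

static Map<String, List<Row>> distinctPerColumn(Dataset<Row> df) throws Exception {
    Dataset<Row> cached = df.cache();                        // read the source only once
    ExecutorService pool = Executors.newFixedThreadPool(4);  // a few concurrent Spark jobs
    Map<String, Future<List<Row>>> futures = new LinkedHashMap<>();
    for (String c : cached.columns()) {
        Callable<List<Row>> job = () -> cached.select(c).distinct().collectAsList();
        futures.put(c, pool.submit(job));
    }
    Map<String, List<Row>> result = new LinkedHashMap<>();
    for (Map.Entry<String, Future<List<Row>>> e : futures.entrySet()) {
        result.put(e.getKey(), e.getValue().get());          // wait for that column's job
    }
    pool.shutdown();
    return result;
}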
>
> Your withColumn-approach does not work because withColumn expects a column
> as the second argument, but df.select(columnName).distinct() is a DataFrame
> and .col is a column in *that* DataFrame, it is not a column of the
> dataframe that you call withColumn on.
>
> It should read:
>
> Scala:
> df.select(df.columns.map(column => collect_set(col(column)).as(column)):
> _*).show()
>
> Java:
> for (String columnName : df.columns()) {
> df= df.withColumn(columnName,
> collect_set(col(columnName)).as(columnName));
> }
>
> Then you have a single DataFrame that computes all columns in a single
> Spark job.
>
> But this reads all distinct values into a single partition, which has the
> same downside as collect, so this is as bad as using collect.
>
> Cheers,
> Enrico
>
>
> Am 12.02.23 um 18:05 schrieb sam smith:
>
> @Enrico Minack  Thanks for "unpivot" but I am using
> version 3.3.0 (you are taking it way too far as usual :) )
> @Sean Owen  Pls then show me how it can be improved by
> code.
>
> Also, why such an approach (using withColumn() ) doesn't work:
>
> for (String columnName : df.columns()) {
> df= df.withColumn(columnName,
> df.select(columnName).distinct().col(columnName));
> }
>
> Le sam. 11 févr. 2023 à 13:11, Enrico Minack  a
> écrit :
>
>> You could do the entire thing in DataFrame world and write the result to
>> disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
>>
>> Note this is Scala but should be straightforward to translate into Java:
>>
>> import org.apache.spark.sql.functions.collect_set
>>
>> val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
>> 123)).toDF("a", "b", "c")
>>
>> df.unpivot(Array.empty, "column", "value")
>>   .groupBy("column")
>>   .agg(collect_set("value").as("distinct_values"))
>>
>> The unpivot operation turns
>> +---+---+---+
>> |  a|  b|  c|
>> +---+---+---+
>> |  1| 10|123|
>> |  2| 20|124|
>> |  3| 20|123|
>> |  4| 10|123|
>> +---+---+---+
>>
>> into
>>
>> +--+-+
>> |column|value|
>> +--+-+
>> | a|1|
>> | b|   10|
>> | c|  123|
>> | a|2|
>> | b|   20|
>> | c|  124|
>> | a|3|
>> | b|   20|
>> | c|  123|
>> | a|4|
>> | b|   10|
>> | c|  123|
>> +--+-+
>>
>> The groupBy("column").agg(collect_set("value").as("distinct_values"))
>> collects distinct values per column:
>> +--+---+
>>
>> |column|distinct_values|
>> +--+---+
>> | c| [123, 124]|
>> | b|   [20, 10]|
>> | a|   [1, 2, 3, 4]|
>> +--+---+
>>
>> Note that unpivot only works if all columns have a "common" type. Then
>> all columns are cast to that common type. If you have incompatible types
>> like Integer and String, you would have to cast them all to String first:
>>
>> import org.apache.spark.sql.types.StringType
>>
>> df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
>>
>> If you want to preserve the type of the values and have multiple value
>> types, you cannot put everything into a DataFrame with one
>> distinct_values column. You could still have multiple DataFrames, one
>> per data type, and write those, or collect the DataFrame's values into Maps:
>>
>> import scala.collection.immutable
>>
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.functions.collect_set
>>
>> // if all you columns have the same type
>> def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String,
>> immutable.Seq[Any]] = {
>>   df.unpivot(Array.empty, "column", "value")
>> .groupBy

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread sam smith
@Sean Correct. But I was hoping to improve my solution even more.

Le dim. 12 févr. 2023 à 18:03, Sean Owen  a écrit :

> That's the answer, except, you can never select a result set into a column
> right? you just collect() each of those results. Or, what do you want? I'm
> not clear.
>
> On Sun, Feb 12, 2023 at 10:59 AM sam smith 
> wrote:
>
>> @Enrico Minack  Thanks for "unpivot" but I am
>> using version 3.3.0 (you are taking it way too far as usual :) )
>> @Sean Owen  Pls then show me how it can be improved by
>> code.
>>
>> Also, why such an approach (using withColumn() ) doesn't work:
>>
>> for (String columnName : df.columns()) {
>> df= df.withColumn(columnName,
>> df.select(columnName).distinct().col(columnName));
>> }
>>
>> Le sam. 11 févr. 2023 à 13:11, Enrico Minack  a
>> écrit :
>>
>>> You could do the entire thing in DataFrame world and write the result to
>>> disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
>>>
>>> Note this is Scala but should be straightforward to translate into Java:
>>>
>>> import org.apache.spark.sql.functions.collect_set
>>>
>>> val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
>>> 123)).toDF("a", "b", "c")
>>>
>>> df.unpivot(Array.empty, "column", "value")
>>>   .groupBy("column")
>>>   .agg(collect_set("value").as("distinct_values"))
>>>
>>> The unpivot operation turns
>>> +---+---+---+
>>> |  a|  b|  c|
>>> +---+---+---+
>>> |  1| 10|123|
>>> |  2| 20|124|
>>> |  3| 20|123|
>>> |  4| 10|123|
>>> +---+---+---+
>>>
>>> into
>>>
>>> +--+-+
>>> |column|value|
>>> +--+-+
>>> | a|1|
>>> | b|   10|
>>> | c|  123|
>>> | a|2|
>>> | b|   20|
>>> | c|  124|
>>> | a|3|
>>> | b|   20|
>>> | c|  123|
>>> | a|4|
>>> | b|   10|
>>> | c|  123|
>>> +--+-+
>>>
>>> The groupBy("column").agg(collect_set("value").as("distinct_values"))
>>> collects distinct values per column:
>>> +--+---+
>>>
>>> |column|distinct_values|
>>> +--+---+
>>> | c| [123, 124]|
>>> | b|   [20, 10]|
>>> | a|   [1, 2, 3, 4]|
>>> +--+---+
>>>
>>> Note that unpivot only works if all columns have a "common" type. Then
>>> all columns are cast to that common type. If you have incompatible types
>>> like Integer and String, you would have to cast them all to String first:
>>>
>>> import org.apache.spark.sql.types.StringType
>>>
>>> df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
>>>
>>> If you want to preserve the type of the values and have multiple value
>>> types, you cannot put everything into a DataFrame with one
>>> distinct_values column. You could still have multiple DataFrames, one
>>> per data type, and write those, or collect the DataFrame's values into Maps:
>>>
>>> import scala.collection.immutable
>>>
>>> import org.apache.spark.sql.DataFrame
>>> import org.apache.spark.sql.functions.collect_set
>>>
>>> // if all you columns have the same type
>>> def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String,
>>> immutable.Seq[Any]] = {
>>>   df.unpivot(Array.empty, "column", "value")
>>> .groupBy("column")
>>> .agg(collect_set("value").as("distinct_values"))
>>> .collect()
>>> .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
>>> .toMap
>>> }
>>>
>>>
>>> // if your columns have different types
>>> def distinctValuesPerColumn(df: DataFrame): immutable.Map[String,
>>> immutable.Seq[Any]] = {
>>>   df.schema.fields
>>> .groupBy(_.dataType)
>>> .mapValues(_.map(_.name))
>>> .par
>>> .map { case (dataType, columns) => df.select(columns.map(col): _*) }
>>> .map(distinctValuesPerColumnOneType)
>>> .flatten
>>> .toList
>>> .toMap
>>> }
>>>
>>> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10,
>>> "one")).toDF("a", "b", "c")
>>> distinctValuesPerColumn(df)

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread sam smith
@Enrico Minack  Thanks for "unpivot" but I am using
version 3.3.0 (you are taking it way too far as usual :) )
@Sean Owen  Pls then show me how it can be improved by
code.

Also, why such an approach (using withColumn() ) doesn't work:

for (String columnName : df.columns()) {
df= df.withColumn(columnName,
df.select(columnName).distinct().col(columnName));
}

Le sam. 11 févr. 2023 à 13:11, Enrico Minack  a
écrit :

> You could do the entire thing in DataFrame world and write the result to
> disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
>
> Note this is Scala but should be straightforward to translate into Java:
>
> import org.apache.spark.sql.functions.collect_set
>
> val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
> 123)).toDF("a", "b", "c")
>
> df.unpivot(Array.empty, "column", "value")
>   .groupBy("column")
>   .agg(collect_set("value").as("distinct_values"))
>
> The unpivot operation turns
> +---+---+---+
> |  a|  b|  c|
> +---+---+---+
> |  1| 10|123|
> |  2| 20|124|
> |  3| 20|123|
> |  4| 10|123|
> +---+---+---+
>
> into
>
> +------+-----+
> |column|value|
> +------+-----+
> |     a|    1|
> |     b|   10|
> |     c|  123|
> |     a|    2|
> |     b|   20|
> |     c|  124|
> |     a|    3|
> |     b|   20|
> |     c|  123|
> |     a|    4|
> |     b|   10|
> |     c|  123|
> +------+-----+
>
> The groupBy("column").agg(collect_set("value").as("distinct_values"))
> collects distinct values per column:
> +------+---------------+
> |column|distinct_values|
> +------+---------------+
> |     c|     [123, 124]|
> |     b|       [20, 10]|
> |     a|   [1, 2, 3, 4]|
> +------+---------------+
>
> Note that unpivot only works if all columns have a "common" type. Then all
> columns are cast to that common type. If you have incompatible types like
> Integer and String, you would have to cast them all to String first:
>
> import org.apache.spark.sql.types.StringType
>
> df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
>
> If you want to preserve the type of the values and have multiple value
> types, you cannot put everything into a DataFrame with one distinct_values
> column. You could still have multiple DataFrames, one per data type, and
> write those, or collect the DataFrame's values into Maps:
>
> import scala.collection.immutable
>
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.functions.collect_set
>
> // if all you columns have the same type
> def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String,
> immutable.Seq[Any]] = {
>   df.unpivot(Array.empty, "column", "value")
> .groupBy("column")
> .agg(collect_set("value").as("distinct_values"))
> .collect()
> .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
> .toMap
> }
>
>
> // if your columns have different types
> def distinctValuesPerColumn(df: DataFrame): immutable.Map[String,
> immutable.Seq[Any]] = {
>   df.schema.fields
> .groupBy(_.dataType)
> .mapValues(_.map(_.name))
> .par
> .map { case (dataType, columns) => df.select(columns.map(col): _*) }
> .map(distinctValuesPerColumnOneType)
> .flatten
> .toList
> .toMap
> }
>
> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10,
> "one")).toDF("a", "b", "c")
> distinctValuesPerColumn(df)
>
> The result is: (list values are of original type)
> Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))
>
> Hope this helps,
> Enrico
>
>
> Am 10.02.23 um 22:56 schrieb sam smith:
>
> Hi Apotolos,
> Can you suggest a better approach while keeping values within a dataframe?
>
> Le ven. 10 févr. 2023 à 22:47, Apostolos N. Papadopoulos <
> papad...@csd.auth.gr> a écrit :
>
>> Dear Sam,
>>
>> you are assuming that the data fits in the memory of your local machine.
>> You are using as a basis a dataframe, which potentially can be very large,
>> and then you are storing the data in local lists. Keep in mind that that
>> the number of distinct elements in a column may be very large (depending on
>> the app). I suggest to work on a solution that assumes that the number of
>> distinct values is also large. Thus, you should keep your data in
>> dataframes or RDDs, and store them as csv files, parquet, etc.
>>
>> a.p.
>>
>>
>> 

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-10 Thread sam smith
I am not sure I understand "Just need to do the cols one at a time" well.
Plus, I think Apostolos is right: this needs a dataframe approach, not a list
approach.

Le ven. 10 févr. 2023 à 22:47, Sean Owen  a écrit :

> For each column, select only that call and get distinct values. Similar to
> what you do here. Just need to do the cols one at a time. Your current code
> doesn't do what you want.
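
One way to read that suggestion, as a small Java sketch (variable names are
illustrative):

import java.util.List;
import org.apache.spark.sql.Row;

// one Spark job per column, each returning only that column's distinct values
for (String columnName : df.columns()) {
    List<Row> distinctValues = df.select(columnName).distinct().collectAsList();
    System.out.println(columnName + " -> " + distinctValues);
}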
>
> On Fri, Feb 10, 2023, 3:46 PM sam smith 
> wrote:
>
>> Hi Sean,
>>
>> "You need to select the distinct values of each col one at a time", how ?
>>
>> Le ven. 10 févr. 2023 à 22:40, Sean Owen  a écrit :
>>
>>> That gives you all distinct tuples of those col values. You need to
>>> select the distinct values of each col one at a time. Sure just collect()
>>> the result as you do here.
>>>
>>> On Fri, Feb 10, 2023, 3:34 PM sam smith 
>>> wrote:
>>>
>>>> I want to get the distinct values of each column in a List (is it good
>>>> practice to use List here?), that contains as first element the column
>>>> name, and the other element its distinct values so that for a dataset we
>>>> get a list of lists, i do it this way (in my opinion no so fast):
>>>>
>>>> List<List<String>> finalList = new ArrayList<>();
>>>> Dataset<Row> df = spark.read().format("csv").option("header",
>>>> "true").load("/pathToCSV");
>>>> String[] columnNames = df.columns();
>>>> for (int i = 0; i < columnNames.length; i++) {
>>>>     List<String> columnList = new ArrayList<>();
>>>>     columnList.add(columnNames[i]);
>>>>     List<Row> columnValues =
>>>> df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
>>>>     for (int j = 0; j < columnValues.size(); j++)
>>>>         columnList.add(columnValues.get(j).apply(0).toString());
>>>>     finalList.add(columnList);
>>>> }
>>>>
>>>>
>>>> How to improve this?
>>>>
>>>> Also, can I get the results in JSON format?
>>>>
>>>


Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-10 Thread sam smith
Hi Apotolos,
Can you suggest a better approach while keeping values within a dataframe?

Le ven. 10 févr. 2023 à 22:47, Apostolos N. Papadopoulos <
papad...@csd.auth.gr> a écrit :

> Dear Sam,
>
> you are assuming that the data fits in the memory of your local machine.
> You are using as a basis a dataframe, which potentially can be very large,
> and then you are storing the data in local lists. Keep in mind that that
> the number of distinct elements in a column may be very large (depending on
> the app). I suggest to work on a solution that assumes that the number of
> distinct values is also large. Thus, you should keep your data in
> dataframes or RDDs, and store them as csv files, parquet, etc.
>
> a.p.
>
>
> On 10/2/23 23:40, sam smith wrote:
>
> I want to get the distinct values of each column in a List (is it good
> practice to use List here?), that contains as first element the column
> name, and the other element its distinct values so that for a dataset we
> get a list of lists, i do it this way (in my opinion no so fast):
>
> List<List<String>> finalList = new ArrayList<>();
> Dataset<Row> df = spark.read().format("csv").option("header",
> "true").load("/pathToCSV");
> String[] columnNames = df.columns();
> for (int i = 0; i < columnNames.length; i++) {
>     List<String> columnList = new ArrayList<>();
>     columnList.add(columnNames[i]);
>     List<Row> columnValues =
> df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
>     for (int j = 0; j < columnValues.size(); j++)
>         columnList.add(columnValues.get(j).apply(0).toString());
>     finalList.add(columnList);
> }
>
>
> How to improve this?
>
> Also, can I get the results in JSON format?
>
> --
> Apostolos N. Papadopoulos, Associate Professor
> Department of Informatics
> Aristotle University of Thessaloniki
> Thessaloniki, GREECE
> tel: ++0030312310991918
> email: papad...@csd.auth.gr
> twitter: @papadopoulos_ap
> web: http://datalab.csd.auth.gr/~apostol
>
>


Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-10 Thread sam smith
Hi Sean,

"You need to select the distinct values of each col one at a time", how ?

Le ven. 10 févr. 2023 à 22:40, Sean Owen  a écrit :

> That gives you all distinct tuples of those col values. You need to select
> the distinct values of each col one at a time. Sure just collect() the
> result as you do here.
>
> On Fri, Feb 10, 2023, 3:34 PM sam smith 
> wrote:
>
>> I want to get the distinct values of each column in a List (is it good
>> practice to use List here?), that contains as first element the column
>> name, and the other element its distinct values so that for a dataset we
>> get a list of lists, i do it this way (in my opinion no so fast):
>>
>> List<List<String>> finalList = new ArrayList<>();
>> Dataset<Row> df = spark.read().format("csv").option("header",
>> "true").load("/pathToCSV");
>> String[] columnNames = df.columns();
>> for (int i = 0; i < columnNames.length; i++) {
>>     List<String> columnList = new ArrayList<>();
>>     columnList.add(columnNames[i]);
>>     List<Row> columnValues =
>> df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
>>     for (int j = 0; j < columnValues.size(); j++)
>>         columnList.add(columnValues.get(j).apply(0).toString());
>>     finalList.add(columnList);
>> }
>>
>>
>> How to improve this?
>>
>> Also, can I get the results in JSON format?
>>
>


How to improve efficiency of this piece of code (returning distinct column values)

2023-02-10 Thread sam smith
I want to get the distinct values of each column in a List (is it good
practice to use List here?) that contains the column name as its first
element and its distinct values as the remaining elements, so that for a
dataset we get a list of lists. I do it this way (in my opinion not so fast):

List<List<String>> finalList = new ArrayList<>();
Dataset<Row> df = spark.read().format("csv").option("header",
"true").load("/pathToCSV");
String[] columnNames = df.columns();
for (int i = 0; i < columnNames.length; i++) {
    List<String> columnList = new ArrayList<>();
    columnList.add(columnNames[i]);
    List<Row> columnValues =
df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
    for (int j = 0; j < columnValues.size(); j++)
        columnList.add(columnValues.get(j).apply(0).toString());
    finalList.add(columnList);
}

How to improve this?

Also, can I get the results in JSON format?

Can we upload a csv dataset into Hive using SparkSQL?

2022-12-10 Thread sam smith
Hello,

I want to create a table in Hive and then load a CSV file's content into it,
all by means of Spark SQL.
I saw in the docs the example with the .txt file, BUT can we instead do
something like the following to accomplish what I want?

String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

// note: the CREATE TABLE would presumably also need a column list for a csv-backed table
spark.sql("CREATE TABLE IF NOT EXISTS csvFile USING hive");
spark.sql("LOAD DATA LOCAL INPATH "
    + "'C:/Users/Me/Documents/examples/src/main/resources/data.csv' "
    + "INTO TABLE csvFile");


Re: Aggregate over a column: the proper way to do

2022-04-10 Thread sam smith
Exact, one row, and two columns

Le sam. 9 avr. 2022 à 17:44, Sean Owen  a écrit :

> But it only has one row, right?
>
> On Sat, Apr 9, 2022, 10:06 AM sam smith 
> wrote:
>
>> Yes. Returns the number of rows in the Dataset as *long*. but in my case
>> the aggregation returns a table of two columns.
>>
>> Le ven. 8 avr. 2022 à 14:12, Sean Owen  a écrit :
>>
>>> Dataset.count() returns one value directly?
>>>
>>> On Thu, Apr 7, 2022 at 11:25 PM sam smith 
>>> wrote:
>>>
>>>> My bad, yes of course that! still i don't like the ..
>>>> select("count(myCol)") .. part in my line is there any replacement to that 
>>>> ?
>>>>
>>>> Le ven. 8 avr. 2022 à 06:13, Sean Owen  a écrit :
>>>>
>>>>> Just do an average then? Most of my point is that filtering to one
>>>>> group and then grouping is pointless.
>>>>>
>>>>> On Thu, Apr 7, 2022, 11:10 PM sam smith 
>>>>> wrote:
>>>>>
>>>>>> What if i do avg instead of count?
>>>>>>
>>>>>> Le ven. 8 avr. 2022 à 05:32, Sean Owen  a écrit :
>>>>>>
>>>>>>> Wait, why groupBy at all? After the filter only rows with myCol
>>>>>>> equal to your target are left. There is only one group. Don't group just
>>>>>>> count after the filter?
>>>>>>>
>>>>>>> On Thu, Apr 7, 2022, 10:27 PM sam smith 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I want to aggregate a column by counting the number of rows having
>>>>>>>> the value "myTargetValue" and return the result
>>>>>>>> I am doing it like the following:in JAVA
>>>>>>>>
>>>>>>>>> long result =
>>>>>>>>> dataset.filter(dataset.col("myCol").equalTo("myTargetVal")).groupBy(col("myCol")).agg(count(dataset.col("myCol"))).select("count(myCol)").first().getLong(0);
>>>>>>>>
>>>>>>>>
>>>>>>>> Is that the right way? if no, what if a more optimized way to do
>>>>>>>> that (always in JAVA)?
>>>>>>>> Thanks for the help.
>>>>>>>>
>>>>>>>


Re: Aggregate over a column: the proper way to do

2022-04-09 Thread sam smith
Yes, it returns the number of rows in the Dataset as a *long*, but in my case
the aggregation returns a table of two columns.

Le ven. 8 avr. 2022 à 14:12, Sean Owen  a écrit :

> Dataset.count() returns one value directly?
>
> On Thu, Apr 7, 2022 at 11:25 PM sam smith 
> wrote:
>
>> My bad, yes of course that! still i don't like the ..
>> select("count(myCol)") .. part in my line is there any replacement to that ?
>>
>> Le ven. 8 avr. 2022 à 06:13, Sean Owen  a écrit :
>>
>>> Just do an average then? Most of my point is that filtering to one group
>>> and then grouping is pointless.
>>>
>>> On Thu, Apr 7, 2022, 11:10 PM sam smith 
>>> wrote:
>>>
>>>> What if i do avg instead of count?
>>>>
>>>> Le ven. 8 avr. 2022 à 05:32, Sean Owen  a écrit :
>>>>
>>>>> Wait, why groupBy at all? After the filter only rows with myCol equal
>>>>> to your target are left. There is only one group. Don't group just count
>>>>> after the filter?
>>>>>
>>>>> On Thu, Apr 7, 2022, 10:27 PM sam smith 
>>>>> wrote:
>>>>>
>>>>>> I want to aggregate a column by counting the number of rows having
>>>>>> the value "myTargetValue" and return the result
>>>>>> I am doing it like the following:in JAVA
>>>>>>
>>>>>>> long result =
>>>>>>> dataset.filter(dataset.col("myCol").equalTo("myTargetVal")).groupBy(col("myCol")).agg(count(dataset.col("myCol"))).select("count(myCol)").first().getLong(0);
>>>>>>
>>>>>>
>>>>>> Is that the right way? if no, what if a more optimized way to do that
>>>>>> (always in JAVA)?
>>>>>> Thanks for the help.
>>>>>>
>>>>>


Re: Aggregate over a column: the proper way to do

2022-04-07 Thread sam smith
My bad, yes, of course that! Still, I don't like the .. select("count(myCol)") ..
part in my line; is there any replacement for that?

Le ven. 8 avr. 2022 à 06:13, Sean Owen  a écrit :

> Just do an average then? Most of my point is that filtering to one group
> and then grouping is pointless.
>
> On Thu, Apr 7, 2022, 11:10 PM sam smith 
> wrote:
>
>> What if i do avg instead of count?
>>
>> Le ven. 8 avr. 2022 à 05:32, Sean Owen  a écrit :
>>
>>> Wait, why groupBy at all? After the filter only rows with myCol equal to
>>> your target are left. There is only one group. Don't group just count after
>>> the filter?
>>>
>>> On Thu, Apr 7, 2022, 10:27 PM sam smith 
>>> wrote:
>>>
>>>> I want to aggregate a column by counting the number of rows having the
>>>> value "myTargetValue" and return the result
>>>> I am doing it like the following:in JAVA
>>>>
>>>>> long result =
>>>>> dataset.filter(dataset.col("myCol").equalTo("myTargetVal")).groupBy(col("myCol")).agg(count(dataset.col("myCol"))).select("count(myCol)").first().getLong(0);
>>>>
>>>>
>>>> Is that the right way? if no, what if a more optimized way to do that
>>>> (always in JAVA)?
>>>> Thanks for the help.
>>>>
>>>


Re: Aggregate over a column: the proper way to do

2022-04-07 Thread sam smith
What if i do avg instead of count?

Le ven. 8 avr. 2022 à 05:32, Sean Owen  a écrit :

> Wait, why groupBy at all? After the filter only rows with myCol equal to
> your target are left. There is only one group. Don't group just count after
> the filter?
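
In Java, the simplification being suggested comes down to something like this
(a hedged sketch, reusing the dataset and column names from the question):

import static org.apache.spark.sql.functions.*;

// no groupBy needed: after the filter, counting the remaining rows is enough
long result = dataset.filter(col("myCol").equalTo("myTargetVal")).count();

// the same through agg(), if the select("count(myCol)") step is what bothers you
long viaAgg = dataset.filter(col("myCol").equalTo("myTargetVal"))
    .agg(count("myCol")).first().getLong(0);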
>
> On Thu, Apr 7, 2022, 10:27 PM sam smith 
> wrote:
>
>> I want to aggregate a column by counting the number of rows having the
>> value "myTargetValue" and return the result
>> I am doing it like the following:in JAVA
>>
>>> long result =
>>> dataset.filter(dataset.col("myCol").equalTo("myTargetVal")).groupBy(col("myCol")).agg(count(dataset.col("myCol"))).select("count(myCol)").first().getLong(0);
>>
>>
>> Is that the right way? if no, what if a more optimized way to do that
>> (always in JAVA)?
>> Thanks for the help.
>>
>


Aggregate over a column: the proper way to do

2022-04-07 Thread sam smith
I want to aggregate a column by counting the number of rows having the
value "myTargetValue" and return the result.
I am doing it like the following, in JAVA:

> long result =
> dataset.filter(dataset.col("myCol").equalTo("myTargetVal")).groupBy(col("myCol")).agg(count(dataset.col("myCol"))).select("count(myCol)").first().getLong(0);


Is that the right way? If not, what is a more optimized way to do that
(always in JAVA)?
Thanks for the help.


Re: Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread sam smith
spark-submit a Spark application on Hadoop (cluster mode), that's what I
mean by executing on Hadoop

Le lun. 24 janv. 2022 à 18:00, Sean Owen  a écrit :

> I am still not understanding what you mean by "executing on Hadoop". Spark
> does not use Hadoop for execution. Probably can't answer until this is
> cleared up.
>
> On Mon, Jan 24, 2022 at 10:57 AM sam smith 
> wrote:
>
>> I mean the DAG order is somehow altered when executing on Hadoop
>>
>> Le lun. 24 janv. 2022 à 17:17, Sean Owen  a écrit :
>>
>>> Code is not executed by Hadoop, nor passed through Hadoop somehow. Do
>>> you mean data? data is read as-is. There is typically no guarantee about
>>> ordering of data in files but you can order data. Still not sure what
>>> specifically you are worried about here, but I don't think the kind of
>>> thing you're contemplating can happen, no
>>>
>>> On Mon, Jan 24, 2022 at 9:28 AM sam smith 
>>> wrote:
>>>
>>>> I am aware of that, but whenever the chunks of code are returned to
>>>> Spark from Hadoop (after processing) could they be done not in the ordered
>>>> way ? could this ever happen ?
>>>>
>>>> Le lun. 24 janv. 2022 à 16:14, Sean Owen  a écrit :
>>>>
>>>>> Hadoop does not run Spark programs, Spark does. How or why would
>>>>> something, what, modify the byte code? No
>>>>>
>>>>> On Mon, Jan 24, 2022, 9:07 AM sam smith 
>>>>> wrote:
>>>>>
>>>>>> My point is could Hadoop go wrong about one Spark execution ? meaning
>>>>>> that it gets confused (given the concurrent distributed tasks) and then
>>>>>> adds wrong instruction to the program, or maybe does execute an 
>>>>>> instruction
>>>>>> not at its right order (shuffling the order of execution by executing
>>>>>> previous ones, while it shouldn't) ? Before finishing and returning the
>>>>>> results from one node it returns the results of the other in a wrong way
>>>>>> for example.
>>>>>>
>>>>>> Le lun. 24 janv. 2022 à 15:31, Sean Owen  a écrit :
>>>>>>
>>>>>>> Not clear what you mean here. A Spark program is a program, so what
>>>>>>> are the alternatives here? program execution order is still program
>>>>>>> execution order. You are not guaranteed anything about order of 
>>>>>>> concurrent
>>>>>>> tasks. Failed tasks can be reexecuted so should be idempotent. I think 
>>>>>>> the
>>>>>>> answer is 'no' but not sure what you are thinking of here.
>>>>>>>
>>>>>>> On Mon, Jan 24, 2022 at 7:10 AM sam smith <
>>>>>>> qustacksm2123...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello guys,
>>>>>>>>
>>>>>>>> I hope my question does not sound weird, but could a Spark
>>>>>>>> execution on Hadoop cluster give different output than the program 
>>>>>>>> actually
>>>>>>>> does ? I mean by that, the execution order is messed by hadoop, or an
>>>>>>>> instruction executed twice..; ?
>>>>>>>>
>>>>>>>> Thanks for your enlightenment
>>>>>>>>
>>>>>>>


Re: Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread sam smith
I mean the DAG order is somehow altered when executing on Hadoop

Le lun. 24 janv. 2022 à 17:17, Sean Owen  a écrit :

> Code is not executed by Hadoop, nor passed through Hadoop somehow. Do you
> mean data? data is read as-is. There is typically no guarantee about
> ordering of data in files but you can order data. Still not sure what
> specifically you are worried about here, but I don't think the kind of
> thing you're contemplating can happen, no
>
> On Mon, Jan 24, 2022 at 9:28 AM sam smith 
> wrote:
>
>> I am aware of that, but whenever the chunks of code are returned to Spark
>> from Hadoop (after processing) could they be done not in the ordered way ?
>> could this ever happen ?
>>
>> Le lun. 24 janv. 2022 à 16:14, Sean Owen  a écrit :
>>
>>> Hadoop does not run Spark programs, Spark does. How or why would
>>> something, what, modify the byte code? No
>>>
>>> On Mon, Jan 24, 2022, 9:07 AM sam smith 
>>> wrote:
>>>
>>>> My point is could Hadoop go wrong about one Spark execution ? meaning
>>>> that it gets confused (given the concurrent distributed tasks) and then
>>>> adds wrong instruction to the program, or maybe does execute an instruction
>>>> not at its right order (shuffling the order of execution by executing
>>>> previous ones, while it shouldn't) ? Before finishing and returning the
>>>> results from one node it returns the results of the other in a wrong way
>>>> for example.
>>>>
>>>> Le lun. 24 janv. 2022 à 15:31, Sean Owen  a écrit :
>>>>
>>>>> Not clear what you mean here. A Spark program is a program, so what
>>>>> are the alternatives here? program execution order is still program
>>>>> execution order. You are not guaranteed anything about order of concurrent
>>>>> tasks. Failed tasks can be reexecuted so should be idempotent. I think the
>>>>> answer is 'no' but not sure what you are thinking of here.
>>>>>
>>>>> On Mon, Jan 24, 2022 at 7:10 AM sam smith 
>>>>> wrote:
>>>>>
>>>>>> Hello guys,
>>>>>>
>>>>>> I hope my question does not sound weird, but could a Spark execution
>>>>>> on Hadoop cluster give different output than the program actually does ? 
>>>>>> I
>>>>>> mean by that, the execution order is messed by hadoop, or an instruction
>>>>>> executed twice..; ?
>>>>>>
>>>>>> Thanks for your enlightenment
>>>>>>
>>>>>


Re: Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread sam smith
I am aware of that, but whenever the chunks of code are returned to Spark
from Hadoop (after processing), could they come back not in the ordered way?
Could this ever happen?

Le lun. 24 janv. 2022 à 16:14, Sean Owen  a écrit :

> Hadoop does not run Spark programs, Spark does. How or why would
> something, what, modify the byte code? No
>
> On Mon, Jan 24, 2022, 9:07 AM sam smith 
> wrote:
>
>> My point is could Hadoop go wrong about one Spark execution ? meaning
>> that it gets confused (given the concurrent distributed tasks) and then
>> adds wrong instruction to the program, or maybe does execute an instruction
>> not at its right order (shuffling the order of execution by executing
>> previous ones, while it shouldn't) ? Before finishing and returning the
>> results from one node it returns the results of the other in a wrong way
>> for example.
>>
>> Le lun. 24 janv. 2022 à 15:31, Sean Owen  a écrit :
>>
>>> Not clear what you mean here. A Spark program is a program, so what are
>>> the alternatives here? program execution order is still program execution
>>> order. You are not guaranteed anything about order of concurrent tasks.
>>> Failed tasks can be reexecuted so should be idempotent. I think the answer
>>> is 'no' but not sure what you are thinking of here.
>>>
>>> On Mon, Jan 24, 2022 at 7:10 AM sam smith 
>>> wrote:
>>>
>>>> Hello guys,
>>>>
>>>> I hope my question does not sound weird, but could a Spark execution on
>>>> Hadoop cluster give different output than the program actually does ? I
>>>> mean by that, the execution order is messed by hadoop, or an instruction
>>>> executed twice..; ?
>>>>
>>>> Thanks for your enlightenment
>>>>
>>>


Re: Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread sam smith
My point is: could Hadoop go wrong about one Spark execution? Meaning that it
gets confused (given the concurrent distributed tasks) and then adds a wrong
instruction to the program, or executes an instruction out of its right order
(shuffling the order of execution by executing previous ones while it
shouldn't)? For example, before finishing and returning the results from one
node, it returns the results of the other in a wrong way.

Le lun. 24 janv. 2022 à 15:31, Sean Owen  a écrit :

> Not clear what you mean here. A Spark program is a program, so what are
> the alternatives here? program execution order is still program execution
> order. You are not guaranteed anything about order of concurrent tasks.
> Failed tasks can be reexecuted so should be idempotent. I think the answer
> is 'no' but not sure what you are thinking of here.
>
> On Mon, Jan 24, 2022 at 7:10 AM sam smith 
> wrote:
>
>> Hello guys,
>>
>> I hope my question does not sound weird, but could a Spark execution on
>> Hadoop cluster give different output than the program actually does ? I
>> mean by that, the execution order is messed by hadoop, or an instruction
>> executed twice..; ?
>>
>> Thanks for your enlightenment
>>
>


Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread sam smith
Hello guys,

I hope my question does not sound weird, but could a Spark execution on a
Hadoop cluster give different output than the program actually specifies? I
mean by that: the execution order gets messed up by Hadoop, or an instruction
gets executed twice, etc.?

Thanks for your enlightenment


Re: About some Spark technical help

2021-12-24 Thread sam smith
Thanks for the feedback Andrew.

Le sam. 25 déc. 2021 à 03:17, Andrew Davidson  a écrit :

> Hi Sam
>
> It is kind of hard to review straight code. Adding some some sample data,
> a unit test and expected results. Would be a good place to start. Ie.
> Determine the fidelity of your implementation compared to the original.
>
> Also a verbal description of the algo would be helpful
>
> Happy Holidays
>
> Andy
>
> On Fri, Dec 24, 2021 at 3:17 AM sam smith 
> wrote:
>
>> Hi Gourav,
>>
>> Good question! that's the programming language i am most proficient at.
>> You are always welcome to suggest corrective remarks about my (Spark)
>> code.
>>
>> Kind regards.
>>
>> Le ven. 24 déc. 2021 à 11:58, Gourav Sengupta 
>> a écrit :
>>
>>> Hi,
>>>
>>> out of sheer and utter curiosity, why JAVA?
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Thu, Dec 23, 2021 at 5:10 PM sam smith 
>>> wrote:
>>>
>>>> Hi Andrew,
>>>>
>>>> Thanks, here's the Github repo to the code and the publication :
>>>> https://github.com/SamSmithDevs10/paperReplicationForReview
>>>>
>>>> Kind regards
>>>>
>>>> Le jeu. 23 déc. 2021 à 17:58, Andrew Davidson  a
>>>> écrit :
>>>>
>>>>> Hi Sam
>>>>>
>>>>>
>>>>>
>>>>> Can you tell us more? What is the algorithm? Can you send us the URL
>>>>> the publication
>>>>>
>>>>>
>>>>>
>>>>> Kind regards
>>>>>
>>>>>
>>>>>
>>>>> Andy
>>>>>
>>>>>
>>>>>
>>>>> *From: *sam smith 
>>>>> *Date: *Wednesday, December 22, 2021 at 10:59 AM
>>>>> *To: *"user@spark.apache.org" 
>>>>> *Subject: *About some Spark technical help
>>>>>
>>>>>
>>>>>
>>>>> Hello guys,
>>>>>
>>>>>
>>>>>
>>>>> I am replicating a paper's algorithm in Spark / Java, and want to ask
>>>>> you guys for some assistance to validate / review about 150 lines of code.
>>>>> My github repo contains both my java class and the related paper,
>>>>>
>>>>>
>>>>>
>>>>> Any interested reviewer here ?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Thanks.
>>>>>
>>>>


Re: About some Spark technical help

2021-12-24 Thread sam smith
Hi Gourav,

Good question! That's the programming language I am most proficient at.
You are always welcome to suggest corrective remarks about my (Spark) code.

Kind regards.

Le ven. 24 déc. 2021 à 11:58, Gourav Sengupta  a
écrit :

> Hi,
>
> out of sheer and utter curiosity, why JAVA?
>
> Regards,
> Gourav Sengupta
>
> On Thu, Dec 23, 2021 at 5:10 PM sam smith 
> wrote:
>
>> Hi Andrew,
>>
>> Thanks, here's the Github repo to the code and the publication :
>> https://github.com/SamSmithDevs10/paperReplicationForReview
>>
>> Kind regards
>>
>> Le jeu. 23 déc. 2021 à 17:58, Andrew Davidson  a
>> écrit :
>>
>>> Hi Sam
>>>
>>>
>>>
>>> Can you tell us more? What is the algorithm? Can you send us the URL the
>>> publication
>>>
>>>
>>>
>>> Kind regards
>>>
>>>
>>>
>>> Andy
>>>
>>>
>>>
>>> *From: *sam smith 
>>> *Date: *Wednesday, December 22, 2021 at 10:59 AM
>>> *To: *"user@spark.apache.org" 
>>> *Subject: *About some Spark technical help
>>>
>>>
>>>
>>> Hello guys,
>>>
>>>
>>>
>>> I am replicating a paper's algorithm in Spark / Java, and want to ask
>>> you guys for some assistance to validate / review about 150 lines of code.
>>> My github repo contains both my java class and the related paper,
>>>
>>>
>>>
>>> Any interested reviewer here ?
>>>
>>>
>>>
>>>
>>>
>>> Thanks.
>>>
>>


Re: About some Spark technical help

2021-12-23 Thread sam smith
Hi Andrew,

Thanks, here's the Github repo to the code and the publication :
https://github.com/SamSmithDevs10/paperReplicationForReview

Kind regards

Le jeu. 23 déc. 2021 à 17:58, Andrew Davidson  a écrit :

> Hi Sam
>
>
>
> Can you tell us more? What is the algorithm? Can you send us the URL the
> publication
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> *From: *sam smith 
> *Date: *Wednesday, December 22, 2021 at 10:59 AM
> *To: *"user@spark.apache.org" 
> *Subject: *About some Spark technical help
>
>
>
> Hello guys,
>
>
>
> I am replicating a paper's algorithm in Spark / Java, and want to ask you
> guys for some assistance to validate / review about 150 lines of code. My
> github repo contains both my java class and the related paper,
>
>
>
> Any interested reviewer here ?
>
>
>
>
>
> Thanks.
>


dataset partitioning algorithm implementation help

2021-12-23 Thread sam smith
Hello All,

I am replicating a paper's algorithm about a partitioning approach to
anonymize datasets with Spark / Java, and want to ask you for some help to
review my 150 lines of code. My github repo, attached below, contains both
my java class and the related paper:

https://github.com/SamSmithDevs10/paperReplicationForReview

Thanks in advance.

Thanks.


About some Spark technical help

2021-12-22 Thread sam smith
Hello guys,

I am replicating a paper's algorithm in Spark / Java, and want to ask you
guys for some assistance to validate / review about 150 lines of code. My
github repo contains both my java class and the related paper,

Any interested reviewer here ?


Thanks.


Re: About some Spark technical assistance

2021-12-13 Thread sam smith
You were added to the repo to contribute, thanks. I included the Java class
and the paper I am replicating.

Le lun. 13 déc. 2021 à 04:27,  a écrit :

> github url please.
>
> On 2021-12-13 01:06, sam smith wrote:
> > Hello guys,
> >
> > I am replicating a paper's algorithm (graph coloring algorithm) in
> > Spark under Java, and thought about asking you guys for some
> > assistance to validate / review my 600 lines of code. Any volunteers
> > to share the code with ?
> > Thanks
>


About some Spark technical assistance

2021-12-12 Thread sam smith
Hello guys,

I am replicating a paper's algorithm (graph coloring algorithm) in Spark
under Java, and thought about asking you guys for some assistance to
validate / review my 600 lines of code. Any volunteers to share the code
with ?
Thanks


[no subject]

2021-11-18 Thread Sam Elamin
unsubscribe


Re: Parquet Metadata

2021-06-23 Thread Sam
Hi, I only know about comments, which you can add to each column and in
which you could put these key/values.

Thanks.
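
One hedged way to attach such key/value pairs from Spark is column-level
metadata on the schema; Spark serializes its schema, including field
metadata, into the Parquet footer, though whether non-Spark readers surface
it is a separate question. The column name and translations mirror the
example in the question; df and the output path are placeholders:

import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;

Metadata translations = new MetadataBuilder()
    .putString("EN", "Customer Number")
    .putString("DE", "Kundennummer")
    .build();

// re-alias the column with the metadata attached, then write as usual
Dataset<Row> withMeta = df.withColumn("custnr", col("custnr").as("custnr", translations));
withMeta.write().parquet("/tmp/customers");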

On Wed, Jun 23, 2021 at 11:31 AM Bode, Meikel, NMA-CFD <
meikel.b...@bertelsmann.de> wrote:

> Hi folks,
>
>
>
> Maybe not the right audience but maybe you came along such an requirement.
>
> Is it possible to define a parquet schema, that contains technical column
> names and a list of translations for a certain column name into different
> languages?
>
>
>
> I give an example:
>
> Technical: “custnr” would translate to { EN:”Customer Number”,  DE:
> “Kundennummer”}
>
>
>
> We could of course deliver a meta data file containing such language
> mappings, but our questions is whether we could embed that info into the
> parquet meta data?
>
>
>
> Thanks a lot,
>
> Meikel
>
>
>


Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Sam
Like I said in my previous email, can you try this and let me know how many
tasks you see?

val repRdd = scoredRdd.repartition(50).cache()
repRdd.take(1)
Then map operation on repRdd here.

I’ve done similar map operations in the past and this works.

Thanks.

On Wed, Jun 9, 2021 at 11:17 AM Tom Barber  wrote:

> Also just to follow up on that slightly, I did also try off the back of
> another comment:
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>
>
> Where I repartitioned that scoredRdd map out of interest, it then triggers
> the FairFetcher function there, instead of in the runJob(), but still on a
> single executor 
>
> Tom
>
> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>
>>
>> Okay so what happens is that the crawler reads a bunch of solr data,
>> we're not talking GB's just a list of JSON and turns that into a bunch of
>> RDD's that end up in that flatmap that I linked to first.
>>
>> The fair fetcher is an interface to a pluggable backend that basically
>> takes some of the fields and goes and crawls websites listed in them
>> looking for information. We wrote this code 6 years ago for a DARPA project
>> tracking down criminals on the web. Now I'm reusing it but trying to force
>> it to scale out a bit more.
>>
>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want
>> to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel
>> on one node makes my cluster sad) to each executor and have it run a crawl,
>> then move on and get another one and so on. That way you're not saturating
>> a node trying to look up all of them and you could add more nodes for
>> greater capacity pretty quickly. Once the website has been captured, you
>> can then "score" it for want of a better term to determine its usefulness,
>> which is where the map is being triggered.
>>
>> In answer to your questions Sean, no action seems triggered until you end
>> up in the score block and the sc.runJob() because thats literally the next
>> line of functionality as Kafka isn't enabled.
>>
>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
>> rs.iterator, localFetchDelay,
>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>> StatusUpdateSolrTransformer).toSeq })
>>   .persist()
>>
>> if (kafkaEnable) {
>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>> }
>> val scoredRdd = score(fetchedRdd)
>>
>>
>> That if block is disabled so the score function runs. Inside of that:
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>> ScoreUpdateSolrTransformer(d))
>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>> 
>>
>>
>> When its doing stuff in the SparkUI I can see that its waiting on the
>> sc.runJob() line, so thats the execution point.
>>
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:
>>
>>> persist() doesn't even persist by itself - just sets it to be persisted
>>> when it's executed.
>>> key doesn't matter here, nor partitioning, if this code is trying to run
>>> things on the driver inadvertently.
>>> I don't quite grok what the OSS code you linked to is doing, but it's
>>> running some supplied functions very directly and at a low-level with
>>> sc.runJob, which might be part of how this can do something unusual.
>>> How do you trigger any action? what happens after persist()
>>>
>>> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber  wrote:
>>>
 Thanks Mich,

 The key on the first iteration is just a string that says "seed", so it
 is indeed on the first crawl the same across all of the groups. Further
 iterations would be different, but I'm not there yet. I was under the
 impression that a repartition would distribute the tasks. Is that not the
 case?

 Thanks

 Tom

 On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Hi Tom,
>
> Persist() here simply means persist to memory. That is all. You can
> check UI tab on storage
>
>
> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>
> So I gather the code is stuck from your link in the driver. You stated
> that you tried repartition() but it did not do anything,
>
> Further you stated :
>
> " The key is pretty static in these tests, so I have also tried
> forcing the partition count (50 on a 16 core per node cluster) and also
> 

Re: REST Structured Streaming Sink

2020-07-03 Thread Sam Elamin
Hi Folks,

Great discussion! I will take into account rate-limiting and make it
configurable for the http request as well as all

I was wondering if there is anything I might have missed that would make it
technically impossible to do or at least difficult enough to not warrant
the effort
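
To make the discussion concrete, this is roughly the per-record shape I have in
mind today with the existing ForeachWriter API (a sketch only: the endpoint is a
placeholder, records are pre-serialised to JSON via toJSON, and the rate
limiting/retry logic discussed above is not in here yet):

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.ForeachWriter

class RestSink(endpoint: String) extends ForeachWriter[String] {
  override def open(partitionId: Long, epochId: Long): Boolean = true

  override def process(json: String): Unit = {
    // one POST per record - fine for low volume, needs batching/limiting at scale
    val conn = new URL(endpoint).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    conn.getOutputStream.write(json.getBytes(StandardCharsets.UTF_8))
    conn.getResponseCode // forces the request; a real sink would check/log this
    conn.disconnect()
  }

  override def close(errorCause: Throwable): Unit = ()
}

// usage: stream.toJSON.writeStream.foreach(new RestSink("https://example.com/ingest")).start()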

Is there anything I might have overlooked? Also, would this be useful to
people?

My idea, from a business perspective: why are we making them wait till
the next scheduled batch run for data that is already available from an
API? You could run a job every minute/hour, but that in itself sounds like a
streaming use case

Thoughts?

Regards
Sam

On Thu, Jul 2, 2020 at 3:31 AM Burak Yavuz  wrote:

> Well, the difference is, a technical user writes the UDF and a
> non-technical user may use this built-in thing (misconfigure it) and shoot
> themselves in the foot.
>
> On Wed, Jul 1, 2020, 6:40 PM Andrew Melo  wrote:
>
>> On Wed, Jul 1, 2020 at 8:13 PM Burak Yavuz  wrote:
>> >
>> > I'm not sure having a built-in sink that allows you to DDOS servers is
>> the best idea either. foreachWriter is typically used for such use cases,
>> not foreachBatch. It's also pretty hard to guarantee exactly-once, rate
>> limiting, etc.
>>
>> If you control the machines and can run arbitrary code, you can DDOS
>> whatever you want. What's the difference between this proposal and
>> writing a UDF that opens 1,000 connections to a target machine?
>>
>> > Best,
>> > Burak
>> >
>> > On Wed, Jul 1, 2020 at 5:54 PM Holden Karau 
>> wrote:
>> >>
>> >> I think adding something like this (if it doesn't already exist) could
>> help make structured streaming easier to use, foreachBatch is not the best
>> API.
>> >>
>> >> On Wed, Jul 1, 2020 at 2:21 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>> >>>
>> >>> I guess the method, query parameter, header, and the payload would be
>> all different for almost every use case - that makes it hard to generalize
>> and requires implementation to be pretty much complicated to be flexible
>> enough.
>> >>>
>> >>> I'm not aware of any custom sink implementing REST so your best bet
>> would be simply implementing your own with foreachBatch, but so someone
>> might jump in and provide a pointer if there is something in the Spark
>> ecosystem.
>> >>>
>> >>> Thanks,
>> >>> Jungtaek Lim (HeartSaVioR)
>> >>>
>> >>> On Thu, Jul 2, 2020 at 3:21 AM Sam Elamin 
>> wrote:
>> >>>>
>> >>>> Hi All,
>> >>>>
>> >>>>
>> >>>> We ingest alot of restful APIs into our lake and I'm wondering if it
>> is at all possible to created a rest sink in structured streaming?
>> >>>>
>> >>>> For now I'm only focusing on restful services that have an
>> incremental ID so my sink can just poll for new data then ingest.
>> >>>>
>> >>>> I can't seem to find a connector that does this and my gut instinct
>> tells me it's probably because it isn't possible due to something
>> completely obvious that I am missing
>> >>>>
>> >>>> I know some RESTful API obfuscate the IDs to a hash of strings and
>> that could be a problem but since I'm planning on focusing on just
>> numerical IDs that just get incremented I think I won't be facing that issue
>> >>>>
>> >>>>
>> >>>> Can anyone let me know if this sounds like a daft idea? Will I need
>> something like Kafka or kinesis as a buffer and redundancy or am I
>> overthinking this?
>> >>>>
>> >>>>
>> >>>> I would love to bounce ideas with people who runs structured
>> streaming jobs in production
>> >>>>
>> >>>>
>> >>>> Kind regards
>> >>>> San
>> >>>>
>> >>>>
>> >>
>> >>
>> >> --
>> >> Twitter: https://twitter.com/holdenkarau
>> >> Books (Learning Spark, High Performance Spark, etc.):
>> https://amzn.to/2MaRAG9
>> >> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>
>


REST Structured Streaming Sink

2020-07-01 Thread Sam Elamin
Hi All,


We ingest a lot of RESTful APIs into our lake and I'm wondering if it is at
all possible to create a REST sink in structured streaming?

For now I'm only focusing on restful services that have an incremental ID
so my sink can just poll for new data then ingest.

I can't seem to find a connector that does this and my gut instinct tells
me it's probably because it isn't possible due to something completely
obvious that I am missing

I know some RESTful APIs obfuscate the IDs to a hash of strings and that
could be a problem, but since I'm planning on focusing on just numerical IDs
that just get incremented I think I won't be facing that issue


Can anyone let me know if this sounds like a daft idea? Will I need
something like Kafka or kinesis as a buffer and redundancy or am I
overthinking this?


I would love to bounce ideas with people who run structured streaming jobs
in production


Kind regards
Sam


Avro file question

2019-11-04 Thread Sam
Hi,

How do we choose between a single large Avro file (much larger than the HDFS
block size) vs multiple smaller Avro files (close to the HDFS block size)?

Since avro is splittable, is there even a need to split a very large avro
file into smaller files?

I’m assuming that a single large Avro file can also be split across multiple
mappers/reducers/executors during processing.

Thanks.


Re: Spark Scala reading from Google Cloud BigQuery table throws error

2018-12-19 Thread Sam Elamin
Hi Mich

I wrote a connector to make it easier to connect Bigquery and Spark

Have a look here https://github.com/samelamin/spark-bigquery/

Your feedback is always welcome
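
As an aside, if you go down the uber-jar route Jörn suggests below, a shading
rule for Guava in sbt-assembly looks roughly like this (a sketch; it assumes
sbt-assembly is already wired into the build and only shows the shading part):

// build.sbt - relocate Guava so the version your code needs does not
// clash with the older one on Spark's / Hadoop's classpath
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "shaded.com.google.common.@1").inAll
)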

Kind Regards
Sam

On Tue, Dec 18, 2018 at 7:46 PM Mich Talebzadeh 
wrote:

> Thanks Jorn. I will try that. Requires installing sbt etc on ephemeral
> compute server in Google Cloud to build an uber jar file.
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 18 Dec 2018 at 11:16, Jörn Franke  wrote:
>
>> Maybe the guava version in your spark lib folder is not compatible (if
>> your Spark version has a guava library)? In this case i propose to create a
>> fat/uber jar potentially with a shaded guava dependency.
>>
>> On 18.12.2018 at 11:26, Mich Talebzadeh wrote:
>>
>> Hi,
>>
>> I am writing a small test code in spark-shell with attached jar
>> dependencies
>>
>> spark-shell --jars
>> /home/hduser/jars/bigquery-connector-0.13.4-hadoop3.jar,/home/hduser/jars/gcs-connector-1.9.4-hadoop3.jar,/home/hduser/jars/other/guava-19.0.jar,/home/hduser/jars/google-api-client-1.4.1-beta.jar,/home/hduser/jars/google-api-client-json-1.2.3-alpha.jar,/home/hduser/jars/google-api-services-bigquery-v2-rev20181202-1.27.0.jar
>>
>>  to read an already existing table in Google BigQuery as follows:
>>
>> import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
>> import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
>> import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
>> import
>> com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
>> import
>> com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
>> import com.google.gson.JsonObject
>> import org.apache.hadoop.io.LongWritable
>> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
>> // Assumes you have a spark context (sc) -- running from spark-shell REPL.
>> // Marked as transient since configuration is not Serializable. This
>> should
>> // only be necessary in spark-shell REPL.
>> @transient
>> val conf = sc.hadoopConfiguration
>> // Input parameters.
>> val fullyQualifiedInputTableId = "axial-glow-224522.accounts.ll_18740868"
>> val projectId = conf.get("fs.gs.project.id")
>> val bucket = conf.get("fs.gs.system.bucket")
>> // Input configuration.
>> conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
>> conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
>> BigQueryConfiguration.configureBigQueryInput(conf,
>> fullyQualifiedInputTableId)
>>
>> The problem I have is that even after loading jars with spark-shell --jar
>>
>> I am getting the following error at the last line
>>
>> scala> BigQueryConfiguration.configureBigQueryInput(conf,
>> fullyQualifiedInputTableId)
>>
>> java.lang.NoSuchMethodError:
>> com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V
>>   at
>> com.google.cloud.hadoop.io.bigquery.BigQueryStrings.parseTableReference(BigQueryStrings.java:68)
>>   at
>> com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.configureBigQueryInput(BigQueryConfiguration.java:260)
>>   ... 49 elided
>>
>> It says it cannot find method
>>
>> java.lang.NoSuchMethodError:
>> com.google.common.base.Preconditions.checkArgument
>>
>> but I checked it and it is in the following jar file
>>
>> jar tvf guava-19.0.jar| grep common.base.Preconditions
>>   5249 Wed Dec 09 15:58:14 UTC 2015
>> com/google/common/base/Preconditions.class
>>
>> I have used different version of guava jar files but none works!
>>
>> The code is based on the following:
>>
>>
>> https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>


Why is the max iteration for svd not configurable in mllib?

2018-08-10 Thread Sam Lendle
https://github.com/apache/spark/blob/f5aba657396bd4e2e03dd06491a2d169a99592a7/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala#L191
maxIter is set to max(300, 3 * # singular values). Is there a particular reason 
for this? And if not, would it be appropriate to submit a PR exposing that 
parameter? I have not contributed to spark before, so I don’t know if a small 
api change like that would require a discussion beforehand.

Thanks!
Sam


Re: from_json()

2017-08-28 Thread Sam Elamin
Hi jg,

Perhaps I am misunderstanding you, but if you just want to re-create a df
with a new schema it's fairly simple, assuming you have the schema already
predefined or as a JSON string (the format produced by StructType.json), i.e.

val newSchema = DataType.fromJson(json_schema_string).asInstanceOf[StructType]

then all you need to do is re-create the dataframe using this new schema

sqlContext.createDataFrame(oldDF.rdd, newSchema)

Regards
Sam

On Mon, Aug 28, 2017 at 5:57 PM, JG Perrin <jper...@lumeris.com> wrote:

> Is there a way to not have to specify a schema when using from_json() or
> infer the schema? When you read a JSON doc from disk, you can infer the
> schema. Should I write it to disk before (ouch)?
>
>
>
> jg
> --
>
> This electronic transmission and any documents accompanying this
> electronic transmission contain confidential information belonging to the
> sender. This information may contain confidential health information that
> is legally privileged. The information is intended only for the use of the
> individual or entity named above. The authorized recipient of this
> transmission is prohibited from disclosing this information to any other
> party unless required to do so by law or regulation and is required to
> delete or destroy the information after its stated need has been fulfilled.
> If you are not the intended recipient, you are hereby notified that any
> disclosure, copying, distribution or the taking of any action in reliance
> on or regarding the contents of this electronically transmitted information
> is strictly prohibited. If you have received this E-mail in error, please
> notify the sender and delete this message immediately.
>


Re: [ANNOUNCE] Announcing Apache Spark 2.2.0

2017-07-17 Thread Sam Elamin
Well done! This is amazing news :) Congrats and really can't wait to spread
the structured streaming love!

On Mon, Jul 17, 2017 at 5:25 PM, kant kodali  wrote:

> +1
>
> On Tue, Jul 11, 2017 at 3:56 PM, Jean Georges Perrin  wrote:
>
>> Awesome! Congrats! Can't wait!!
>>
>> jg
>>
>>
>> On Jul 11, 2017, at 18:48, Michael Armbrust 
>> wrote:
>>
>> Hi all,
>>
>> Apache Spark 2.2.0 is the third release of the Spark 2.x line. This
>> release removes the experimental tag from Structured Streaming. In
>> addition, this release focuses on usability, stability, and polish,
>> resolving over 1100 tickets.
>>
>> We'd like to thank our contributors and users for their contributions and
>> early feedback to this release. This release would not have been possible
>> without you.
>>
>> To download Spark 2.2.0, head over to the download page:
>> http://spark.apache.org/downloads.html
>>
>> To view the release notes: https://spark.apache.or
>> g/releases/spark-release-2-2-0.html
>>
>> *(note: If you see any issues with the release notes, webpage or
>> published artifacts, please contact me directly off-list) *
>>
>> Michael
>>
>>
>


Re: UDAFs for sketching Dataset columns with T-Digests

2017-07-06 Thread Sam Bessalah
This is interesting and very useful.
Thanks.

On Thu, Jul 6, 2017 at 2:33 AM, Erik Erlandson  wrote:

> After my talk on T-Digests in Spark at Spark Summit East, there were some
> requests for a UDAF-based interface for working with Datasets.   I'm
> pleased to announce that I released a library for doing T-Digest sketching
> with UDAFs:
>
> https://github.com/isarn/isarn-sketches-spark
>
> This initial release provides support for Scala. Future releases will
> support PySpark bindings, and additional tools for leveraging T-Digests in
> ML pipelines.
>
> Cheers!
> Erik
>


Re: Restful API Spark Application

2017-05-12 Thread Sam Elamin
Hi Nipun

Have you checked out the job server?

https://github.com/spark-jobserver/spark-jobserver

Regards
Sam
On Fri, 12 May 2017 at 21:00, Nipun Arora <nipunarora2...@gmail.com> wrote:

> Hi,
>
> We have written a java spark application (primarily uses spark sql). We
> want to expand this to provide our application "as a service". For this, we
> are trying to write a REST API. While a simple REST API can be easily made,
> and I can get Spark to run through the launcher. I wonder, how the spark
> context can be used by service requests, to process data.
>
> Are there any simple JAVA examples to illustrate this use-case? I am sure
> people have faced this before.
>
>
> Thanks
> Nipun
>


Re: Spark Testing Library Discussion

2017-04-29 Thread Sam Elamin
Hi lucas


Thanks for the detailed feedback, that's really useful!

I did suggest Github but my colleague asked for an email

You raise a good point with the grammar, sure I will rephrase it. I am more
than happy to merge in the PR if you send it


That said, I know you can make BDD tests using any framework but I am a
lazy developer and would rather use the framework or library defaults to
make it easier for other devs to pick up.

The number of rows is only a start, correct; we can add more tests to check
the transformed version, but I was going to point that out in a future
part of the series since this one is mainly about raw extracts.


Thank you very much for the feedback and I will be sure to add it once I
have more feedback


Maybe we can create a gist of all this or even a tiny book on best
practices if people find it useful

Looking forward to the PR!

Regards
Sam





On Sat, 29 Apr 2017 at 06:36, lucas.g...@gmail.com <lucas.g...@gmail.com>
wrote:

> Awesome, thanks.
>
> Just reading your post
>
> A few observations:
> 1) You're giving out Marius's email: "I have been lucky enough to
> build this pipeline with the amazing Marius Feteanu".  A LinkedIn or
> github link might be more helpful.
>
> 2) "If you are in Pyspark world sadly Holden’s test base wont work so
> I suggest you check out Pytest and pytest-bdd.".  doesn't read well to
> me, on first read I was wondering if Spark-Test-Base wasn't available
> in python... It took me about 20 seconds to figure out that you
> probably meant it doesn't allow for direct BDD semantics.  My 2nd
> observation here is that BDD semantics can be aped in any given
> testing framework.  You just need to be flexible :)
>
> 3) You're doing a transformation (IE JSON input against a JSON
> schema).  You are testing for # of rows which is a good start.  But I
> don't think that really exercises a test against your JSON schema. I
> tend to view schema as the things that need the most rigorous testing
> (it's code after all).  IE I would want to confirm that the output
> matches the expected shape and values after being loaded against the
> schema.
>
> I saw a few minor spelling and grammatical issues as well.  I put a PR
> into your blog for them.  I won't be offended if you squish it :)
>
> I should be getting into our testing 'how-to' stuff this week.  I'll
> scrape our org specific stuff and put it up to github this week as
> well.  It'll be in python so maybe we'll get both use cases covered
> with examples :)
>
> G
>
> On 27 April 2017 at 03:46, Sam Elamin <hussam.ela...@gmail.com> wrote:
> > Hi
> >
> > @Lucas I certainly would love to write an integration testing library for
> > workflows, I have a few ideas I would love to share with others and they
> are
> > focused around Airflow since that is what we use
> >
> >
> > As promised here is the first blog post in a series of posts I hope to
> write
> > on how we build data pipelines
> >
> > Please feel free to retweet my original tweet and share because the more
> > ideas we have the better!
> >
> > Feedback is always welcome!
> >
> > Regards
> > Sam
> >
> > On Tue, Apr 25, 2017 at 10:32 PM, lucas.g...@gmail.com
> > <lucas.g...@gmail.com> wrote:
> >>
> >> Hi all, whoever (Sam I think) was going to do some work on doing a
> >> template testing pipeline.  I'd love to be involved, I have a current
> task
> >> in my day job (data engineer) to flesh out our testing how-to / best
> >> practices for Spark jobs and I think I'll be doing something very
> similar
> >> for the next week or 2.
> >>
> >> I'll scrape out what i have now in the next day or so and put it up in a
> >> gist that I can share too.
> >>
> >> G
> >>
> >> On 25 April 2017 at 13:04, Holden Karau <hol...@pigscanfly.ca> wrote:
> >>>
> >>> Urgh hangouts did something frustrating, updated link
> >>> https://hangouts.google.com/hangouts/_/ha6kusycp5fvzei2trhay4uhhqe
> >>>
> >>> On Mon, Apr 24, 2017 at 12:13 AM, Holden Karau <hol...@pigscanfly.ca>
> >>> wrote:
> >>>>
> >>>> The (tentative) link for those interested is
> >>>> https://hangouts.google.com/hangouts/_/oyjvcnffejcjhi6qazf3lysypue .
> >>>>
> >>>> On Mon, Apr 24, 2017 at 12:02 AM, Holden Karau <hol...@pigscanfly.ca>
> >>>> wrote:
> >>>>>
> >>>>> So 14 people have said they are available on Tuesday the 25th at 1PM
> >>>>> pacific so we will do this meeting then (
> >>>>

Re: Spark Testing Library Discussion

2017-04-27 Thread Sam Elamin
Hi

@Lucas I certainly would love to write an integration testing library for
workflows, I have a few ideas I would love to share with others and they
are focused around Airflow since that is what we use


As promised here
<https://samelamin.github.io/2017/04/27/Building-A-Datapipeline-part1/> is
the first blog post in a series of posts I hope to write on how we build
data pipelines

Please feel free to retweet my original tweet
<https://twitter.com/samelamin/status/857546231492612096> and share because
the more ideas we have the better!

Feedback is always welcome!

Regards
Sam

On Tue, Apr 25, 2017 at 10:32 PM, lucas.g...@gmail.com <lucas.g...@gmail.com
> wrote:

> Hi all, whoever (Sam I think) was going to do some work on doing a
> template testing pipeline.  I'd love to be involved, I have a current task
> in my day job (data engineer) to flesh out our testing how-to / best
> practices for Spark jobs and I think I'll be doing something very similar
> for the next week or 2.
>
> I'll scrape out what i have now in the next day or so and put it up in a
> gist that I can share too.
>
> G
>
> On 25 April 2017 at 13:04, Holden Karau <hol...@pigscanfly.ca> wrote:
>
>> Urgh hangouts did something frustrating, updated link
>> https://hangouts.google.com/hangouts/_/ha6kusycp5fvzei2trhay4uhhqe
>>
>> On Mon, Apr 24, 2017 at 12:13 AM, Holden Karau <hol...@pigscanfly.ca>
>> wrote:
>>
>>> The (tentative) link for those interested is https://hangouts.google.com
>>> /hangouts/_/oyjvcnffejcjhi6qazf3lysypue .
>>>
>>> On Mon, Apr 24, 2017 at 12:02 AM, Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>>
>>>> So 14 people have said they are available on Tuesday the 25th at 1PM
>>>> pacific so we will do this meeting then ( https://doodle.com/poll/69y6
>>>> yab4pyf7u8bn ).
>>>>
>>>> Since hangouts tends to work ok on the Linux distro I'm running my
>>>> default is to host this as a "hangouts-on-air" unless there are alternative
>>>> ideas.
>>>>
>>>> I'll record the hangout and if it isn't terrible I'll post it for those
>>>> who weren't able to make it (and for next time I'll include more European
>>>> friendly time options - Doodle wouldn't let me update it once posted).
>>>>
>>>> On Fri, Apr 14, 2017 at 11:17 AM, Holden Karau <hol...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> Hi Spark Users (+ Some Spark Testing Devs on BCC),
>>>>>
>>>>> Awhile back on one of the many threads about testing in Spark there
>>>>> was some interest in having a chat about the state of Spark testing and
>>>>> what people want/need.
>>>>>
>>>>> So if you are interested in joining an online (with maybe an IRL
>>>>> component if enough people are SF based) chat about Spark testing please
>>>>> fill out this doodle - https://doodle.com/poll/69y6yab4pyf7u8bn
>>>>>
>>>>> I think reasonable topics of discussion could be:
>>>>>
>>>>> 1) What is the state of the different Spark testing libraries in the
>>>>> different core (Scala, Python, R, Java) and extended languages (C#,
>>>>> Javascript, etc.)?
>>>>> 2) How do we make these more easily discovered by users?
>>>>> 3) What are people looking for in their testing libraries that we are
>>>>> missing? (can be functionality, documentation, etc.)
>>>>> 4) Are there any examples of well tested open source Spark projects
>>>>> and where are they?
>>>>>
>>>>> If you have other topics that's awesome.
>>>>>
>>>>> To clarify this about libraries and best practices for people testing
>>>>> their Spark applications, and less about testing Spark's internals
>>>>> (although as illustrated by some of the libraries there is some strong
>>>>> overlap in what is required to make that work).
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Holden :)
>>>>>
>>>>> --
>>>>> Cell : 425-233-8271 <(425)%20233-8271>
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Cell : 425-233-8271 <(425)%20233-8271>
>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271 <(425)%20233-8271>
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>
>>
>> --
>> Cell : 425-233-8271 <(425)%20233-8271>
>> Twitter: https://twitter.com/holdenkarau
>>
>
>


Re: help/suggestions to setup spark cluster

2017-04-26 Thread Sam Elamin
Hi Anna

There are a variety of options for launching Spark clusters. I doubt people
run Spark on a single EC2 instance, certainly not in production I don't
think

I don't have enough information on what you are trying to do, but if you are
just trying to set things up from scratch then I think you can just use EMR,
which will create a cluster for you and attach a Zeppelin instance as well


You can also use databricks for ease of use and very little management but
you will pay a premium for that abstraction


Regards
Sam
On Wed, 26 Apr 2017 at 22:02, anna stax <annasta...@gmail.com> wrote:

> I need to setup a spark cluster for Spark streaming and scheduled batch
> jobs and adhoc queries.
> Please give me some suggestions. Can this be done in standalone mode.
>
> Right now we have a spark cluster in standalone mode on AWS EC2 running
> spark streaming application. Can we run spark batch jobs and zeppelin on
> the same. Do we need a better resource manager like Mesos?
>
> Are there any companies or individuals that can help in setting this up?
>
> Thank you.
>
> -Anna
>


Re: How to convert Dstream of JsonObject to Dataframe in spark 2.1.0?

2017-04-24 Thread Sam Elamin
you have 2 options:
1) Clean -> Write your own parser to go through each property and create a
dataset
2) Hacky but simple -> Convert to a json string then read in using
spark.read.json(jsonString)

Please bear in mind the second option is expensive which is why it is hacky
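
A rough sketch of option 2 (assuming spark is your SparkSession and the DStream
carries Gson JsonObjects; df.show() is just a stand-in for whatever you do next):

dstream.foreachRDD { rdd =>
  // Gson's JsonObject.toString emits the JSON text, so Spark can parse it directly
  val jsonStrings = rdd.map(_.toString)
  val df = spark.read.json(jsonStrings) // schema inference runs on every batch - the expensive part
  df.show()
}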

I wrote my own parser here
<https://github.com/samelamin/spark-bigquery/blob/master/src/main/scala/com/samelamin/spark/bigquery/converters/SchemaConverters.scala>
which you can use to convert between JsonObjects to StructType schemas

Regards
Sam


On Sun, Apr 23, 2017 at 7:50 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> How to convert Dstream of JsonObject to Dataframe in spark 2.1.0? That
> JsonObject is from Gson Library.
>
> Thanks!
>


Deploying Spark Applications. Best Practices And Patterns

2017-04-12 Thread Sam Elamin
Hi All,

Really useful information on this thread. We moved a bit off topic since
the initial question was how to schedule spark jobs in AWS. I do think
however that there are loads of great insights here within the community so
I have renamed the subject to "Deploying Spark Applications. Best
Practices"

I honestly think this is a great opportunity to share knowledge and views of
what best practices are and hopefully lead to people building and shipping
reliable, scalable data pipelines.

Also, can I reiterate what Steve mentioned: can we all remember this mailing
list is aimed at helping people, so can we please be constructive with our
feedback :)

Here is my 2 cents:

So as someone who has come from a web development background and
transitioned into the data space, I can assure you that I completely agree
with Steve that consistent, repeatable deployments are essential for
shipping software reliably. It is the very essence of
Continuous Deployment

As a side note the difference between continuous deployment and continuous
delivery is that switch to turn the lights on. You can continuously deploy
to an environment - but it is a business decision if you want to flick that
switch to make that feature available to your customer. Basically canary
releases AKA dark releases

I have seen my fair share of "cowboy" shops where the idea of deploying
your application is a manual copy/paste by a human and that is absolutely
the worst way to be deploying things, when humans are involved in any step
of your pipeline things can and inevitably will go wrong.

This is exactly why CI tools have been brought into play: to allow
integration of all your code across teams/branches and ensure code compiles
and tests pass regardless of what the code is meant to do; whether it's a
website, application, library or framework is irrelevant.

Not everyone is going to agree with this, but in my humble opinion "big
data" products are in their infancy; the traditional ETL scripts consisted of
bespoke python scripts that at most did simple transformations. There is no
right way and there certainly isn't an industry standard as of yet. There
are definitely loads of wrong ways and I am sure we have all seen/done our
fair share of "horror" stories as Steve eloquently put it. Tools like
Talend and Pentaho came into play to try and simplify this process, the UI
just said point me to a database, click click click and you have a pipeline
in place.

When it comes to scheduling Spark jobs, you can either submit to an already
running cluster using things like Oozie or bash scripts, or have a workflow
manager like Airflow or Data Pipeline to create new clusters for you. We
went down the second route to continue with the whole immutable
infrastructure / "treat your servers as cattle, not pets" approach

We are facing two problems for this at the moment

1) Testing and versioning of data in Spark applications: We solved this by
using Holden's Spark test base, which works amazingly, but the nature of data
warehousing means people want ALL the columns available, so you have to go
against your nature as an engineer to keep things simple. The mentality of
an analyst or a data scientist is to throw the kitchen sink in: literally
any data available should be in the end transformed table. This basically
means you either do not test the generated data or your code becomes super
verbose and coupled, making it a nightmare to maintain, which defeats the
purpose of testing in the first place. Not to mention the nuances of the
data sources coming in, e.g. data arriving in the wrong shape, wrong order
or wrong format, or in some cases not at all. You need to test for all of
that and deal with it or you will get burnt in production. You do not want
to be on that call when your stakeholders are asking why their reports are
not updated or, worse, are showing no data!

2) Testing and deploying the workflow manager: We needed to ensure
deployments were easy, so we were basically masochists here, i.e. if
deployment is painful then do it more often until it isn't. The problem is
there isn't a clean way to test Airflow other than running the DAGs
themselves, so we had to parameterise them to push test data through our
pipeline and ensure that the transformed tables were generated correctly
(a simple S3 lookup for us). We are still early days here so happy to hear
feedback on how to do it better


I realise this is a very very long email and would probably be better
explained on a blog post, but hey this is the gist of it. If people are
still interested I can write it up as a blog post adding code samples and
nice diagrams!

Kind Regards
Sam














On Wed, Apr 12, 2017 at 7:33 PM, lucas.g...@gmail.com <lucas.g...@gmail.com>
wrote:

> "Building data products is a very different discipline from that of
> building software."
>
> That is a fundamentally incorrect assumption.
>
> There will alwa

Re: Spark Streaming. Real-time save data and visualize on dashboard

2017-04-12 Thread Sam Elamin
Hi

To be honest there are a variety of options but it all comes down to who
will be querying these dashboards.

If the end user is an engineer then the ELK stack is fine and I can attest
to the ease of use of kibana since I used it quite heavily.

On the other hand, in my experience it isn't the engineers that are in charge
of reporting, so if the end user is a data analyst or data scientist then
they are most comfortable using SQL and would be slightly averse to
learning the nuances of creating dashboards and using Elasticsearch. Trust
me, no matter how much you try, these folks are more comfortable using SQL
and Tableau-like platforms. So you will have to educate them, not to
mention the fact that any new hire will have to undergo the same training
to be productive

My suggestion for that is to push your data to Google BigQuery
<https://cloud.google.com/bigquery/>. It really is simple to use and people
can just focus on writing their queries. It also returns within seconds for
queries over terabytes of data. The caveat here is that you are paying per
query. But it's $5 for 1 TB, which is peanuts really. It's a managed service,
so there are zero setup and management costs compared to the other services.
I suppose in the end you are paying to abstract that knowledge away

Happy to answer any questions you might have

Kind Regards
Sam




On Wed, 12 Apr 2017 at 09:36, tencas <diego...@gmail.com> wrote:

> Hi Gaurav1809 ,
>
> I was thinking about using elasticsearch + kibana too (actually don't know
> the differences between ELK and elasticsearch).
> I was wondering about pros and cons of using a document indexer vs NoSQL
> database.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-Streaming-Real-time-save-data-
> and-visualize-on-dashboard-tp28587p28589.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: optimising storage and ec2 instances

2017-04-11 Thread Sam Elamin
Hi Zeming Yu, Steve

Just to add, we are also going down this partitioning route, but you
should know that if you are in AWS land you are most likely going to be using
EMR at any given time

At the moment EMR does not do recursive search on wildcards, see this
<http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot-errors-io.html#recurseinput>

However Spark seems to be able to deal with it fine, so if you don't have a
data serving layer for your customers then you should be fine

Regards
sam

On Tue, Apr 11, 2017 at 1:21 PM, Zeming Yu <zemin...@gmail.com> wrote:

> everything works best if your sources are a few tens to hundreds of MB or
> more
>
> Are you referring to the size of the zip file or individual unzipped files?
>
> Any issues with storing a 60 mb zipped file containing heaps of text files
> inside?
>
> On 11 Apr. 2017 9:09 pm, "Steve Loughran" <ste...@hortonworks.com> wrote:
>
>>
>> > On 11 Apr 2017, at 11:07, Zeming Yu <zemin...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > I'm a beginner with spark, and I'm wondering if someone could provide
>> guidance on the following 2 questions I have.
>> >
>> > Background: I have a data set growing by 6 TB p.a. I plan to use spark
>> to read in all the data, manipulate it and build a predictive model on it
>> (say GBM) I plan to store the data in S3, and use EMR to launch spark,
>> reading in data from S3.
>> >
>> > 1. Which option is best for storing the data on S3 for the purpose of
>> analysing it in EMR spark?
>> > Option A: storing the 6TB file as 173 million individual text files
>> > Option B: zipping up the above 173 million text files as 240,000 zip
>> files
>> > Option C: appending the individual text files, so have 240,000 text
>> files p.a.
>> > Option D: combining the text files even further
>> >
>>
>> everything works best if your sources are a few tens to hundreds of MB or
>> more of your data, work can be partitioned up by file. If you use more
>> structured formats (avro compressed with snappy, etc), you can throw > 1
>> executor at work inside a file. Structure is handy all round, even if its
>> just adding timestamp and provenance columns to each data file.
>>
>> there's the HAR file format from Hadoop which can merge lots of small
>> files into larger ones, allowing work to be scheduled per har file.
>> Recommended for HDFS as it hates small files, on S3 you still have limits
>> on small files (including throttling of HTTP requests to shards of a
>> bucket), but they are less significant.
>>
>> One thing to be aware is that the s3 clients spark use are very
>> inefficient in listing wide directory trees, and Spark not always the best
>> at partitioning work because of this. You can accidentally create a very
>> inefficient tree structure like datasets/year=2017/month=5/day=10/hour=12/,
>> with only one file per hour. Listing and partitioning suffers here, and
>> while s3a on Hadoop 2.8 is better here, Spark hasn't yet fully adapted to
>> those changes (use of specific API calls). There's also a lot more to be
>> done in S3A to handle wildcards in the directory tree much more efficiently
>> (HADOOP-13204); needs to address pattens like 
>> (datasets/year=201?/month=*/day=10)
>> without treewalking and without fetching too much data from wildcards near
>> the top of the tree. We need to avoid implementing something which works
>> well on *my* layouts, but absolutely dies on other people's. As is usual in
>> OSS, help welcome; early testing here as critical as coding, so as to
>> ensure things will work with your file structures
>>
>> -Steve
>>
>>
>> > 2. Any recommendations on the EMR set up to analyse the 6TB of data all
>> at once and build a GBM, in terms of
>> > 1) The type of EC2 instances I need?
>> > 2) The number of such instances I need?
>> > 3) Rough estimate of cost?
>> >
>>
>> no opinion there
>>
>> >
>> > Thanks so much,
>> > Zeming
>> >
>>
>>


Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-11 Thread Sam Elamin
Hi Steve


Thanks for the detailed response, I think this problem doesn't have an
industry standard solution as of yet and I am sure a lot of people would
benefit from the discussion

I realise now what you are saying so thanks for clarifying, that said let
me try and explain how we approached the problem

There are 2 problems you highlighted: the first is moving the code from SCM
to prod, and the other is ensuring the data your code uses is correct
(using the latest data from prod).


*"how do you get your code from SCM into production?"*

We currently have our pipeline being run via airflow, we have our dags in
S3, with regards to how we get our code from SCM to production

1) Jenkins build that builds our spark applications and runs tests
2) Once the first build is successful we trigger another build to copy the
dags to an s3 folder

We then routinely sync this folder to the local airflow dags folder every X
amount of mins

Re test data
*" but what's your strategy for test data: that's always the troublespot."*

Our application is using versioning against the data, so we expect the
source data to be in a certain version and the output data to also be in a
certain version

We have a test resources folder that follows the same versioning
convention - this is the data that our application tests use - to ensure
that the data is in the correct format

so for example if we have Table X with version 1 that depends on data from
Table A and B also version 1, we run our spark application then ensure the
transformed table X has the correct columns and row values

Then when we have a new version 2 of the source data, or add a new column
in Table X (version 2), we generate a new version of the data and ensure
the tests are updated

That way we ensure any new version of the data has tests against it

*"I've never seen any good strategy there short of "throw it at a copy of
the production dataset"."*

I agree which is why we have a sample of the production data and version
the schemas we expect the source and target data to look like.

If people are interested I am happy writing a blog about it in the hopes
this helps people build more reliable pipelines

Kind Regards
Sam










On Tue, Apr 11, 2017 at 11:31 AM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> On 7 Apr 2017, at 18:40, Sam Elamin <hussam.ela...@gmail.com> wrote:
>
> Definitely agree with gourav there. I wouldn't want jenkins to run my work
> flow. Seems to me that you would only be using jenkins for its scheduling
> capabilities
>
>
> Maybe I was just looking at this differenlty
>
> Yes you can run tests but you wouldn't want it to run your orchestration
> of jobs
>
> What happens if jenkijs goes down for any particular reason. How do you
> have the conversation with your stakeholders that your pipeline is not
> working and they don't have data because the build server is going through
> an upgrade or going through an upgrade
>
>
>
> Well, I wouldn't use it as a replacement for Oozie, but I'd certainly
> consider as the pipeline for getting your code out to the cluster, so you
> don't have to explain why you just pushed out something broken
>
> As example, here's Renault's pipeline as discussed last week in Munich
> https://flic.kr/p/Tw3Emu
>
> However to be fair I understand what you are saying Steve if someone is in
> a place where you only have access to jenkins and have to go through hoops
> to setup:get access to new instances then engineers will do what they
> always do, find ways to game the system to get their work done
>
>
>
>
> This isn't about trying to "Game the system", this is about what makes a
> replicable workflow for getting code into production, either at the press
> of a button or as part of a scheduled "we push out an update every night,
> rerun the deployment tests and then switch over to the new installation"
> mech.
>
> Put differently: how do you get your code from SCM into production? Not
> just for CI, but what's your strategy for test data: that's always the
> troublespot. Random selection of rows may work, although it will skip the
> odd outlier (high-unicode char in what should be a LATIN-1 field, time set
> to 0, etc), and for work joining > 1 table, you need rows which join well.
> I've never seen any good strategy there short of "throw it at a copy of the
> production dataset".
>
>
> -Steve
>
>
>
>
>
>
> On Fri, 7 Apr 2017 at 16:17, Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
>> Hi Steve,
>>
>> Why would you ever do that? You are suggesting the use of a CI tool as a
>> workflow and orchestration engine.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Fri, Apr 7, 2017 at 

Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-07 Thread Sam Elamin
Definitely agree with Gourav there. I wouldn't want Jenkins to run my
workflow. It seems to me that you would only be using Jenkins for its
scheduling capabilities

Yes you can run tests but you wouldn't want it to run your orchestration of
jobs

What happens if Jenkins goes down for any particular reason? How do you
have the conversation with your stakeholders that your pipeline is not
working and they don't have data because the build server is going through
an upgrade?

However, to be fair, I understand what you are saying Steve: if someone is in
a place where you only have access to Jenkins and have to go through hoops
to set up / get access to new instances, then engineers will do what they
always do, find ways to game the system to get their work done




On Fri, 7 Apr 2017 at 16:17, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi Steve,
>
> Why would you ever do that? You are suggesting the use of a CI tool as a
> workflow and orchestration engine.
>
> Regards,
> Gourav Sengupta
>
> On Fri, Apr 7, 2017 at 4:07 PM, Steve Loughran <ste...@hortonworks.com>
> wrote:
>
>> If you have Jenkins set up for some CI workflow, that can do scheduled
>> builds and tests. Works well if you can do some build test before even
>> submitting it to a remote cluster
>>
>> On 7 Apr 2017, at 10:15, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>
>> Hi Shyla
>>
>> You have multiple options really some of which have been already listed
>> but let me try and clarify
>>
>> Assuming you have a spark application in a jar you have a variety of
>> options
>>
>> You have to have an existing spark cluster that is either running on EMR
>> or somewhere else.
>>
>> *Super simple / hacky*
>> Cron job on EC2 that calls a simple shell script that does a spart submit
>> to a Spark Cluster OR create or add step to an EMR cluster
>>
>> *More Elegant*
>> Airflow/Luigi/AWS Data Pipeline (Which is just CRON in the UI ) that will
>> do the above step but have scheduling and potential backfilling and error
>> handling(retries,alerts etc)
>>
>> AWS are coming out with glue <https://aws.amazon.com/glue/> soon that
>> does some Spark jobs but I do not think its available worldwide just yet
>>
>> Hope I cleared things up
>>
>> Regards
>> Sam
>>
>>
>> On Fri, Apr 7, 2017 at 6:05 AM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Shyla,
>>>
>>> why would you want to schedule a spark job in EC2 instead of EMR?
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Fri, Apr 7, 2017 at 1:04 AM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> I want to run a spark batch job maybe hourly on AWS EC2 .  What is the
>>>> easiest way to do this. Thanks
>>>>
>>>
>>>
>>
>>
>


Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-07 Thread Sam Elamin
Hi Shyla

You have multiple options really some of which have been already listed but
let me try and clarify

Assuming you have a spark application in a jar you have a variety of options

You have to have an existing spark cluster that is either running on EMR or
somewhere else.

*Super simple / hacky*
Cron job on EC2 that calls a simple shell script that does a spark-submit
to a Spark cluster, OR creates/adds a step to an EMR cluster
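
For the hacky option the whole thing can be as small as a crontab entry; the
paths, master URL and class name below are placeholders:

# run every hour, on the hour
0 * * * * /opt/spark/bin/spark-submit --master spark://master-host:7077 \
    --class com.example.MyBatchJob /opt/jobs/my-batch-job.jar >> /var/log/my-batch-job.log 2>&1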

*More Elegant*
Airflow/Luigi/AWS Data Pipeline (which is just cron with a UI) that will
do the above step but adds scheduling and potentially backfilling and error
handling (retries, alerts etc.)

AWS are coming out with Glue <https://aws.amazon.com/glue/> soon, which does
some Spark jobs, but I do not think it's available worldwide just yet

Hope I cleared things up

Regards
Sam


On Fri, Apr 7, 2017 at 6:05 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi Shyla,
>
> why would you want to schedule a spark job in EC2 instead of EMR?
>
> Regards,
> Gourav
>
> On Fri, Apr 7, 2017 at 1:04 AM, shyla deshpande <deshpandesh...@gmail.com>
> wrote:
>
>> I want to run a spark batch job maybe hourly on AWS EC2 .  What is the
>> easiest way to do this. Thanks
>>
>
>


Re: Executor unable to pick postgres driver in Spark standalone cluster

2017-04-04 Thread Sam Elamin
Hi Rishikesh,

Sounds like the postgres driver isn't being loaded onto the classpath. To try
and debug it, try submitting the application with the --jars option,

e.g.

spark-submit --jars /home/ubuntu/downloads/postgres/postgresql-9.4-1200-jdbc41.jar {application.jar}

(note that --jars has to come before the application jar, otherwise spark-submit
treats it as an argument to your application)


If that does not work then there is a problem in the application itself, and
the reason it works locally is that you have the dependency on your local
classpath


Regards
Sam

On Mon, Apr 3, 2017 at 2:43 PM, Rishikesh Teke <rishikesht...@gmail.com>
wrote:

>
> Hi all,
>
> I was submitting the play application to spark 2.1 standalone cluster . In
> play application postgres dependency is also added and application works on
> local spark libraries. But at run time on standalone cluster it gives me
> error :
>
> o.a.s.s.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 1, 172.31.21.3,
> executor 1): java.lang.ClassNotFoundException: org.postgresql
> .Driver
>
> I have placed following in spark-defaults.conf directory
>
> spark.executor.extraClassPath
> /home/ubuntu/downloads/postgres/postgresql-9.4-1200-jdbc41.jar
> spark.driver.extraClassPath
> /home/ubuntu/downloads/postgres/postgresql-9.4-1200-jdbc41.jar
>
> Still executors unable to pick the driver.
> Am i missing something? Need help .
> Thanks.
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Executor-unable-to-pick-postgres-driver-in-Spark-
> standalone-cluster-tp28563.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Contributing to Spark

2017-03-19 Thread Sam Elamin
Hi All,

I would like to start contributing to Spark if possible; it's an amazing
technology and I would love to get involved


The contributing page <http://spark.apache.org/contributing.html> states
this "consult the list of starter tasks in JIRA, or ask the
user@spark.apache.org mailing list."


Can anyone guide me on where is best to start? What are these "starter
tasks"?

I was thinking adding tests would be a good place to begin when dealing
with any new code base, perhaps to Pyspark since Scala seems a bit more
stable


Also - if at all possible - I would really appreciate it if any of the
contributors or PMC members would be willing to mentor or guide me in this.
Any help would be greatly appreciated!


Regards
Sam


Re: Spark and continuous integration

2017-03-14 Thread Sam Elamin
Thank you both

Steve that's a very interesting point. I have to admit I have never thought
of doing analysis over time on the tests but it makes sense as the failures
over time tell you quite a bit about your data platform

Thanks for highlighting! We are using Pyspark for now so I hope some
frameworks help with that.

Previously we have built data sanity checks that look at counts and numbers
to produce graphs using statsd and Grafana (elk stack) but not necessarily
looking at test metrics


I'll definitely check it out

Kind regards
Sam
On Tue, 14 Mar 2017 at 11:57, Jörn Franke <jornfra...@gmail.com> wrote:

> I agree the reporting is an important aspect. Sonarqube (or similar tool)
> can report over time, but does not support Scala (well indirectly via
> JaCoCo). In the end, you will need to think about a dashboard that displays
> results over time.
>
> On 14 Mar 2017, at 12:44, Steve Loughran <ste...@hortonworks.com> wrote:
>
>
> On 13 Mar 2017, at 13:24, Sam Elamin <hussam.ela...@gmail.com> wrote:
>
> Hi Jorn
>
> Thanks for the prompt reply, really we have 2 main concerns with CD,
> ensuring tests pasts and linting on the code.
>
>
> I'd add "providing diagnostics when tests fail", which is a combination
> of: tests providing useful information and CI tooling collecting all those
> results and presenting them meaningfully. The hard parts are invariably (at
> least for me)
>
> -what to do about the intermittent failures
> -tradeoff between thorough testing and fast testing, especially when
> thorough means "better/larger datasets"
>
> You can consider the output of jenkins & tests as data sources for your
> own analysis too: track failure rates over time, test runs over time, etc:
> could be interesting. If you want to go there, then the question of "which
> CI toolings produce the most interesting machine-parseable results, above
> and beyond the classic Ant-originated XML test run reports"
>
> I have mixed feelings about scalatest there: I think the expression
> language is good, but the maven test runner doesn't report that well, at
> least for me:
>
>
> https://steveloughran.blogspot.co.uk/2016/09/scalatest-thoughts-and-ideas.html
>
>
>
> I think all platforms should handle this with ease, I was just wondering
> what people are using.
>
> Jenkins seems to have the best spark plugins so we are investigating that
> as well as a variety of other hosted CI tools
>
> Happy to write a blog post detailing our findings and sharing it here if
> people are interested
>
>
> Regards
> Sam
>
> On Mon, Mar 13, 2017 at 1:18 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>
> Hi,
>
> Jenkins also now supports pipeline as code and multibranch pipelines. thus
> you are not so dependent on the UI and you do not need anymore a long list
> of jobs for different branches. Additionally it has a new UI (beta) called
> blueocean, which is a little bit nicer. You may also check GoCD. Aside from
> this you have a huge variety of commercial tools, e.g. Bamboo.
> In the cloud, I use for my open source github projects Travis-Ci, but
> there are also a lot of alternatives, e.g. Distelli.
>
> It really depends what you expect, e.g. If you want to Version the build
> pipeline in GIT, if you need Docker deployment etc. I am not sure if new
> starters should be responsible for the build pipeline, thus I am not sure
> that i understand  your concern in this area.
>
> From my experience, integration tests for Spark can be run on any of these
> platforms.
>
> Best regards
>
> > On 13 Mar 2017, at 10:55, Sam Elamin <hussam.ela...@gmail.com> wrote:
> >
> > Hi Folks
> >
> > This is more of a general question. What's everyone using for their CI
> /CD when it comes to spark
> >
> > We are using Pyspark but potentially looking to make to spark scala and
> Sbt in the future
> >
> >
> > One of the suggestions was jenkins but I know the UI isn't great for new
> starters so I'd rather avoid it. I've used team city but that was more
> focused on dot net development
> >
> >
> > What are people using?
> >
> > Kind Regards
> > Sam
>
>
>
>


Re: Spark and continuous integration

2017-03-13 Thread Sam Elamin
Hi Jorn

Thanks for the prompt reply. Really we have 2 main concerns with CD:
ensuring tests pass and linting the code.

I think all platforms should handle this with ease, I was just wondering
what people are using.

Jenkins seems to have the best spark plugins so we are investigating that
as well as a variety of other hosted CI tools

Happy to write a blog post detailing our findings and sharing it here if
people are interested


Regards
Sam

On Mon, Mar 13, 2017 at 1:18 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> Hi,
>
> Jenkins also now supports pipeline as code and multibranch pipelines. thus
> you are not so dependent on the UI and you do not need anymore a long list
> of jobs for different branches. Additionally it has a new UI (beta) called
> blueocean, which is a little bit nicer. You may also check GoCD. Aside from
> this you have a huge variety of commercial tools, e.g. Bamboo.
> In the cloud, I use for my open source github projects Travis-Ci, but
> there are also a lot of alternatives, e.g. Distelli.
>
> It really depends what you expect, e.g. If you want to Version the build
> pipeline in GIT, if you need Docker deployment etc. I am not sure if new
> starters should be responsible for the build pipeline, thus I am not sure
> that i understand  your concern in this area.
>
> From my experience, integration tests for Spark can be run on any of these
> platforms.
>
> Best regards
>
> > On 13 Mar 2017, at 10:55, Sam Elamin <hussam.ela...@gmail.com> wrote:
> >
> > Hi Folks
> >
> > This is more of a general question. What's everyone using for their CI
> /CD when it comes to spark
> >
> > We are using Pyspark but potentially looking to make to spark scala and
> Sbt in the future
> >
> >
> > One of the suggestions was jenkins but I know the UI isn't great for new
> starters so I'd rather avoid it. I've used team city but that was more
> focused on dot net development
> >
> >
> > What are people using?
> >
> > Kind Regards
> > Sam
>


Spark and continuous integration

2017-03-13 Thread Sam Elamin
Hi Folks

This is more of a general question. What's everyone using for their CI/CD
when it comes to Spark?

We are using PySpark but potentially looking to move to Spark Scala and sbt
in the future


One of the suggestions was Jenkins but I know the UI isn't great for new
starters so I'd rather avoid it. I've used TeamCity but that was more
focused on .NET development


What are people using?

Kind Regards
Sam


Re: How to unit test spark streaming?

2017-03-07 Thread Sam Elamin
Hey kant

You can use Holden's spark-testing-base

Have a look at some of the specs I wrote here to give you an idea

https://github.com/samelamin/spark-bigquery/blob/master/src/test/scala/com/samelamin/spark/bigquery/BigQuerySchemaSpecs.scala

Basically you abstract your transformations to take in a dataframe and
return one, then you assert on the returned df
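
For the archive, here is a minimal, self-contained example of that pattern. The
transformation, column names and values are invented, and it assumes
spark-testing-base and ScalaTest are on the test classpath:

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.scalatest.FunSuite

// Hypothetical transformation under test: takes a dataframe, returns a dataframe.
object Totals {
  def addTotal(df: DataFrame): DataFrame =
    df.withColumn("total", col("price") * col("quantity"))
}

class TotalsSpec extends FunSuite with DataFrameSuiteBase {
  test("addTotal appends a total column") {
    import sqlContext.implicits._
    val input = Seq(("a", 2.0, 3L), ("b", 1.5, 2L)).toDF("id", "price", "quantity")
    val expected = Seq(("a", 2.0, 3L, 6.0), ("b", 1.5, 2L, 3.0))
      .toDF("id", "price", "quantity", "total")

    // assertDataFrameEquals is provided by spark-testing-base
    assertDataFrameEquals(expected, Totals.addTotal(input))
  }
}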

Regards
Sam
On Tue, 7 Mar 2017 at 12:05, kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> How to unit test spark streaming or spark in general? How do I test the
> results of my transformations? Also, more importantly don't we need to
> spawn master and worker JVM's either in one or multiple nodes?
>
> Thanks!
> kant
>


Re: using spark to load a data warehouse in real time

2017-03-01 Thread Sam Elamin
Hi Adaryl

Having come from a Web background myself I completely understand your
confusion so let me try to clarify a few things

First and foremost, Spark is a data processing engine, not a general
framework. In the web applications and frameworks world you load the
entities, map them to the UI, serve them up to the users and then save
whatever you need back to the database via some sort of entity mapping,
whether that's an ORM, stored procedures or any other mechanism.

Spark, as I mentioned, is a data processing engine, so there is no concept of
an ORM or data mapper. You can give it the schema of what you expect the
data to look like, and it works well with most of the data formats being
used in the industry, like CSV, JSON, Avro and Parquet, including inferring the
schema from the data provided, making it much easier to develop and maintain.

Now as to your question of loading data in real time, it absolutely can be
done. Traditionally incoming data arrives at a location most people call the
landing zone. This is where the extract part of the ETL begins.

As Jörn mentioned, Spark Streaming isn't meant to write to a database, but you
can write to Kafka or Kinesis to feed a pipeline and then have another
process consume from them and write to your end datastore.

The creators of Spark realised that your use case is absolutely valid and
almost everyone they talked to said that streaming on its own wasn't
enough; for this very reason the concept of structured streaming was
introduced.

See this blog post from Databricks:

https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html


You can potentially use the structured streaming APIs to continually read
changes from HDFS, or in your case S3, and then write them out via JDBC to
your end datastore.
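
For the archive, a rough sketch of that shape (Spark 2.1-era APIs; the bucket
names and schema are invented, and the JDBC leg is only described in the
comments because there is no streaming JDBC sink out of the box):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("s3-to-dwh").getOrCreate()

// Streaming file sources need the schema up front.
val schema = new StructType()
  .add("id", LongType)
  .add("payload", StringType)

val incoming = spark.readStream
  .schema(schema)
  .json("s3a://my-landing-bucket/events/")   // picks up new files as they land

// There is no streaming JDBC sink, so the warehouse leg is usually a ForeachWriter
// or a custom sink; staging to files and loading the warehouse from there also works.
val query = incoming.writeStream
  .format("parquet")
  .option("path", "s3a://my-staging-bucket/events/")
  .option("checkpointLocation", "s3a://my-staging-bucket/checkpoints/events/")
  .start()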

I have done it before so I'll give you a few gotchas to be aware of

The most important one is whether your end datastore or data warehouse
supports streaming inserts; some are better than others. Redshift
specifically is really bad when it comes to small, very frequent deltas,
which is exactly what streaming at high scale produces.

The second is that structured streaming is still in an alpha phase and the
code is marked as experimental. That's not to say it will die the minute
you push any load through, because I found that it handled GBs of data well.
The pain I found is that the underlying goal of structured streaming was
to reuse the underlying dataframe APIs, hence unifying the batch and stream
data types so you only need to learn one; however, some methods don't
yet work on streaming dataframes, such as dropDuplicates.


That's pretty much it. So really it comes down to your use case: if you
need the data to be reliable and never go down then implement Kafka or
Kinesis. If it's a proof of concept or you are trying to validate a theory,
use structured streaming as it's much quicker to get going; weeks and months
of set up vs a few hours.


I hope I clarified things for you

Regards
Sam

Sent from my iPhone




On Wed, 1 Mar 2017 at 07:34, Jörn Franke <jornfra...@gmail.com> wrote:

I am not sure that Spark Streaming is what you want to do. It is for
streaming analytics not for loading in a DWH.

You need also define what realtime means and what is needed there - it will
differ from client to client significantly.

From my experience, just SQL is not enough for the users in the future.
Especially large data volumes require much more beyond just aggregations.
These may become less useful in context of large data volumes. They have to
learn new ways of dealing with the data from a business perspective by
employing proper sampling of data from a large dataset, machine learning
approaches etc. These are new methods which are not technically driven but
business driven. I think it is wrong to assume that users learning new
skills is a bad thing; it might be in the future a necessity.

On 28 Feb 2017, at 23:18, Adaryl Wakefield <adaryl.wakefi...@hotmail.com>
wrote:

I’m actually trying to come up with a generalized use case that I can take
from client to client. We have structured data coming from some
application. Instead of dropping it into Hadoop and then using yet another
technology to query that data, I just want to dump it into a relational MPP
DW so nobody has to learn new skills or new tech just to do some analysis.
Everybody and their mom can write SQL. Designing relational databases is a
rare skill but not as rare as what is necessary for designing some NoSQL
solutions.



I’m looking for the fastest path to move a company from batch to real time
analytical processing.



Adaryl "Bob" Wakefield, MBA
Principal
Mass Street Analytics, LLC
913.938.6685

www.massstreet.net

www.linkedin.com/in/bobwakefieldmba
Twitter: @BobLovesData



*From:* Mohammad Tariq [mailto:donta...@gmail.com <donta...@gmail.com>]
*Sent:* Tuesday, February 28, 2017 12:57 PM
*To:* Adaryl Wakefield <adaryl.wakefi...@hotmail.com>
*Cc:* user@spark.apache.org
*Sub

Re: Structured Streaming: How to handle bad input

2017-02-23 Thread Sam Elamin
Hi Jayesh

So you have 2 problems here:

1) Data was loaded in the wrong format
2) Even once you have handled the bad data, the Spark job will continually
retry the failed batch

For 2 it's very easy to go into the checkpoint directory and delete that
offset manually and make it seem like it never happened.

However for point 1 the issue is a little bit trickier. If you receive
bad data then perhaps your first port of call should be a cleaning process
to ensure your data is at least parsable, then move it to another directory
which Spark Streaming is looking at.

It is unreasonable to expect Spark to both do the streaming and handle bad
data for you, yet remain extremely simple and easy to use.

That said I personally would have a conversation with the provider of the
data


In this scenario I just ensure that these providers guarantee the format of
the data is correct, whether it's CSV, JSON, Avro, Parquet or whatever. I
should hope whatever service/company is providing this data is providing it
"correctly" to a set definition, otherwise you will have to do a
pre-cleaning step, along the lines of the sketch below.
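
For the archive, a rough sketch of what that pre-cleaning pass can look like as
a plain batch job (paths and columns are invented; the idea is to read
everything as strings, quarantine rows that do not parse, and point the stream
at the clean directory):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("pre-clean").getOrCreate()

// Read every column as a string so a bad row cannot blow up the parser.
val rawSchema = new StructType()
  .add("name", StringType)
  .add("score", StringType)

val raw = spark.read.schema(rawSchema).csv("s3a://landing/incoming/")

// A failed cast comes back as null, which makes a cheap "is this parsable" test.
val parsable = raw.filter(col("score").cast("int").isNotNull)
val rejects  = raw.filter(col("score").cast("int").isNull)

parsable.write.mode("overwrite").csv("s3a://landing/clean/")       // the directory the stream watches
rejects.write.mode("overwrite").csv("s3a://landing/quarantine/")   // kept for the data provider conversation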


Perhaps someone else can suggest a better/cleaner approach

Regards
Sam







On Thu, Feb 23, 2017 at 2:09 PM, JayeshLalwani <
jayesh.lalw...@capitalone.com> wrote:

> What is a good way to make a Structured Streaming application deal with bad
> input? Right now, the problem is that bad input kills the Structured
> Streaming application. This is highly undesirable, because a Structured
> Streaming application has to be always on
>
> For example, here is a very simple structured streaming program
>
>
>
>
> Now, I drop in a CSV file with the following data into my bucket
>
>
>
> Obviously the data is in the wrong format
>
> The executor and driver come crashing down
> 17/02/23 08:53:40 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)
> java.lang.NumberFormatException: For input string: "Iron man"
> at
> java.lang.NumberFormatException.forInputString(
> NumberFormatException.java:65)
> at java.lang.Integer.parseInt(Integer.java:580)
> at java.lang.Integer.parseInt(Integer.java:615)
> at scala.collection.immutable.StringLike$class.toInt(
> StringLike.scala:272)
> at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
> at
> org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(
> CSVInferSchema.scala:250)
> at
> org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$
> csvParser$3.apply(CSVRelation.scala:125)
> at
> org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$
> csvParser$3.apply(CSVRelation.scala:94)
> at
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$
> buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:167)
> at
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$
> buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:166)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:102)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:166)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:102)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:231)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:225)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:826)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:826)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
> scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$Ta

Re: quick question: best to use cluster mode or client mode for production?

2017-02-23 Thread Sam Elamin
I personally use spark-submit as it's agnostic to which platform your Spark
clusters are running on, e.g. EMR, Dataproc, Databricks etc.
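
For reference, a minimal cluster-mode submission looks something like this
(class name, jar and resource sizes are placeholders):

spark-submit \
  --class com.example.HiveQueryJob \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 4 \
  --executor-memory 4G \
  hive-query-job-assembly.jar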


On Thu, 23 Feb 2017 at 08:53, nancy henry  wrote:

> Hi Team,
>
> I have set of hc.sql("hivequery") kind of scripts which i am running right
> now in spark-shell
>
> How should i schedule it in production
> making it spark-shell -i script.scala
> or keeping it in jar file through eclipse and use spark-submit deploy mode
> cluster?
>
> which is advisable?
>


Re: [Spark Streaming] Starting Spark Streaming application from a specific position in Kinesis stream

2017-02-19 Thread Sam Elamin
Just doing a bit of research, it seems we've been beaten to the punch; there's
already a connector you can use here
<https://github.com/maropu/spark-kinesis-sql-asl/issues/4>

Give it a go and feel free to give the committer feedback, or better yet send
some PRs if it needs them :)

On Sun, Feb 19, 2017 at 9:23 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

> Hey Neil
>
> No worries! Happy to help you write it if you want, just link me to the
> repo and we can write it together
>
> Would be fun!
>
>
> Regards
> Sam
> On Sun, 19 Feb 2017 at 21:21, Neil Maheshwari <neil.v.maheshw...@gmail.com>
> wrote:
>
>> Thanks for the advice Sam. I will look into implementing a structured
>> streaming connector.
>>
>> On Feb 19, 2017, at 11:54 AM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>
>> HI Niel,
>>
>> My advice would be to write a structured streaming connector. The new
>> structured streaming APIs were brought in to handle exactly the issues you
>> describe
>>
>> See this blog
>> <https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html>
>>
>> There isnt a structured streaming connector as of yet, but you can easily
>> write one that uses the underlying batch methods to read/write to Kinesis
>>
>> Have a look at how I wrote my bigquery connector here
>> <http://github.com/samelamin/spark-bigquery>. Plus the best thing is we
>> get a new connector to a highly used datasource/sink
>>
>> Hope that helps
>>
>> Regards
>> Sam
>>
>> On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari <
>> neil.v.maheshw...@gmail.com> wrote:
>>
>> Thanks for your response Ayan.
>>
>> This could be an option. One complication I see with that approach is
>> that I do not want to miss any records that are between the data we have
>> batched to the data store and the checkpoint. I would still need a
>> mechanism for recording the sequence number of the last time the data was
>> batched, so I could start the streaming application after that sequence
>> number.
>>
>> A similar approach could be to batch our data periodically, recording the
>> last sequence number of the batch. Then, fetch data from Kinesis using the
>> low level API to read data from the latest sequence number of the batched
>> data up until the sequence number of the latest checkpoint from our spark
>> app. I could merge batched dataset and the dataset fetched from Kinesis’s
>> lower level API, and use that dataset as an RDD to prep the job.
>>
>> On Feb 19, 2017, at 3:12 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>> Hi
>>
>> AFAIK, Kinesis does not provide any mechanism other than check point to
>> restart. That makes sense as it makes it so generic.
>>
>> Question: why cant you warm up your data from a data store? Say every 30
>> mins you run a job to aggregate your data to a data store for that hour.
>> When you restart the streaming app it would read from dynamo check point,
>> but it would also preps an initial rdd from data store?
>>
>> Best
>> Ayan
>> On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari <
>> neil.v.maheshw...@gmail.com> wrote:
>>
>> Hello,
>>
>> I am building a Spark streaming application that ingests data from an
>> Amazon Kinesis stream. My application keeps track of the minimum price over
>> a window for groups of similar tickets. When I deploy the application, I
>> would like it to start processing at the start of the previous hours data.
>> This will warm up the state of the application and allow us to deploy our
>> application faster. For example, if I start the application at 3 PM, I
>> would like to process the data retained by Kinesis from 2PM to 3PM, and
>> then continue receiving data going forward. Spark Streaming’s Kinesis
>> receiver, which relies on the Amazon Kinesis Client Library, seems to give
>> me three options for choosing where to read from the stream:
>>
>>- read from the latest checkpointed sequence number in Dynamo
>>- start from the oldest record in the stream (TRIM_HORIZON shard
>>iterator type)
>>- start from the most recent record in the stream (LATEST shard
>>iterator type)
>>
>>
>> Do you have any suggestions on how we could start our application at a
>> specific timestamp or sequence number in the Kinesis stream? Some ideas I
>> had were:
>>
>>- Create a KCL application that fetches the previous hour data and
>>writes it to HDFS. We can create an RDD from 

Re: [Spark Streaming] Starting Spark Streaming application from a specific position in Kinesis stream

2017-02-19 Thread Sam Elamin
Hey Neil

No worries! Happy to help you write it if you want, just link me to the
repo and we can write it together

Would be fun!


Regards
Sam
On Sun, 19 Feb 2017 at 21:21, Neil Maheshwari <neil.v.maheshw...@gmail.com>
wrote:

> Thanks for the advice Sam. I will look into implementing a structured
> streaming connector.
>
> On Feb 19, 2017, at 11:54 AM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>
> HI Niel,
>
> My advice would be to write a structured streaming connector. The new
> structured streaming APIs were brought in to handle exactly the issues you
> describe
>
> See this blog
> <https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html>
>
> There isnt a structured streaming connector as of yet, but you can easily
> write one that uses the underlying batch methods to read/write to Kinesis
>
> Have a look at how I wrote my bigquery connector here
> <http://github.com/samelamin/spark-bigquery>. Plus the best thing is we
> get a new connector to a highly used datasource/sink
>
> Hope that helps
>
> Regards
> Sam
>
> On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari <
> neil.v.maheshw...@gmail.com> wrote:
>
> Thanks for your response Ayan.
>
> This could be an option. One complication I see with that approach is that
> I do not want to miss any records that are between the data we have batched
> to the data store and the checkpoint. I would still need a mechanism for
> recording the sequence number of the last time the data was batched, so I
> could start the streaming application after that sequence number.
>
> A similar approach could be to batch our data periodically, recording the
> last sequence number of the batch. Then, fetch data from Kinesis using the
> low level API to read data from the latest sequence number of the batched
> data up until the sequence number of the latest checkpoint from our spark
> app. I could merge batched dataset and the dataset fetched from Kinesis’s
> lower level API, and use that dataset as an RDD to prep the job.
>
> On Feb 19, 2017, at 3:12 AM, ayan guha <guha.a...@gmail.com> wrote:
>
> Hi
>
> AFAIK, Kinesis does not provide any mechanism other than check point to
> restart. That makes sense as it makes it so generic.
>
> Question: why cant you warm up your data from a data store? Say every 30
> mins you run a job to aggregate your data to a data store for that hour.
> When you restart the streaming app it would read from dynamo check point,
> but it would also preps an initial rdd from data store?
>
> Best
> Ayan
> On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari <
> neil.v.maheshw...@gmail.com> wrote:
>
> Hello,
>
> I am building a Spark streaming application that ingests data from an
> Amazon Kinesis stream. My application keeps track of the minimum price over
> a window for groups of similar tickets. When I deploy the application, I
> would like it to start processing at the start of the previous hours data.
> This will warm up the state of the application and allow us to deploy our
> application faster. For example, if I start the application at 3 PM, I
> would like to process the data retained by Kinesis from 2PM to 3PM, and
> then continue receiving data going forward. Spark Streaming’s Kinesis
> receiver, which relies on the Amazon Kinesis Client Library, seems to give
> me three options for choosing where to read from the stream:
>
>- read from the latest checkpointed sequence number in Dynamo
>- start from the oldest record in the stream (TRIM_HORIZON shard
>iterator type)
>- start from the most recent record in the stream (LATEST shard
>iterator type)
>
>
> Do you have any suggestions on how we could start our application at a
> specific timestamp or sequence number in the Kinesis stream? Some ideas I
> had were:
>
>- Create a KCL application that fetches the previous hour data and
>writes it to HDFS. We can create an RDD from that dataset and initialize
>our Spark Streaming job with it. The spark streaming job’s Kinesis receiver
>can have the same name as the initial KCL application, and use that
>applications checkpoint as the starting point. We’re writing our spark jobs
>in Python, so this would require launching the java MultiLang daemon, or
>writing that portion of the application in Java/Scala.
>- Before the Spark streaming application starts, we could fetch a
>shard iterator using the AT_TIMESTAMP shard iterator type. We could record
>the sequence number of the first record returned by this iterator, and
>create an entry in Dynamo for our application for that sequence number. Our
>Kinesis receiver would pick up from this checkpoint. It makes me a little
>nervous that we would be faking Kinesis Client Library's protocol by
>writing a checkpoint into Dynamo
>
>
> Thanks in advance!
>
> Neil
>
> --
> Best Regards,
> Ayan Guha
>
>
>
>


Re: [Spark Streaming] Starting Spark Streaming application from a specific position in Kinesis stream

2017-02-19 Thread Sam Elamin
Hi Neil,

My advice would be to write a structured streaming connector. The new
structured streaming APIs were brought in to handle exactly the issues you
describe

See this blog
<https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html>

There isn't a structured streaming connector as of yet, but you can easily
write one that uses the underlying batch methods to read/write to Kinesis.

Have a look at how I wrote my BigQuery connector here
<http://github.com/samelamin/spark-bigquery>. Plus the best thing is we get
a new connector to a highly used datasource/sink.

Hope that helps

Regards
Sam

On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari <
neil.v.maheshw...@gmail.com> wrote:

> Thanks for your response Ayan.
>
> This could be an option. One complication I see with that approach is that
> I do not want to miss any records that are between the data we have batched
> to the data store and the checkpoint. I would still need a mechanism for
> recording the sequence number of the last time the data was batched, so I
> could start the streaming application after that sequence number.
>
> A similar approach could be to batch our data periodically, recording the
> last sequence number of the batch. Then, fetch data from Kinesis using the
> low level API to read data from the latest sequence number of the batched
> data up until the sequence number of the latest checkpoint from our spark
> app. I could merge batched dataset and the dataset fetched from Kinesis’s
> lower level API, and use that dataset as an RDD to prep the job.
>
> On Feb 19, 2017, at 3:12 AM, ayan guha <guha.a...@gmail.com> wrote:
>
> Hi
>
> AFAIK, Kinesis does not provide any mechanism other than check point to
> restart. That makes sense as it makes it so generic.
>
> Question: why cant you warm up your data from a data store? Say every 30
> mins you run a job to aggregate your data to a data store for that hour.
> When you restart the streaming app it would read from dynamo check point,
> but it would also preps an initial rdd from data store?
>
> Best
> Ayan
> On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari <
> neil.v.maheshw...@gmail.com> wrote:
>
>> Hello,
>>
>> I am building a Spark streaming application that ingests data from an
>> Amazon Kinesis stream. My application keeps track of the minimum price over
>> a window for groups of similar tickets. When I deploy the application, I
>> would like it to start processing at the start of the previous hours data.
>> This will warm up the state of the application and allow us to deploy our
>> application faster. For example, if I start the application at 3 PM, I
>> would like to process the data retained by Kinesis from 2PM to 3PM, and
>> then continue receiving data going forward. Spark Streaming’s Kinesis
>> receiver, which relies on the Amazon Kinesis Client Library, seems to give
>> me three options for choosing where to read from the stream:
>>
>>- read from the latest checkpointed sequence number in Dynamo
>>- start from the oldest record in the stream (TRIM_HORIZON shard
>>iterator type)
>>- start from the most recent record in the stream (LATEST shard
>>iterator type)
>>
>>
>> Do you have any suggestions on how we could start our application at a
>> specific timestamp or sequence number in the Kinesis stream? Some ideas I
>> had were:
>>
>>- Create a KCL application that fetches the previous hour data and
>>writes it to HDFS. We can create an RDD from that dataset and initialize
>>our Spark Streaming job with it. The spark streaming job’s Kinesis 
>> receiver
>>can have the same name as the initial KCL application, and use that
>>applications checkpoint as the starting point. We’re writing our spark 
>> jobs
>>in Python, so this would require launching the java MultiLang daemon, or
>>writing that portion of the application in Java/Scala.
>>- Before the Spark streaming application starts, we could fetch a
>>shard iterator using the AT_TIMESTAMP shard iterator type. We could record
>>the sequence number of the first record returned by this iterator, and
>>create an entry in Dynamo for our application for that sequence number. 
>> Our
>>Kinesis receiver would pick up from this checkpoint. It makes me a little
>>nervous that we would be faking Kinesis Client Library's protocol by
>>writing a checkpoint into Dynamo
>>
>>
>> Thanks in advance!
>>
>> Neil
>>
> --
> Best Regards,
> Ayan Guha
>
>
>


Re: Debugging Spark application

2017-02-16 Thread Sam Elamin
I recommend running Spark in local mode when you're first debugging your code,
just to understand what's happening and step through it, and perhaps catch a
few errors when you first start off.

I personally use IntelliJ because it's my preference. You can follow this guide:
http://www.bigendiandata.com/2016-08-26-How-to-debug-remote-spark-jobs-with-IntelliJ/

Although it's for IntelliJ, you can apply the same concepts to Eclipse *I
think*.
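
For the archive, a minimal sketch of the local-mode setup (nothing
project-specific, just a throwaway session you can set breakpoints in from the
IDE):

import org.apache.spark.sql.SparkSession

// master("local[*]") keeps driver and "executors" in one JVM, so the debugger
// sees everything, including code inside transformations.
val spark = SparkSession.builder()
  .appName("debug-session")
  .master("local[*]")
  .getOrCreate()

val df = spark.range(0, 100).toDF("id")   // any small in-memory dataset will do
df.filter("id % 2 = 0").show()

spark.stop()

For remote debugging of a submitted job, the linked guide's approach boils down
to passing the usual JDWP agent options via spark.driver.extraJavaOptions (and
spark.executor.extraJavaOptions for executor-side code).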


Regards
Sam


On Thu, 16 Feb 2017 at 22:00, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Hi,
>
> I was looking for some URLs/documents for getting started on debugging
> Spark applications.
>
> I prefer developing Spark applications with Scala on Eclipse and then
> package the application jar before submitting.
>
>
>
> Kind regards,
> Reza
>
>
>
>


Re: Enrichment with static tables

2017-02-15 Thread Sam Elamin
You can do a join or a union to combine all the dataframes into one fat
dataframe,

or do a select on the columns you want to produce your transformed dataframe.

Not sure if I understand the question though. If the goal is just an end-state
transformed dataframe, that can easily be done.
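
For the archive, a minimal sketch of the join route (tables, keys and columns
are invented; broadcast is just an optional hint for small lookup tables):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("enrichment-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// The dataset being enriched plus two static lookup tables.
val events    = Seq((1L, 10L, 49.99), (2L, 11L, 15.00)).toDF("customerId", "productId", "amount")
val customers = Seq((1L, "Acme"), (2L, "Globex")).toDF("customerId", "customerName")
val products  = Seq((10L, "Widget"), (11L, "Gadget")).toDF("productId", "productName")

// One left join per static table; chain as many as you have lookup tables.
val enriched = events
  .join(broadcast(customers), Seq("customerId"), "left")
  .join(broadcast(products), Seq("productId"), "left")

enriched.show()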


Regards
Sam

On Wed, Feb 15, 2017 at 6:34 PM, Gaurav Agarwal <gaurav130...@gmail.com>
wrote:

> Hello
>
> We want to enrich our spark RDD loaded with multiple Columns and multiple
> Rows . This need to be enriched with 3 different tables that i loaded 3
> different spark dataframe . Can we write some logic in spark so i can
> enrich my spark RDD with different stattic tables.
>
> Thanks
>
>


Re: Dealing with missing columns in SPARK SQL in JSON

2017-02-14 Thread Sam Elamin
Ah, if that's the case then you might need to define the schema beforehand.
Either that, or if you want to infer it then ensure a JSON file exists with
the right schema so Spark infers the right columns,

essentially making both files one dataframe, if that makes sense.
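
For the archive, a minimal sketch of the "define the schema beforehand" option,
reusing the file names from the thread (the column types are a guess):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("json-schema").master("local[*]").getOrCreate()

val schema = new StructType()
  .add("a", LongType)
  .add("b", LongType)

// With an explicit schema every dataframe gets both columns; whichever field is
// absent in a given file simply comes back as null.
val json1DF = spark.read.schema(schema).json("json1.json")   // a = 1, b = null
val json2DF = spark.read.schema(schema).json("json2.json")   // a = null, b = 2

json1DF.union(json2DF).createOrReplaceTempView("both")
spark.sql("select a, b from both").show()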

On Tue, Feb 14, 2017 at 3:04 PM, Aseem Bansal <asmbans...@gmail.com> wrote:

> Sorry if I trivialized the example. It is the same kind of file and
> sometimes it could have "a", sometimes "b", sometimes both. I just don't
> know. That is what I meant by missing columns.
>
> It would be good if I read any of the JSON and if I do spark sql and it
> gave me
>
> for json1.json
>
> a | b
> 1 | null
>
> for json2.json
>
> a | b
> null | 2
>
>
> On Tue, Feb 14, 2017 at 8:13 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
>> I may be missing something super obvious here but can't you combine them
>> into a single dataframe. Left join perhaps?
>>
>> Try writing it in sql " select a from json1 and b from josn2"then run
>> explain to give you a hint to how to do it in code
>>
>> Regards
>> Sam
>> On Tue, 14 Feb 2017 at 14:30, Aseem Bansal <asmbans...@gmail.com> wrote:
>>
>>> Say I have two files containing single rows
>>>
>>> json1.json
>>>
>>> {"a": 1}
>>>
>>> json2.json
>>>
>>> {"b": 2}
>>>
>>> I read in this json file using spark's API into a dataframe one at a
>>> time. So I have
>>>
>>> Dataset json1DF
>>> and
>>> Dataset json2DF
>>>
>>> If I run "select a, b from __THIS__" in a SQLTransformer then I will get
>>> an exception as for json1DF does not have "b" and json2DF does not have "a"
>>>
>>> How could I handle this situation with missing columns in JSON?
>>>
>>
>


Re: Dealing with missing columns in SPARK SQL in JSON

2017-02-14 Thread Sam Elamin
I may be missing something super obvious here, but can't you combine them
into a single dataframe? Left join perhaps?

Try writing it in SQL ("select a from json1 and b from json2"), then run
explain to give you a hint on how to do it in code.

Regards
Sam
On Tue, 14 Feb 2017 at 14:30, Aseem Bansal <asmbans...@gmail.com> wrote:

> Say I have two files containing single rows
>
> json1.json
>
> {"a": 1}
>
> json2.json
>
> {"b": 2}
>
> I read in this json file using spark's API into a dataframe one at a time.
> So I have
>
> Dataset json1DF
> and
> Dataset json2DF
>
> If I run "select a, b from __THIS__" in a SQLTransformer then I will get
> an exception as for json1DF does not have "b" and json2DF does not have "a"
>
> How could I handle this situation with missing columns in JSON?
>


Re: how to fix the order of data

2017-02-14 Thread Sam Elamin
It's because you are just printing on the RDD: foreach runs on the partitions
in parallel, so the order in which the elements are printed is not guaranteed.

You can sort the df like below

 input.toDF("value").sort("value").collect()


or, if you do not want to convert to a dataframe, you can sort the RDD itself
with *sortBy* (for key/value RDDs there is
*sortByKey*([*ascending*], [*numTasks*])).
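
For completeness, here are the same ideas as runnable one-liners against the
input RDD from the transcript below; collecting to the driver first is what
makes the printed order deterministic:

val input = sc.parallelize(Array(1, 2, 3))

input.sortBy(x => x).collect().foreach(print)   // RDD route: prints 123
input.toDF("value").sort("value").show()        // dataframe route (spark-shell implicits in scope)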


Regards

Sam





On Tue, Feb 14, 2017 at 11:41 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

>HI  all,
> the belowing is my test code. I found the output of val
> input is different. how do i fix the order please?
>
> scala> val input = sc.parallelize( Array(1,2,3))
> input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at
> parallelize at :24
>
> scala> input.foreach(print)
> 132
> scala> input.foreach(print)
> 213
> scala> input.foreach(print)
> 312


Re: Etl with spark

2017-02-12 Thread Sam Elamin
Yup I ended up doing just that thank you both
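
For the archive, a rough sketch of that approach (not the actual job; the bucket
names, output convention and shared transform are all invented):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("etl-keys").getOrCreate()

val keys = Seq(
  "s3a://my-bucket/export/orders/",
  "s3a://my-bucket/export/customers/"
)

// Each key still becomes its own Spark job, but firing them from a parallel
// collection lets the jobs overlap instead of running strictly one after another.
keys.par.foreach { key =>
  val df = spark.read.json(key)   // schema inference per key, as in the original job
  // a hypothetical commonTransform(df) would slot in here
  df.write.mode("overwrite").parquet(key.replace("/export/", "/processed/"))
}
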
On Sun, 12 Feb 2017 at 18:33, Miguel Morales <therevolti...@gmail.com>
wrote:

> You can parallelize the collection of s3 keys and then pass that to your
> map function so that files are read in parallel.
>
> Sent from my iPhone
>
> On Feb 12, 2017, at 9:41 AM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>
> thanks Ayan but i was hoping to remove the dependency on a file and just
> use in memory list or dictionary
>
> So from the reading I've done today it seems.the concept of a bespoke
> async method doesn't really apply in spsrk since the cluster deals with
> distributing the work load
>
>
> Am I mistaken?
>
> Regards
> Sam
> On Sun, 12 Feb 2017 at 12:13, ayan guha <guha.a...@gmail.com> wrote:
>
> You can store the list of keys (I believe you use them in source file
> path, right?) in a file, one key per line. Then you can read the file using
> sc.textFile (So you will get a RDD of file paths) and then apply your
> function as a map.
>
> r = sc.textFile(list_file).map(your_function)
>
> HTH
>
> On Sun, Feb 12, 2017 at 10:04 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Hey folks
>
> Really simple question here. I currently have an etl pipeline that reads
> from s3 and saves the data to an endstore
>
>
> I have to read from a list of keys in s3 but I am doing a raw extract then
> saving. Only some of the extracts have a simple transformation but overall
> the code looks the same
>
>
> I abstracted away this logic into a method that takes in an s3 path does
> the common transformations and saves to source
>
>
> But the job takes about 10 mins or so because I'm iteratively going down a
> list of keys
>
> Is it possible to asynchronously do this?
>
> FYI I'm using spark.read.json to read from s3 because it infers my schema
>
> Regards
> Sam
>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>


Re: Etl with spark

2017-02-12 Thread Sam Elamin
Thanks Ayan, but I was hoping to remove the dependency on a file and just
use an in-memory list or dictionary.

So from the reading I've done today it seems the concept of a bespoke async
method doesn't really apply in Spark, since the cluster deals with
distributing the workload.


Am I mistaken?

Regards
Sam
On Sun, 12 Feb 2017 at 12:13, ayan guha <guha.a...@gmail.com> wrote:

You can store the list of keys (I believe you use them in source file path,
right?) in a file, one key per line. Then you can read the file using
sc.textFile (So you will get a RDD of file paths) and then apply your
function as a map.

r = sc.textFile(list_file).map(your_function)

HTH

On Sun, Feb 12, 2017 at 10:04 PM, Sam Elamin <hussam.ela...@gmail.com>
wrote:

Hey folks

Really simple question here. I currently have an etl pipeline that reads
from s3 and saves the data to an endstore


I have to read from a list of keys in s3 but I am doing a raw extract then
saving. Only some of the extracts have a simple transformation but overall
the code looks the same


I abstracted away this logic into a method that takes in an s3 path does
the common transformations and saves to source


But the job takes about 10 mins or so because I'm iteratively going down a
list of keys

Is it possible to asynchronously do this?

FYI I'm using spark.read.json to read from s3 because it infers my schema

Regards
Sam




-- 
Best Regards,
Ayan Guha


Etl with spark

2017-02-12 Thread Sam Elamin
Hey folks

Really simple question here. I currently have an ETL pipeline that reads
from S3 and saves the data to an end datastore.


I have to read from a list of keys in s3 but I am doing a raw extract then
saving. Only some of the extracts have a simple transformation but overall
the code looks the same


I abstracted away this logic into a method that takes in an S3 path, does
the common transformations and saves the result.


But the job takes about 10 mins or so because I'm iteratively going down a
list of keys

Is it possible to asynchronously do this?

FYI I'm using spark.read.json to read from s3 because it infers my schema

Regards
Sam


Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-11 Thread Sam Elamin
Here's a link to the thread

http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-Dropping-Duplicates-td20884.html
On Sat, 11 Feb 2017 at 08:47, Sam Elamin <hussam.ela...@gmail.com> wrote:

> Hey Egor
>
>
> You can use for each writer or you can write a custom sink. I personally
> went with a custom sink since I get a dataframe per batch
>
>
> https://github.com/samelamin/spark-bigquery/blob/master/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySink.scala
>
> You can have a look at how I implemented something similar to file sink
> that in the event if a failure skips batches already written
>
>
> Also have a look at Micheals reply to me a few days ago on exactly the
> same topic. The email subject was called structured streaming. Dropping
> duplicates
>
>
> Regards
>
> Sam
>
> On Sat, 11 Feb 2017 at 07:59, Jacek Laskowski <ja...@japila.pl> wrote:
>
> "Something like that" I've never tried it out myself so I'm only
> guessing having a brief look at the API.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sat, Feb 11, 2017 at 1:31 AM, Egor Pahomov <pahomov.e...@gmail.com>
> wrote:
> > Jacek, so I create cache in ForeachWriter, in all "process()" I write to
> it
> > and on close I flush? Something like that?
> >
> > 2017-02-09 12:42 GMT-08:00 Jacek Laskowski <ja...@japila.pl>:
> >>
> >> Hi,
> >>
> >> Yes, that's ForeachWriter.
> >>
> >> Yes, it works with element by element. You're looking for mapPartition
> >> and ForeachWriter has partitionId that you could use to implement a
> >> similar thing.
> >>
> >> Pozdrawiam,
> >> Jacek Laskowski
> >> 
> >> https://medium.com/@jaceklaskowski/
> >> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> >> Follow me at https://twitter.com/jaceklaskowski
> >>
> >>
> >> On Thu, Feb 9, 2017 at 3:55 AM, Egor Pahomov <pahomov.e...@gmail.com>
> >> wrote:
> >> > Jacek, you mean
> >> >
> >> >
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter
> >> > ? I do not understand how to use it, since it passes every value
> >> > separately,
> >> > not every partition. And addding to table value by value would not
> work
> >> >
> >> > 2017-02-07 12:10 GMT-08:00 Jacek Laskowski <ja...@japila.pl>:
> >> >>
> >> >> Hi,
> >> >>
> >> >> Have you considered foreach sink?
> >> >>
> >> >> Jacek
> >> >>
> >> >> On 6 Feb 2017 8:39 p.m., "Egor Pahomov" <pahomov.e...@gmail.com>
> wrote:
> >> >>>
> >> >>> Hi, I'm thinking of using Structured Streaming instead of old
> >> >>> streaming,
> >> >>> but I need to be able to save results to Hive table. Documentation
> for
> >> >>> file
> >> >>> sink
> >> >>>
> >> >>> says(
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
> ):
> >> >>> "Supports writes to partitioned tables. ". But being able to write
> to
> >> >>> partitioned directories is not enough to write to the table: someone
> >> >>> needs
> >> >>> to write to Hive metastore. How can I use Structured Streaming and
> >> >>> write to
> >> >>> Hive table?
> >> >>>
> >> >>> --
> >> >>> Sincerely yours
> >> >>> Egor Pakhomov
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> > Sincerely yours
> >> > Egor Pakhomov
> >
> >
> >
> >
> > --
> > Sincerely yours
> > Egor Pakhomov
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-11 Thread Sam Elamin
Hey Egor


You can use a ForeachWriter or you can write a custom sink. I personally
went with a custom sink since I get a dataframe per batch:

https://github.com/samelamin/spark-bigquery/blob/master/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySink.scala

You can have a look at how I implemented something similar to the file sink,
which in the event of a failure skips batches already written.
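
For the archive, a minimal sketch of the ForeachWriter route, the lighter-weight
of the two options above (the JDBC URL, table and column layout are invented;
the custom-sink route is what the linked repo shows):

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

class JdbcWriter(url: String) extends ForeachWriter[Row] {
  private var conn: Connection = _
  private var stmt: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    conn = DriverManager.getConnection(url)
    stmt = conn.prepareStatement("insert into events(id, payload) values (?, ?)")
    true   // true means "process this partition"
  }

  override def process(row: Row): Unit = {
    stmt.setLong(1, row.getAs[Long]("id"))
    stmt.setString(2, row.getAs[String]("payload"))
    stmt.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}

// streamingDF.writeStream.foreach(new JdbcWriter("jdbc:postgresql://...")).start()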


Also have a look at Michael's reply to me a few days ago on exactly the same
topic. The email subject was "Structured Streaming Dropping Duplicates".


Regards

Sam

On Sat, 11 Feb 2017 at 07:59, Jacek Laskowski <ja...@japila.pl> wrote:

"Something like that" I've never tried it out myself so I'm only
guessing having a brief look at the API.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sat, Feb 11, 2017 at 1:31 AM, Egor Pahomov <pahomov.e...@gmail.com>
wrote:
> Jacek, so I create cache in ForeachWriter, in all "process()" I write to
it
> and on close I flush? Something like that?
>
> 2017-02-09 12:42 GMT-08:00 Jacek Laskowski <ja...@japila.pl>:
>>
>> Hi,
>>
>> Yes, that's ForeachWriter.
>>
>> Yes, it works with element by element. You're looking for mapPartition
>> and ForeachWriter has partitionId that you could use to implement a
>> similar thing.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Thu, Feb 9, 2017 at 3:55 AM, Egor Pahomov <pahomov.e...@gmail.com>
>> wrote:
>> > Jacek, you mean
>> >
>> >
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter
>> > ? I do not understand how to use it, since it passes every value
>> > separately,
>> > not every partition. And addding to table value by value would not work
>> >
>> > 2017-02-07 12:10 GMT-08:00 Jacek Laskowski <ja...@japila.pl>:
>> >>
>> >> Hi,
>> >>
>> >> Have you considered foreach sink?
>> >>
>> >> Jacek
>> >>
>> >> On 6 Feb 2017 8:39 p.m., "Egor Pahomov" <pahomov.e...@gmail.com>
wrote:
>> >>>
>> >>> Hi, I'm thinking of using Structured Streaming instead of old
>> >>> streaming,
>> >>> but I need to be able to save results to Hive table. Documentation
for
>> >>> file
>> >>> sink
>> >>>
>> >>> says(
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
):
>> >>> "Supports writes to partitioned tables. ". But being able to write to
>> >>> partitioned directories is not enough to write to the table: someone
>> >>> needs
>> >>> to write to Hive metastore. How can I use Structured Streaming and
>> >>> write to
>> >>> Hive table?
>> >>>
>> >>> --
>> >>> Sincerely yours
>> >>> Egor Pakhomov
>> >
>> >
>> >
>> >
>> > --
>> > Sincerely yours
>> > Egor Pakhomov
>
>
>
>
> --
> Sincerely yours
> Egor Pakhomov

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Structured Streaming. S3 To Google BigQuery

2017-02-08 Thread Sam Elamin
Hi All

Thank you all for the amazing support! I have written a BigQuery connector
for structured streaming that you can find here
<https://github.com/samelamin/spark-bigquery>

I just tweeted <https://twitter.com/samelamin/status/829477884024782852>
about it and would really appreciate it if you retweeted when you get a
chance.

The more people know about it and use it the more feedback I can get to
make the connector better!

Ofcourse PRs and feedback are always welcome :)

Thanks again!

Regards
Sam


Re: specifing schema on dataframe

2017-02-06 Thread Sam Elamin
Ah ok


Thanks for clearing it up Ayan! I will give that a go



Thank you all for your help, this mailing list is awesome!

On Mon, Feb 6, 2017 at 9:07 AM, ayan guha <guha.a...@gmail.com> wrote:

> If I am not missing anything here, "So I know which columns are numeric
> and which arent because I have a StructType and all the internal
> StructFields will tell me which ones have a DataType which is numeric and
> which arent" will lead to getting to a list of fields which should be
> numeric.
>
> Essentially, You will create a list of numeric fields from your
> "should-be" struct type. Then you will load your raw data using built-in
> json reader. At this point, your data have a wrong schema. Now, you will
> need to correct it. How? You will loop over the list of numeric fields (or,
> you can do it directly on the struct type), and try to match the type. If
> you find a mismatch, you'd add a withColumn clause to cast to the correct
> data type (from your "should-be" struct).
>
> HTH?
>
> Best
> Ayan
>
> On Mon, Feb 6, 2017 at 8:00 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
>> Yup sorry I should have explained myself better
>>
>> So I know which columns are numeric and which arent because I have a
>> StructType and all the internal StructFields will tell me which ones have a
>> DataType which is numeric and which arent
>>
>> So assuming I have a json string which has double quotes on numbers when
>> it shouldnt, and I have the correct schema in a struct type
>>
>>
>> how can I iterate over them to programatically create the new dataframe
>> in the correct format
>>
>> do i iterate over the columns in the StructType? or iterate over the
>> columns in the dataframe and try to match them with the StructType?
>>
>> I hope I cleared things up, What I wouldnt do for a drawing board right
>> now!
>>
>>
>> On Mon, Feb 6, 2017 at 8:56 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> UmmI think the premise is you need to "know" beforehand which
>>> columns are numeric.Unless you know it, how would you apply the schema?
>>>
>>> On Mon, Feb 6, 2017 at 7:54 PM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>>> Thanks ayan but I meant how to derive the list automatically
>>>>
>>>> In your example you are specifying the numeric columns and I would like
>>>> it to be applied to any schema if that makes sense
>>>> On Mon, 6 Feb 2017 at 08:49, ayan guha <guha.a...@gmail.com> wrote:
>>>>
>>>>> SImple (pyspark) example:
>>>>>
>>>>> >>> df = sqlContext.read.json("/user/l_aguha/spark_qs.json")
>>>>> >>> df.printSchema()
>>>>> root
>>>>>  |-- customerid: string (nullable = true)
>>>>>  |-- foo: string (nullable = true)
>>>>>
>>>>> >>> numeric_field_list = ['customerid']
>>>>>
>>>>> >>> for k in numeric_field_list:
>>>>> ... df = df.withColumn(k,df[k].cast("long"))
>>>>> ...
>>>>> >>> df.printSchema()
>>>>> root
>>>>>  |-- customerid: long (nullable = true)
>>>>>  |-- foo: string (nullable = true)
>>>>>
>>>>>
>>>>> On Mon, Feb 6, 2017 at 6:56 PM, Sam Elamin <hussam.ela...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Ok thanks Micheal!
>>>>>
>>>>>
>>>>> Can I get an idea on where to start? Assuming I have the end schema
>>>>> and the current dataframe...
>>>>> How can I loop through it and create a new dataframe using the
>>>>> WithColumn?
>>>>>
>>>>>
>>>>> Am I iterating through the dataframe or the schema?
>>>>>
>>>>> I'm assuming it's easier to iterate through the columns in the old df.
>>>>> For each column cast it correctly and generate a new df?
>>>>>
>>>>>
>>>>> Would you recommend that?
>>>>>
>>>>> Regards
>>>>> Sam
>>>>> On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com>
>>>>> wrote:
>>>>>
>>>>> If you already have the expected schema, and you know that all numbers
>>>>> will always be formatted as strings in the input JSON, you could probably

Re: specifing schema on dataframe

2017-02-06 Thread Sam Elamin
Yup sorry I should have explained myself better

So I know which columns are numeric and which aren't, because I have a
StructType and all the internal StructFields will tell me which ones have a
DataType which is numeric and which aren't.

So assuming I have a JSON string which has double quotes on numbers when it
shouldn't, and I have the correct schema in a StructType,


how can I iterate over them to programmatically create the new dataframe in
the correct format?

Do I iterate over the columns in the StructType? Or iterate over the
columns in the dataframe and try to match them with the StructType?

I hope I cleared things up. What I wouldn't do for a drawing board right
now!
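
For reference, a minimal sketch of that iteration (the helper name is mine;
expectedSchema is the "should-be" StructType and raw is the dataframe read with
the numbers still as strings):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{NumericType, StructType}

// Walk the target schema, find the fields declared as numeric, and cast the
// matching string columns of the raw dataframe to the declared type.
def applyNumericTypes(raw: DataFrame, expectedSchema: StructType): DataFrame =
  expectedSchema.fields.foldLeft(raw) { (df, field) =>
    field.dataType match {
      case _: NumericType => df.withColumn(field.name, col(field.name).cast(field.dataType))
      case _              => df
    }
  }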


On Mon, Feb 6, 2017 at 8:56 AM, ayan guha <guha.a...@gmail.com> wrote:

> UmmI think the premise is you need to "know" beforehand which columns
> are numeric.Unless you know it, how would you apply the schema?
>
> On Mon, Feb 6, 2017 at 7:54 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
>> Thanks ayan but I meant how to derive the list automatically
>>
>> In your example you are specifying the numeric columns and I would like
>> it to be applied to any schema if that makes sense
>> On Mon, 6 Feb 2017 at 08:49, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> SImple (pyspark) example:
>>>
>>> >>> df = sqlContext.read.json("/user/l_aguha/spark_qs.json")
>>> >>> df.printSchema()
>>> root
>>>  |-- customerid: string (nullable = true)
>>>  |-- foo: string (nullable = true)
>>>
>>> >>> numeric_field_list = ['customerid']
>>>
>>> >>> for k in numeric_field_list:
>>> ... df = df.withColumn(k,df[k].cast("long"))
>>> ...
>>> >>> df.printSchema()
>>> root
>>>  |-- customerid: long (nullable = true)
>>>  |-- foo: string (nullable = true)
>>>
>>>
>>> On Mon, Feb 6, 2017 at 6:56 PM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>> Ok thanks Micheal!
>>>
>>>
>>> Can I get an idea on where to start? Assuming I have the end schema and
>>> the current dataframe...
>>> How can I loop through it and create a new dataframe using the
>>> WithColumn?
>>>
>>>
>>> Am I iterating through the dataframe or the schema?
>>>
>>> I'm assuming it's easier to iterate through the columns in the old df.
>>> For each column cast it correctly and generate a new df?
>>>
>>>
>>> Would you recommend that?
>>>
>>> Regards
>>> Sam
>>> On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com>
>>> wrote:
>>>
>>> If you already have the expected schema, and you know that all numbers
>>> will always be formatted as strings in the input JSON, you could probably
>>> derive this list automatically.
>>>
>>> Wouldn't it be simpler to just regex replace the numbers to remove the
>>> quotes?
>>>
>>>
>>> I think this is likely to be a slower and less robust solution.  You
>>> would have to make sure that you got all the corner cases right (i.e.
>>> escaping and what not).
>>>
>>> On Sun, Feb 5, 2017 at 3:13 PM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>> I see so for the connector I need to pass in an array/list of numerical
>>> columns?
>>>
>>> Wouldnt it be simpler to just regex replace the numbers to remove the
>>> quotes?
>>>
>>>
>>> Regards
>>> Sam
>>>
>>> On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>> Specifying the schema when parsing JSON will only let you pick between
>>> similar datatypes (i.e should this be a short, long float, double etc).  It
>>> will not let you perform conversions like string <-> number.  This has to
>>> be done with explicit casts after the data has been loaded.
>>>
>>> I think you can make a solution that uses select or withColumn generic.
>>> Just load the dataframe with a "parse schema" that treats numbers as
>>> strings.  Then construct a list of columns that should be numbers and apply
>>> the necessary conversions.
>>>
>>> import org.apache.spark.sql.functions.col
>>> var df = spark.read.schema(parseSchema).json("...")
>>> numericColumns.foreach { columnName =>
>>>   df = df.withColumn(

Re: specifing schema on dataframe

2017-02-06 Thread Sam Elamin
Thanks Ayan, but I meant how to derive the list automatically.

In your example you are specifying the numeric columns and I would like it
to be applied to any schema if that makes sense
On Mon, 6 Feb 2017 at 08:49, ayan guha <guha.a...@gmail.com> wrote:

> SImple (pyspark) example:
>
> >>> df = sqlContext.read.json("/user/l_aguha/spark_qs.json")
> >>> df.printSchema()
> root
>  |-- customerid: string (nullable = true)
>  |-- foo: string (nullable = true)
>
> >>> numeric_field_list = ['customerid']
>
> >>> for k in numeric_field_list:
> ... df = df.withColumn(k,df[k].cast("long"))
> ...
> >>> df.printSchema()
> root
>  |-- customerid: long (nullable = true)
>  |-- foo: string (nullable = true)
>
>
> On Mon, Feb 6, 2017 at 6:56 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Ok thanks Micheal!
>
>
> Can I get an idea on where to start? Assuming I have the end schema and
> the current dataframe...
> How can I loop through it and create a new dataframe using the WithColumn?
>
>
> Am I iterating through the dataframe or the schema?
>
> I'm assuming it's easier to iterate through the columns in the old df. For
> each column cast it correctly and generate a new df?
>
>
> Would you recommend that?
>
> Regards
> Sam
> On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> If you already have the expected schema, and you know that all numbers
> will always be formatted as strings in the input JSON, you could probably
> derive this list automatically.
>
> Wouldn't it be simpler to just regex replace the numbers to remove the
> quotes?
>
>
> I think this is likely to be a slower and less robust solution.  You would
> have to make sure that you got all the corner cases right (i.e. escaping
> and what not).
>
> On Sun, Feb 5, 2017 at 3:13 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> I see so for the connector I need to pass in an array/list of numerical
> columns?
>
> Wouldnt it be simpler to just regex replace the numbers to remove the
> quotes?
>
>
> Regards
> Sam
>
> On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> Specifying the schema when parsing JSON will only let you pick between
> similar datatypes (i.e should this be a short, long float, double etc).  It
> will not let you perform conversions like string <-> number.  This has to
> be done with explicit casts after the data has been loaded.
>
> I think you can make a solution that uses select or withColumn generic.
> Just load the dataframe with a "parse schema" that treats numbers as
> strings.  Then construct a list of columns that should be numbers and apply
> the necessary conversions.
>
> import org.apache.spark.sql.functions.col
> var df = spark.read.schema(parseSchema).json("...")
> numericColumns.foreach { columnName =>
>   df = df.withColumn(columnName, col(columnName).cast("long"))
> }
>
>
>
> On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Thanks Micheal
>
> I've been spending the past few days researching this
>
> The problem is the generated json has double quotes on fields that are
> numbers because the producing datastore doesn't want to lose precision
>
> I can change the data type true but that would be on specific to a job
> rather than a generic streaming job. I'm writing a structured streaming
> connector and I have the schema the generated dataframe should match.
>
> Unfortunately using withColumn won't help me here since the solution needs
> to be generic
>
> To summarise assume I have the following json
>
> [{
> "customerid": "535137",
> "foo": "bar"
> }]
>
>
> and I know the schema should be:
>
> StructType(Array(StructField("customerid",LongType,true),StructField("foo",StringType,true)))
>
> Whats the best way of solving this?
>
> My current approach is to iterate over the JSON and identify which fields
> are numbers and which arent then recreate the json
>
> But to be honest that doesnt seem like the cleanest approach, so happy for
> advice on this
>
> Regards
> Sam
>
> On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> -dev
>
> You can use withColumn to change the type after the data has been loaded
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html>
> .
>

Re: specifing schema on dataframe

2017-02-05 Thread Sam Elamin
Ok thanks Michael!


Can I get an idea on where to start? Assuming I have the end schema and the
current dataframe...
How can I loop through it and create a new dataframe using withColumn?


Am I iterating through the dataframe or the schema?

I'm assuming it's easier to iterate through the columns in the old df. For
each column cast it correctly and generate a new df?


Would you recommend that?

Regards
Sam
On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com>
wrote:

> If you already have the expected schema, and you know that all numbers
> will always be formatted as strings in the input JSON, you could probably
> derive this list automatically.
>
> Wouldn't it be simpler to just regex replace the numbers to remove the
> quotes?
>
>
> I think this is likely to be a slower and less robust solution.  You would
> have to make sure that you got all the corner cases right (i.e. escaping
> and what not).
>
> On Sun, Feb 5, 2017 at 3:13 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> I see so for the connector I need to pass in an array/list of numerical
> columns?
>
> Wouldnt it be simpler to just regex replace the numbers to remove the
> quotes?
>
>
> Regards
> Sam
>
> On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> Specifying the schema when parsing JSON will only let you pick between
> similar datatypes (i.e should this be a short, long float, double etc).  It
> will not let you perform conversions like string <-> number.  This has to
> be done with explicit casts after the data has been loaded.
>
> I think you can make a solution that uses select or withColumn generic.
> Just load the dataframe with a "parse schema" that treats numbers as
> strings.  Then construct a list of columns that should be numbers and apply
> the necessary conversions.
>
> import org.apache.spark.sql.functions.col
> var df = spark.read.schema(parseSchema).json("...")
> numericColumns.foreach { columnName =>
>   df = df.withColumn(columnName, col(columnName).cast("long"))
> }
>
>
>
> On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Thanks Micheal
>
> I've been spending the past few days researching this
>
> The problem is the generated json has double quotes on fields that are
> numbers because the producing datastore doesn't want to lose precision
>
> I can change the data type true but that would be on specific to a job
> rather than a generic streaming job. I'm writing a structured streaming
> connector and I have the schema the generated dataframe should match.
>
> Unfortunately using withColumn won't help me here since the solution needs
> to be generic
>
> To summarise assume I have the following json
>
> [{
> "customerid": "535137",
> "foo": "bar"
> }]
>
>
> and I know the schema should be:
>
> StructType(Array(StructField("customerid",LongType,true),StructField("foo",StringType,true)))
>
> Whats the best way of solving this?
>
> My current approach is to iterate over the JSON and identify which fields
> are numbers and which arent then recreate the json
>
> But to be honest that doesnt seem like the cleanest approach, so happy for
> advice on this
>
> Regards
> Sam
>
> On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> -dev
>
> You can use withColumn to change the type after the data has been loaded
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html>
> .
>
> On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Hi Direceu
>
> Thanks your right! that did work
>
>
> But now im facing an even bigger problem since i dont have access to
> change the underlying data, I just want to apply a schema over something
> that was written via the sparkContext.newAPIHadoopRDD
>
> Basically I am reading in a RDD[JsonObject] and would like to convert it
> into a dataframe which I pass the schema into
>
> Whats the best way to do this?
>
> I doubt removing all the quotes in the JSON is the best solution is it?
>
> Regards
> Sam
>
> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
> Hi Sam
> Remove the " from the number that it will work
>
> Em 4 de fev de 2017 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com>
> escreveu:
>
> Hi All
>
> I would like to specify a schema when reading fro

Re: specifing schema on dataframe

2017-02-05 Thread Sam Elamin
I see, so for the connector I need to pass in an array/list of numerical
columns?

Wouldn't it be simpler to just regex-replace the numbers to remove the
quotes?


Regards
Sam

On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Specifying the schema when parsing JSON will only let you pick between
> similar datatypes (i.e should this be a short, long float, double etc).  It
> will not let you perform conversions like string <-> number.  This has to
> be done with explicit casts after the data has been loaded.
>
> I think you can make a solution that uses select or withColumn generic.
> Just load the dataframe with a "parse schema" that treats numbers as
> strings.  Then construct a list of columns that should be numbers and apply
> the necessary conversions.
>
> import org.apache.spark.sql.functions.col
> var df = spark.read.schema(parseSchema).json("...")
> numericColumns.foreach { columnName =>
>   df = df.withColumn(columnName, col(columnName).cast("long"))
> }
>
>
>
> On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
>> Thanks Micheal
>>
>> I've been spending the past few days researching this
>>
>> The problem is the generated json has double quotes on fields that are
>> numbers because the producing datastore doesn't want to lose precision
>>
>> I can change the data type true but that would be on specific to a job
>> rather than a generic streaming job. I'm writing a structured streaming
>> connector and I have the schema the generated dataframe should match.
>>
>> Unfortunately using withColumn won't help me here since the solution
>> needs to be generic
>>
>> To summarise assume I have the following json
>>
>> [{
>> "customerid": "535137",
>> "foo": "bar"
>> }]
>>
>>
>> and I know the schema should be:
>> StructType(Array(StructField("customerid",LongType,true),Str
>> uctField("foo",StringType,true)))
>>
>> Whats the best way of solving this?
>>
>> My current approach is to iterate over the JSON and identify which fields
>> are numbers and which arent then recreate the json
>>
>> But to be honest that doesnt seem like the cleanest approach, so happy
>> for advice on this
>>
>> Regards
>> Sam
>>
>> On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> -dev
>>>
>>> You can use withColumn to change the type after the data has been loaded
>>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html>
>>> .
>>>
>>> On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>> Hi Direceu
>>>
>>> Thanks your right! that did work
>>>
>>>
>>> But now im facing an even bigger problem since i dont have access to
>>> change the underlying data, I just want to apply a schema over something
>>> that was written via the sparkContext.newAPIHadoopRDD
>>>
>>> Basically I am reading in a RDD[JsonObject] and would like to convert it
>>> into a dataframe which I pass the schema into
>>>
>>> Whats the best way to do this?
>>>
>>> I doubt removing all the quotes in the JSON is the best solution is it?
>>>
>>> Regards
>>> Sam
>>>
>>> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
>>> dirceu.semigh...@gmail.com> wrote:
>>>
>>> Hi Sam
>>> Remove the " from the number that it will work
>>>
>>> Em 4 de fev de 2017 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com>
>>> escreveu:
>>>
>>> Hi All
>>>
>>> I would like to specify a schema when reading from a json but when
>>> trying to map a number to a Double it fails, I tried FloatType and IntType
>>> with no joy!
>>>
>>>
>>> When inferring the schema customer id is set to String, and I would like
>>> to cast it as Double
>>>
>>> so df1 is corrupted while df2 shows
>>>
>>>
>>> Also FYI I need this to be generic as I would like to apply it to any
>>> json, I specified the below schema as an example of the issue I am facing
>>>
>>> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, 
>>> DoubleType,FloatType, StructType, LongType,DecimalType}
>>> val testSchema = StructType(Array(StructField("customerid",DoubleType)))
>>> val df1 = 
>>> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>>> val df2 = 
>>> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>>> df1.show(1)
>>> df2.show(1)
>>>
>>>
>>> Any help would be appreciated, I am sure I am missing something obvious
>>> but for the life of me I cant tell what it is!
>>>
>>>
>>> Kind Regards
>>> Sam
>>>
>>>
>>>
>>>
>


Re: specifing schema on dataframe

2017-02-05 Thread Sam Elamin
Thanks Michael

I've been spending the past few days researching this

The problem is the generated JSON has double quotes on fields that are
numbers, because the producing datastore doesn't want to lose precision.

I can change the data type, true, but that would be specific to a job
rather than a generic streaming job. I'm writing a Structured Streaming
connector and I have the schema the generated dataframe should match.

Unfortunately using withColumn won't help me here since the solution needs
to be generic

To summarise, assume I have the following JSON:

[{
"customerid": "535137",
"foo": "bar"
}]


and I know the schema should be:
StructType(Array(StructField("customerid",LongType,true),StructField("foo",StringType,true)))

What's the best way of solving this?

My current approach is to iterate over the JSON, identify which fields
are numbers and which aren't, then recreate the JSON.

But to be honest that doesn't seem like the cleanest approach, so happy for
advice on this.

Regards
Sam

On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com>
wrote:

> -dev
>
> You can use withColumn to change the type after the data has been loaded
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html>
> .
>
> On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Hi Direceu
>
> Thanks your right! that did work
>
>
> But now im facing an even bigger problem since i dont have access to
> change the underlying data, I just want to apply a schema over something
> that was written via the sparkContext.newAPIHadoopRDD
>
> Basically I am reading in a RDD[JsonObject] and would like to convert it
> into a dataframe which I pass the schema into
>
> Whats the best way to do this?
>
> I doubt removing all the quotes in the JSON is the best solution is it?
>
> Regards
> Sam
>
> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
> Hi Sam
> Remove the " from the number that it will work
>
> Em 4 de fev de 2017 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com>
> escreveu:
>
> Hi All
>
> I would like to specify a schema when reading from a json but when trying
> to map a number to a Double it fails, I tried FloatType and IntType with no
> joy!
>
>
> When inferring the schema customer id is set to String, and I would like
> to cast it as Double
>
> so df1 is corrupted while df2 shows
>
>
> Also FYI I need this to be generic as I would like to apply it to any
> json, I specified the below schema as an example of the issue I am facing
>
> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, 
> DoubleType,FloatType, StructType, LongType,DecimalType}
> val testSchema = StructType(Array(StructField("customerid",DoubleType)))
> val df1 = 
> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
> val df2 = 
> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
> df1.show(1)
> df2.show(1)
>
>
> Any help would be appreciated, I am sure I am missing something obvious
> but for the life of me I cant tell what it is!
>
>
> Kind Regards
> Sam
>
>
>
>


Re: specifing schema on dataframe

2017-02-04 Thread Sam Elamin
Hi Dirceu

Thanks, you're right! That did work.


But now I'm facing an even bigger problem since I don't have access to change
the underlying data; I just want to apply a schema over something that was
written via sparkContext.newAPIHadoopRDD.

Basically I am reading in an RDD[JsonObject] and would like to convert it
into a dataframe which I pass the schema into.

What's the best way to do this?

I doubt removing all the quotes in the JSON is the best solution, is it?

Regards
Sam

On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hi Sam
> Remove the " from the number that it will work
>
> Em 4 de fev de 2017 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com>
> escreveu:
>
>> Hi All
>>
>> I would like to specify a schema when reading from a json but when trying
>> to map a number to a Double it fails, I tried FloatType and IntType with no
>> joy!
>>
>>
>> When inferring the schema customer id is set to String, and I would like
>> to cast it as Double
>>
>> so df1 is corrupted while df2 shows
>>
>>
>> Also FYI I need this to be generic as I would like to apply it to any
>> json, I specified the below schema as an example of the issue I am facing
>>
>> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, 
>> DoubleType,FloatType, StructType, LongType,DecimalType}
>> val testSchema = StructType(Array(StructField("customerid",DoubleType)))
>> val df1 = 
>> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>> val df2 = 
>> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>> df1.show(1)
>> df2.show(1)
>>
>>
>> Any help would be appreciated, I am sure I am missing something obvious
>> but for the life of me I cant tell what it is!
>>
>>
>> Kind Regards
>> Sam
>>
>


specifing schema on dataframe

2017-02-04 Thread Sam Elamin
Hi All

I would like to specify a schema when reading from a JSON file, but when
trying to map a number to a Double it fails; I tried FloatType and IntType
with no joy!


When inferring the schema, customer id is set to String, and I would like to
cast it as Double.

so df1 is corrupted while df2 shows the data correctly.


Also, FYI, I need this to be generic as I would like to apply it to any JSON;
I specified the below schema as an example of the issue I am facing:

import org.apache.spark.sql.types.{BinaryType, StringType,
StructField, DoubleType,FloatType, StructType, LongType,DecimalType}
val testSchema = StructType(Array(StructField("customerid",DoubleType)))
val df1 = 
spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
val df2 = spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
df1.show(1)
df2.show(1)


Any help would be appreciated; I am sure I am missing something obvious but
for the life of me I can't tell what it is!


Kind Regards
Sam
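
For the concrete single-column case above, a minimal sketch of the
cast-after-load approach (spark and sc as in the snippet above; casting to
double is an assumption matching the question):

import org.apache.spark.sql.functions.col

// Read with the inferred schema (customerid comes back as a string),
// then cast the column explicitly.
val df2 = spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
val fixed = df2.withColumn("customerid", col("customerid").cast("double"))
fixed.printSchema()  // customerid: double (nullable = true)
fixed.show(1)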


Re: java.lang.NoSuchMethodError: scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef

2017-02-04 Thread Sam Elamin
Hi sathyanarayanan


zero() on scala.runtime.VolatileObjectRef was introduced in Scala 2.11.
You probably have a library compiled against Scala 2.11 running on a
Scala 2.10 runtime.

See

v2.10:
https://github.com/scala/scala/blob/2.10.x/src/library/scala/runtime/VolatileObjectRef.java
v2.11:
https://github.com/scala/scala/blob/2.11.x/src/library/scala/runtime/VolatileObjectRef.java

Regards
Sam
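
As a minimal illustration of keeping the two in line (sbt assumed as the
build tool; the version numbers below are placeholders, not a recommendation),
the %% operator makes sbt pick the artifact built for your scalaVersion:

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  // %% appends _2.10 to the artifact name, so the connector's Scala
  // version matches the Spark runtime's
  "org.apache.spark" %% "spark-core" % "1.6.3" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0"
)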

On Sat, 4 Feb 2017 at 09:24, sathyanarayanan mudhaliyar <
sathyanarayananmudhali...@gmail.com> wrote:

> Hi ,
> I got the error below when executed
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef;
>
> error in detail:
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef;
> at
> com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala)
> at
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
> at
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
> at
> com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
> at
> com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
> at
> com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:82)
> at com.nwf.Consumer.main(Consumer.java:63)
>
> code :
>
> Consumer consumer = new Consumer();
> SparkConf conf = new
> SparkConf().setAppName("kafka-sandbox").setMaster("local[2]");
> conf.set("spark.cassandra.connection.host", "localhost"); //connection
> for cassandra database
> JavaSparkContext sc = new JavaSparkContext(conf);
> CassandraConnector connector = CassandraConnector.apply(sc.getConf());
> final Session session = connector.openSession();
> final PreparedStatement prepared = session.prepare("INSERT INTO
> spark_test5.messages JSON?");
>
>
> The error is in the line which is in green color.
> Thank you guys.
>
>


Upgrading to Spark 2.0.1 broke array in parquet DataFrame

2016-11-04 Thread Sam Goodwin
I have a table with a few columns, some of which are arrays. Since
upgrading from Spark 1.6 to Spark 2.0.1, the array fields are always null
when reading in a DataFrame.

When writing the Parquet files, the schema of the column is specified as

StructField("packageIds",ArrayType(StringType))

The schema of the column in the Hive Metastore is

packageIds array<string>

The schema used in the writer exactly matches the schema in the Metastore
in all ways (order, casing, types etc)

The query is a simple "select *"

spark.sql("select * from tablename limit 1").collect() // null columns in Row

How can I begin debugging this issue? Notable things I've already
investigated:

   - Files were written using Spark 1.6
   - DataFrame works in Spark 1.5 and 1.6
   - I've inspected the parquet files using parquet-tools and can see the
   data.
   - I also have another table written in exactly the same way and it
   doesn't have the issue.
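
One editorial suggestion for narrowing this down, assuming the files are
reachable at a known path (the path below is a placeholder): read the Parquet
files directly, bypassing the metastore, so the table definition and the file
schema can be ruled out separately.

// Bypass the Hive metastore: if the arrays come back non-null here,
// the problem is in the table definition rather than the files.
val direct = spark.read.parquet("/path/to/tablename")
direct.select("packageIds").show(1, false)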


Re: Spark join and large temp files

2016-08-09 Thread Sam Bessalah
Have you tried to broadcast your small table in order to perform your
join?

import org.apache.spark.sql.functions.broadcast
val joined = bigDF.join(broadcast(smallDF), Seq("id"))
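
An alternative, for reference (the value below is a placeholder; the default
threshold in this era of Spark is 10 MB): raise the planner's automatic
broadcast threshold so qualifying tables are broadcast without an explicit
hint.

// Threshold is in bytes; tables smaller than this are broadcast automatically
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)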


On Tue, Aug 9, 2016 at 3:29 PM, Ashic Mahtab  wrote:

> Hi Deepak,
> No...not really. Upping the disk size is a solution, but more expensive as
> you can't attach EBS volumes to EMR clusters configured with data pipelines
> easily (which is what we're doing). I've tried collecting the 1.5G dataset
> in a hashmap, and broadcasting. Timeouts seems to prevent that (even after
> upping the max driver result size). Increasing partition counts didn't help
> (the shuffle used up the temp space). I'm now looking at some form of
> clever broadcasting, or maybe falling back to chunking up the input,
> producing interim output, and unioning them for the final output. Might
> even try using Spark Streaming pointing to the parquet and seeing if that
> helps.
>
> -Ashic.
>
> --
> From: deepakmc...@gmail.com
> Date: Tue, 9 Aug 2016 17:31:19 +0530
> Subject: Re: Spark join and large temp files
> To: as...@live.com
>
> Hi Ashic
> Did you find the resolution to this issue?
> Just curious to know like what helped in this scenario.
>
> Thanks
> Deepak
>
>
> On Tue, Aug 9, 2016 at 12:23 AM, Ashic Mahtab  wrote:
>
> Hi Deepak,
> Thanks for the response.
>
> Registering the temp tables didn't help. Here's what I have:
>
> val a = sqlContext..read.parquet(...).select("eid.id",
> "name").withColumnRenamed("eid.id", "id")
> val b = sqlContext.read.parquet(...).select("id", "number")
>
> a.registerTempTable("a")
> b.registerTempTable("b")
>
> val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x join
> b y on x.id=y.id)
>
> results.write.parquet(...)
>
> Is there something I'm missing?
>
> Cheers,
> Ashic.
>
> --
> From: deepakmc...@gmail.com
> Date: Tue, 9 Aug 2016 00:01:32 +0530
> Subject: Re: Spark join and large temp files
> To: as...@live.com
> CC: user@spark.apache.org
>
>
> Register you dataframes as temp tables and then try the join on the temp
> table.
> This should resolve your issue.
>
> Thanks
> Deepak
>
> On Mon, Aug 8, 2016 at 11:47 PM, Ashic Mahtab  wrote:
>
> Hello,
> We have two parquet inputs of the following form:
>
> a: id:String, Name:String  (1.5TB)
> b: id:String, Number:Int  (1.3GB)
>
> We need to join these two to get (id, Number, Name). We've tried two
> approaches:
>
> a.join(b, Seq("id"), "right_outer")
>
> where a and b are dataframes. We also tried taking the rdds, mapping them
> to pair rdds with id as the key, and then joining. What we're seeing is
> that temp file usage is increasing on the join stage, and filling up our
> disks, causing the job to crash. Is there a way to join these two data sets
> without well...crashing?
>
> Note, the ids are unique, and there's a one to one mapping between the two
> datasets.
>
> Any help would be appreciated.
>
> -Ashic.
>
>
>
>
>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>
>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: hdfs-ha on mesos - odd bug

2015-09-14 Thread Sam Bessalah
I don't know about the broken URL. But are you running HDFS as a Mesos
framework? If so, is it using Mesos-DNS?
Then you should resolve the namenode via hdfs:/// 

On Mon, Sep 14, 2015 at 3:55 PM, Adrian Bridgett 
wrote:

> I'm hitting an odd issue with running spark on mesos together with
> HA-HDFS, with an even odder workaround.
>
> In particular I get an error that it can't find the HDFS nameservice
> unless I put in a _broken_ url (discovered that workaround by mistake!).
> core-site.xml, hdfs-site.xml is distributed to the slave node - and that
> file is read since I deliberately break the file then I get an error as
> you'd expect.
>
> NB: This is a bit different to
> http://mail-archives.us.apache.org/mod_mbox/spark-user/201402.mbox/%3c1392442185079-1549.p...@n3.nabble.com%3E
>
>
> Spark 1.5.0:
>
> t=sc.textFile("hdfs://nameservice1/tmp/issue")
> t.count()
> (fails)
>
> t=sc.textFile("file://etc/passwd")
> t.count()
> (errors about bad url - should have an extra / of course)
> t=sc.textFile("hdfs://nameservice1/tmp/issue")
> t.count()
> then it works!!!
>
> I should say that using file:///etc/passwd or hdfs:///tmp/issue both fail
> as well.  Unless preceded by a broken url.I've tried setting
> spark.hadoop.cloneConf to true, no change.
>
> Sample (broken) run:
> 15/09/14 13:00:14 DEBUG HadoopRDD: Creating new JobConf and caching it for
> later re-use
> 15/09/14 13:00:14 DEBUG : address: ip-10-1-200-165/10.1.200.165
> isLoopbackAddress: false, with host 10.1.200.165 ip-10-1-200-165
> 15/09/14 13:00:14 DEBUG BlockReaderLocal:
> dfs.client.use.legacy.blockreader.local = false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.client.read.shortcircuit =
> false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal:
> dfs.client.domain.socket.data.traffic = false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.domain.socket.path =
> /var/run/hdfs-sockets/dn
> 15/09/14 13:00:14 DEBUG HAUtil: No HA service delegation token found for
> logical URI hdfs://nameservice1
> 15/09/14 13:00:14 DEBUG BlockReaderLocal:
> dfs.client.use.legacy.blockreader.local = false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.client.read.shortcircuit =
> false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal:
> dfs.client.domain.socket.data.traffic = false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.domain.socket.path =
> /var/run/hdfs-sockets/dn
> 15/09/14 13:00:14 DEBUG RetryUtils: multipleLinearRandomRetry = null
> 15/09/14 13:00:14 DEBUG Server: rpcKind=RPC_PROTOCOL_BUFFER,
> rpcRequestWrapperClass=class
> org.apache.hadoop.ipc.ProtobufRpcEngine$RpcRequestWrapper,
> rpcInvoker=org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker@6245f50b
> 15/09/14 13:00:14 DEBUG Client: getting client out of cache:
> org.apache.hadoop.ipc.Client@267f0fd3
> 15/09/14 13:00:14 DEBUG NativeCodeLoader: Trying to load the custom-built
> native-hadoop library...
> 15/09/14 13:00:14 DEBUG NativeCodeLoader: Loaded the native-hadoop library
> ...
> 15/09/14 13:00:14 DEBUG Client: Connecting to
> mesos-1.example.com/10.1.200.165:8020
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu: starting, having
> connections 1
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu sending #0
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu got value #0
> 15/09/14 13:00:14 DEBUG ProtobufRpcEngine: Call: getFileInfo took 36ms
> 15/09/14 13:00:14 DEBUG FileInputFormat: Time taken to get FileStatuses: 69
> 15/09/14 13:00:14 INFO FileInputFormat: Total input paths to process : 1
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu sending #1
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu got value #1
> 15/09/14 13:00:14 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 1ms
> 15/09/14 13:00:14 DEBUG FileInputFormat: Total # of splits generated by
> getSplits: 2, TimeTaken: 104
> ...
> 15/09/14 13:00:24 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu: closed
> 15/09/14 13:00:24 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu: stopped, remaining
> connections 0
> 15/09/14 13:00:24 DEBUG
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received
> message
> AkkaMessage(ExecutorRemoved(20150826-133446-3217621258-5050-4064-S1),true)
> from Actor[akka://sparkDriver/temp/$g]
> 15/09/14 13:00:24 DEBUG
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message:
> AkkaMessage(ExecutorRemoved(20150826-133446-3217621258-5050-4064-S1),true)
> 15/09/14 13:00:24 DEBUG
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
> message 

Re: *Metrics API is odd in MLLib

2015-07-28 Thread Sam
Hi Xiangrui & Spark People,

I recently got round to writing an evaluation framework for Spark that I
was hoping to PR into MLLib, which would solve some of the aforementioned
issues.  I have put the code on GitHub in a separate repo for now as I
would like to get some sandboxed feedback.  The repo, complete with detailed
documentation, can be found here: https://github.com/samthebest/sceval.

Many thanks,

Sam



On Thu, Jun 18, 2015 at 11:00 AM, Sam samthesav...@gmail.com wrote:

 Firstly apologies for the header of my email containing some junk, I
 believe it's due to a copy and paste error on a smart phone.

 Thanks for your response.  I will indeed make the PR you suggest, though
 glancing at the code I realize it's not just a case of making these public
 since the types are also private. Then, there is certain functionality I
 will be exposing, which then ought to be tested, e.g. every bin except
 potentially the last will have an equal number of data points in it*.  I'll
 get round to it at some point.

 As for BinaryClassificationMetrics using Double for labels, thanks for the
 explanation.  If I where to make a PR to encapsulate the underlying
 implementation (that uses LabeledPoint) and change the type to Boolean,
 would what be the impact to versioning (since I'd be changing public API)?
 An alternative would be to create a new wrapper class, say
 BinaryClassificationMeasures, and deprecate the old with the intention of
 migrating all the code into the new class.

 * Maybe some other part of the code base tests this, since this assumption
 must hold in order to average across folds in x-validation?

 On Thu, Jun 18, 2015 at 1:02 AM, Xiangrui Meng men...@gmail.com wrote:

 LabeledPoint was used for both classification and regression, where label
 type is Double for simplicity. So in BinaryClassificationMetrics, we still
 use Double for labels. We compute the confusion matrix at each threshold
 internally, but this is not exposed to users (
 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala#L127).
 Feel free to submit a PR to make it public. -Xiangrui

 On Mon, Jun 15, 2015 at 7:13 AM, Sam samthesav...@gmail.com wrote:


 According to
 https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

 The constructor takes `RDD[(Double, Double)]` meaning lables are
 Doubles, this seems odd, shouldn't it be Boolean?  Similarly for
 MutlilabelMetrics (I.e. Should be RDD[(Array[Double], Array[Boolean])]),
 and for MulticlassMetrics the type of both should be generic?

 Additionally it would be good if either the ROC output type was changed
 or another method was added that returned confusion matricies, so that the
 hard integer values can be obtained before the divisions. E.g.

 ```
 case class Confusion(tp: Int, fp: Int, fn: Int, tn: Int)
 {
   // bunch of methods for each of the things in the table here
 https://en.wikipedia.org/wiki/Receiver_operating_characteristic
 }
 ...
 def confusions(): RDD[Confusion]
 ```






Re: *Metrics API is odd in MLLib

2015-06-18 Thread Sam
Firstly, apologies for the header of my email containing some junk; I
believe it's due to a copy-and-paste error on a smartphone.

Thanks for your response.  I will indeed make the PR you suggest, though
glancing at the code I realize it's not just a case of making these public
since the types are also private. Then, there is certain functionality I
will be exposing, which then ought to be tested, e.g. every bin except
potentially the last will have an equal number of data points in it*.  I'll
get round to it at some point.

As for BinaryClassificationMetrics using Double for labels, thanks for the
explanation.  If I were to make a PR to encapsulate the underlying
implementation (that uses LabeledPoint) and change the type to Boolean,
what would be the impact on versioning (since I'd be changing the public API)?
An alternative would be to create a new wrapper class, say
BinaryClassificationMeasures, and deprecate the old with the intention of
migrating all the code into the new class.

* Maybe some other part of the code base tests this, since this assumption
must hold in order to average across folds in x-validation?

On Thu, Jun 18, 2015 at 1:02 AM, Xiangrui Meng men...@gmail.com wrote:

 LabeledPoint was used for both classification and regression, where label
 type is Double for simplicity. So in BinaryClassificationMetrics, we still
 use Double for labels. We compute the confusion matrix at each threshold
 internally, but this is not exposed to users (
 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala#L127).
 Feel free to submit a PR to make it public. -Xiangrui

 On Mon, Jun 15, 2015 at 7:13 AM, Sam samthesav...@gmail.com wrote:


 According to
 https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

 The constructor takes `RDD[(Double, Double)]` meaning lables are Doubles,
 this seems odd, shouldn't it be Boolean?  Similarly for MutlilabelMetrics
 (I.e. Should be RDD[(Array[Double], Array[Boolean])]), and for
 MulticlassMetrics the type of both should be generic?

 Additionally it would be good if either the ROC output type was changed
 or another method was added that returned confusion matricies, so that the
 hard integer values can be obtained before the divisions. E.g.

 ```
 case class Confusion(tp: Int, fp: Int, fn: Int, tn: Int)
 {
   // bunch of methods for each of the things in the table here
 https://en.wikipedia.org/wiki/Receiver_operating_characteristic
 }
 ...
 def confusions(): RDD[Confusion]
 ```





*Metrics API is odd in MLLib

2015-06-15 Thread Sam
According to
https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

The constructor takes `RDD[(Double, Double)]`, meaning labels are Doubles;
this seems odd, shouldn't it be Boolean?  Similarly for MultilabelMetrics
(i.e. it should be RDD[(Array[Double], Array[Boolean])]), and for
MulticlassMetrics the type of both should be generic?

Additionally, it would be good if either the ROC output type were changed or
another method were added that returned confusion matrices, so that the
hard integer values can be obtained before the divisions. E.g.

```
case class Confusion(tp: Int, fp: Int, fn: Int, tn: Int)
{
  // bunch of methods for each of the things in the table here
https://en.wikipedia.org/wiki/Receiver_operating_characteristic
}
...
def confusions(): RDD[Confusion]
```
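
For illustration only, a minimal sketch of the kind of read-offs such a class
could carry (method names here are mine, not a proposed API), each derived
straight from the four counts:

```
case class Confusion(tp: Int, fp: Int, fn: Int, tn: Int) {
  def precision: Double = tp.toDouble / (tp + fp)
  def recall: Double    = tp.toDouble / (tp + fn)   // true positive rate / sensitivity
  def fallout: Double   = fp.toDouble / (fp + tn)   // false positive rate, ROC x-axis
  def accuracy: Double  = (tp + tn).toDouble / (tp + fp + fn + tn)
}
```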


Spark Python with SequenceFile containing numpy deserialized data in str form

2015-06-08 Thread Sam Stoelinga
Hi all,

I'm storing an RDD as a sequence file with the following content:
key=filename (string), value=Python str from numpy.savez (not unicode)

In order to make sure the whole numpy array gets stored I have to first
serialize it with:

def serialize_numpy_array(numpy_array):
    output = io.BytesIO()
    np.savez_compressed(output, x=numpy_array)
    return output.getvalue()

>>> type(output.getvalue())
str

The serialization returns a Python str, *not a unicode object*. After
serialization I call

my_dersialized_numpy_rdd.saveAsSequenceFile(path)

All works well and the RDD gets stored successfully. Now the problem starts
when I want to read the sequence file again:

>>> my_dersialized_numpy_rdd = sc.sequenceFile(path)
>>> first = my_dersialized_numpy_rdd.first()
>>> type(first[1])
unicode

The previous str became a unicode object after we stored it to a sequence
file and read it again. Trying to convert it back with first[1].decode("ascii")
fails with UnicodeEncodeError: 'ascii' codec can't encode characters in
position 1-3: ordinal not in range(128).

My expectation was that I would get the data back exactly how I stored it,
i.e. in str format and not in unicode format. Any suggestion on how I can
read back the original data? I will try converting the str to a bytearray
before storing it to a sequence file.

Thanks,
Sam Stoelinga
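
For reference, a minimal sketch (not from the thread) of the matching
deserialization, assuming the value comes back as intact bytes/str rather
than unicode:

import io
import numpy as np

def deserialize_numpy_array(serialized_bytes):
    # Mirror of serialize_numpy_array: load the compressed npz payload
    # and pull out the array stored under the key "x".
    npz = np.load(io.BytesIO(serialized_bytes))
    return npz["x"]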


Re: Spark Python with SequenceFile containing numpy deserialized data in str form

2015-06-08 Thread Sam Stoelinga
Update: using a bytearray before storing to the RDD is not a solution either.
This happens when trying to read the RDD when the value was stored as a
Python bytearray:

Traceback (most recent call last):

  File /vagrant/python/kmeans.py, line 24, in module
features = sc.sequenceFile(feature_sequencefile_path)
  File /usr/local/spark/python/pyspark/context.py, line 490, in
sequenceFile
keyConverter, valueConverter, minSplits, batchSize)
  File
/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 538, in __call__
  File /usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.sequenceFile.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not
exist: hdfs://localhost:9000/tmp/feature-bytearray
at
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at
org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:45)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.RDD.take(RDD.scala:1156)
at
org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:205)
at
org.apache.spark.api.python.PythonRDD$.sequenceFile(PythonRDD.scala:447)
at
org.apache.spark.api.python.PythonRDD.sequenceFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)


On Tue, Jun 9, 2015 at 11:04 AM, Sam Stoelinga sammiest...@gmail.com
wrote:

 Hi all,

 I'm storing an rdd as sequencefile with the following content:
 key=filename(string) value=python str from numpy.savez(not unicode)

 In order to make sure the whole numpy array get's stored I have to first
 serialize it with:
 def serialize_numpy_array(numpy_array):
 output = io.BytesIO()
 np.savez_compressed(output, x=numpy_array)
 return output.getvalue()

  type(output.getvalue())
 str

 The deserialization returns a python str, *not unicode object*. After
 deserialization I call

 my_dersialized_numpy_rdd.saveAsSequenceFile(path)

 all works well and the RDD get stored successfully. Now the problem starts
 I want to read the sequencefile again:

  my_dersialized_numpy_rdd = sc.sequenceFile(path)
  first = my_dersialized_numpy_rdd.first()
  type(first[1])
 unicode

 The previous str became a unicode object after we stored it to a
 sequencefile and read it again. Trying to convert it back with
 first[1].decode(ascii) fails with UnicodeEncodeError: 'ascii' codec can't
 encode characters in position 1-3: ordinal not in range(128)

 My expectation was that I would get the data back as how I stored it for
 example in str format and not in unicode format. Anybody suggestion how I
 can read back the original data. Will try converting the str to bytearray
 before storing it to a seqeencefile.

 Thanks,
 Sam Stoelinga




Re: Spark Python with SequenceFile containing numpy deserialized data in str form

2015-06-08 Thread Sam Stoelinga
Update: I've done a workaround using saveAsPickleFile instead, which handles
everything correctly; it stays in byte format. I noticed Python gets messy
with str and bytes being the same in Python 2.7, and I'm wondering whether
using Python 3 would have the same problem.

I would still like to use a cross-language-usable SequenceFile instead of a
pickle file though, so if anybody has pointers I would appreciate that :)
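
For reference, a minimal sketch of the pickle-file round trip described above
(the path and RDD names are placeholders):

# Values survive as Python bytes/str objects, unlike the sequence file route.
features.saveAsPickleFile("/tmp/features-pickle")
restored = sc.pickleFile("/tmp/features-pickle")
first = restored.first()   # (filename, serialized numpy bytes)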

On Tue, Jun 9, 2015 at 11:35 AM, Sam Stoelinga sammiest...@gmail.com
wrote:

 Update: Using bytearray before storing to RDD is not a solution either.
 This happens when trying to read the RDD when the value was stored as
 python bytearray:

 Traceback (most recent call last):

 [0/9120]
   File /vagrant/python/kmeans.py, line 24, in module
 features = sc.sequenceFile(feature_sequencefile_path)
   File /usr/local/spark/python/pyspark/context.py, line 490, in
 sequenceFile
 keyConverter, valueConverter, minSplits, batchSize)
   File
 /usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
 line 538, in __call__
   File
 /usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line
 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.sequenceFile.
 : org.apache.hadoop.mapred.InvalidInputException: Input path does not
 exist: hdfs://localhost:9000/tmp/feature-bytearray
 at
 org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
 at
 org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
 at
 org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:45)
 at
 org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
 at
 org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 at
 org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 at org.apache.spark.rdd.RDD.take(RDD.scala:1156)
 at
 org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:205)
 at
 org.apache.spark.api.python.PythonRDD$.sequenceFile(PythonRDD.scala:447)
 at
 org.apache.spark.api.python.PythonRDD.sequenceFile(PythonRDD.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
 at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at py4j.Gateway.invoke(Gateway.java:259)
 at
 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Thread.java:745)


 On Tue, Jun 9, 2015 at 11:04 AM, Sam Stoelinga sammiest...@gmail.com
 wrote:

 Hi all,

 I'm storing an rdd as sequencefile with the following content:
 key=filename(string) value=python str from numpy.savez(not unicode)

 In order to make sure the whole numpy array get's stored I have to first
 serialize it with:
 def serialize_numpy_array(numpy_array):
 output = io.BytesIO()
 np.savez_compressed(output, x=numpy_array)
 return output.getvalue()

  type(output.getvalue())
 str

 The deserialization returns a python str, *not unicode object*. After
 deserialization I call

 my_dersialized_numpy_rdd.saveAsSequenceFile(path)

 all works well and the RDD get stored successfully. Now the problem
 starts I want to read the sequencefile again:

  my_dersialized_numpy_rdd = sc.sequenceFile(path)
  first = my_dersialized_numpy_rdd.first()
  type(first[1])
 unicode

 The previous str became a unicode object after we stored it to a
 sequencefile and read it again. Trying to convert it back with
 first[1].decode(ascii) fails with UnicodeEncodeError: 'ascii' codec can't
 encode characters in position 1-3: ordinal not in range(128)

 My expectation was that I would get the data back as how I stored it for
 example in str format and not in unicode format. Anybody suggestion how I
 can read back the original data. Will try converting the str

Re: PySpark with OpenCV causes python worker to crash

2015-06-05 Thread Sam Stoelinga
I've changed the SIFT feature extraction to SURF feature extraction and it
works...

Following line was changed:
sift = cv2.xfeatures2d.SIFT_create()

to

sift = cv2.xfeatures2d.SURF_create()

Where should I file this as a bug? When not running on Spark it works fine,
so I'm saying it's a Spark bug.

On Fri, Jun 5, 2015 at 2:17 PM, Sam Stoelinga sammiest...@gmail.com wrote:

 Yea should have emphasized that. I'm running the same code on the same VM.
 It's a VM with spark in standalone mode and I run the unit test directly on
 that same VM. So OpenCV is working correctly on that same machine but when
 moving the exact same OpenCV code to spark it just crashes.

 On Tue, Jun 2, 2015 at 5:06 AM, Davies Liu dav...@databricks.com wrote:

 Could you run the single thread version in worker machine to make sure
 that OpenCV is installed and configured correctly?

 On Sat, May 30, 2015 at 6:29 AM, Sam Stoelinga sammiest...@gmail.com
 wrote:
  I've verified the issue lies within Spark running OpenCV code and not
 within
  the sequence file BytesWritable formatting.
 
  This is the code which can reproduce that spark is causing the failure
 by
  not using the sequencefile as input at all but running the same function
  with same input on spark but fails:
 
  def extract_sift_features_opencv(imgfile_imgbytes):
  imgfilename, discardsequencefile = imgfile_imgbytes
  imgbytes = bytearray(open(/tmp/img.jpg, rb).read())
  nparr = np.fromstring(buffer(imgbytes), np.uint8)
  img = cv2.imdecode(nparr, 1)
  gray = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)
  sift = cv2.xfeatures2d.SIFT_create()
  kp, descriptors = sift.detectAndCompute(gray, None)
  return (imgfilename, test)
 
  And corresponding tests.py:
  https://gist.github.com/samos123/d383c26f6d47d34d32d6
 
 
  On Sat, May 30, 2015 at 8:04 PM, Sam Stoelinga sammiest...@gmail.com
  wrote:
 
  Thanks for the advice! The following line causes spark to crash:
 
  kp, descriptors = sift.detectAndCompute(gray, None)
 
  But I do need this line to be executed and the code does not crash when
  running outside of Spark but passing the same parameters. You're saying
  maybe the bytes from the sequencefile got somehow transformed and don't
  represent an image anymore causing OpenCV to crash the whole python
  executor.
 
  On Fri, May 29, 2015 at 2:06 AM, Davies Liu dav...@databricks.com
 wrote:
 
  Could you try to comment out some lines in
  `extract_sift_features_opencv` to find which line cause the crash?
 
  If the bytes came from sequenceFile() is broken, it's easy to crash a
  C library in Python (OpenCV).
 
  On Thu, May 28, 2015 at 8:33 AM, Sam Stoelinga sammiest...@gmail.com
 
  wrote:
   Hi sparkers,
  
   I am working on a PySpark application which uses the OpenCV
 library. It
   runs
   fine when running the code locally but when I try to run it on
 Spark on
   the
   same Machine it crashes the worker.
  
   The code can be found here:
   https://gist.github.com/samos123/885f9fe87c8fa5abf78f
  
   This is the error message taken from STDERR of the worker log:
   https://gist.github.com/samos123/3300191684aee7fc8013
  
   Would like pointers or tips on how to debug further? Would be nice
 to
   know
   the reason why the worker crashed.
  
   Thanks,
   Sam Stoelinga
  
  
   org.apache.spark.SparkException: Python worker exited unexpectedly
   (crashed)
   at
  
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
   at
  
  
 org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176)
   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:64)
   at
  
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
   at
  
  
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at
  
  
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
   Caused by: java.io.EOFException
   at java.io.DataInputStream.readInt(DataInputStream.java:392)
   at
  
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)
  
  
  
 
 
 





Re: PySpark with OpenCV causes python worker to crash

2015-06-05 Thread Sam Stoelinga
Yeah, I should have emphasized that. I'm running the same code on the same VM.
It's a VM with Spark in standalone mode and I run the unit test directly on
that same VM. So OpenCV is working correctly on that same machine, but when
moving the exact same OpenCV code to Spark it just crashes.

On Tue, Jun 2, 2015 at 5:06 AM, Davies Liu dav...@databricks.com wrote:

 Could you run the single thread version in worker machine to make sure
 that OpenCV is installed and configured correctly?

 On Sat, May 30, 2015 at 6:29 AM, Sam Stoelinga sammiest...@gmail.com
 wrote:
  I've verified the issue lies within Spark running OpenCV code and not
 within
  the sequence file BytesWritable formatting.
 
  This is the code which can reproduce that spark is causing the failure by
  not using the sequencefile as input at all but running the same function
  with same input on spark but fails:
 
  def extract_sift_features_opencv(imgfile_imgbytes):
  imgfilename, discardsequencefile = imgfile_imgbytes
  imgbytes = bytearray(open(/tmp/img.jpg, rb).read())
  nparr = np.fromstring(buffer(imgbytes), np.uint8)
  img = cv2.imdecode(nparr, 1)
  gray = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)
  sift = cv2.xfeatures2d.SIFT_create()
  kp, descriptors = sift.detectAndCompute(gray, None)
  return (imgfilename, test)
 
  And corresponding tests.py:
  https://gist.github.com/samos123/d383c26f6d47d34d32d6
 
 
  On Sat, May 30, 2015 at 8:04 PM, Sam Stoelinga sammiest...@gmail.com
  wrote:
 
  Thanks for the advice! The following line causes spark to crash:
 
  kp, descriptors = sift.detectAndCompute(gray, None)
 
  But I do need this line to be executed and the code does not crash when
  running outside of Spark but passing the same parameters. You're saying
  maybe the bytes from the sequencefile got somehow transformed and don't
  represent an image anymore causing OpenCV to crash the whole python
  executor.
 
  On Fri, May 29, 2015 at 2:06 AM, Davies Liu dav...@databricks.com
 wrote:
 
  Could you try to comment out some lines in
  `extract_sift_features_opencv` to find which line cause the crash?
 
  If the bytes came from sequenceFile() is broken, it's easy to crash a
  C library in Python (OpenCV).
 
  On Thu, May 28, 2015 at 8:33 AM, Sam Stoelinga sammiest...@gmail.com
  wrote:
   Hi sparkers,
  
   I am working on a PySpark application which uses the OpenCV library.
 It
   runs
   fine when running the code locally but when I try to run it on Spark
 on
   the
   same Machine it crashes the worker.
  
   The code can be found here:
   https://gist.github.com/samos123/885f9fe87c8fa5abf78f
  
   This is the error message taken from STDERR of the worker log:
   https://gist.github.com/samos123/3300191684aee7fc8013
  
   Would like pointers or tips on how to debug further? Would be nice to
   know
   the reason why the worker crashed.
  
   Thanks,
   Sam Stoelinga
  
  
   org.apache.spark.SparkException: Python worker exited unexpectedly
   (crashed)
   at
  
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
   at
  
  
 org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176)
   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:64)
   at
   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
   at
  
  
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at
  
  
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
   Caused by: java.io.EOFException
   at java.io.DataInputStream.readInt(DataInputStream.java:392)
   at
  
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)
  
  
  
 
 
 



Re: PySpark with OpenCV causes python worker to crash

2015-06-05 Thread Sam Stoelinga
Thanks Davies. I will file a bug later with code and a single image as the
dataset. Besides that, I can give anybody access to my Vagrant VM that
already has Spark with OpenCV and the dataset available.

Or you can set up the same Vagrant machine at your place. All is automated ^^
git clone https://github.com/samos123/computer-vision-cloud-platform
cd computer-vision-cloud-platform
./scripts/setup.sh
vagrant ssh

(Expect failures, I haven't cleaned up and tested it for other people.)
BTW, I currently also study at Tsinghua.

On Fri, Jun 5, 2015 at 2:43 PM, Davies Liu dav...@databricks.com wrote:

 Please file a bug here: https://issues.apache.org/jira/browse/SPARK/

 Could you also provide a way to reproduce this bug (including some
 datasets)?

 On Thu, Jun 4, 2015 at 11:30 PM, Sam Stoelinga sammiest...@gmail.com
 wrote:
  I've changed the SIFT feature extraction to SURF feature extraction and
 it
  works...
 
  Following line was changed:
  sift = cv2.xfeatures2d.SIFT_create()
 
  to
 
  sift = cv2.xfeatures2d.SURF_create()
 
  Where should I file this as a bug? When not running on Spark it works
 fine
  so I'm saying it's a spark bug.
 
  On Fri, Jun 5, 2015 at 2:17 PM, Sam Stoelinga sammiest...@gmail.com
 wrote:
 
  Yea should have emphasized that. I'm running the same code on the same
 VM.
  It's a VM with spark in standalone mode and I run the unit test
 directly on
  that same VM. So OpenCV is working correctly on that same machine but
 when
  moving the exact same OpenCV code to spark it just crashes.
 
  On Tue, Jun 2, 2015 at 5:06 AM, Davies Liu dav...@databricks.com
 wrote:
 
  Could you run the single thread version in worker machine to make sure
  that OpenCV is installed and configured correctly?
 
  On Sat, May 30, 2015 at 6:29 AM, Sam Stoelinga sammiest...@gmail.com
  wrote:
   I've verified the issue lies within Spark running OpenCV code and not
   within
   the sequence file BytesWritable formatting.
  
   This is the code which can reproduce that spark is causing the
 failure
   by
   not using the sequencefile as input at all but running the same
   function
   with same input on spark but fails:
  
   def extract_sift_features_opencv(imgfile_imgbytes):
   imgfilename, discardsequencefile = imgfile_imgbytes
   imgbytes = bytearray(open(/tmp/img.jpg, rb).read())
   nparr = np.fromstring(buffer(imgbytes), np.uint8)
   img = cv2.imdecode(nparr, 1)
   gray = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)
   sift = cv2.xfeatures2d.SIFT_create()
   kp, descriptors = sift.detectAndCompute(gray, None)
   return (imgfilename, test)
  
   And corresponding tests.py:
   https://gist.github.com/samos123/d383c26f6d47d34d32d6
  
  
   On Sat, May 30, 2015 at 8:04 PM, Sam Stoelinga 
 sammiest...@gmail.com
   wrote:
  
   Thanks for the advice! The following line causes spark to crash:
  
   kp, descriptors = sift.detectAndCompute(gray, None)
  
   But I do need this line to be executed and the code does not crash
   when
   running outside of Spark but passing the same parameters. You're
   saying
   maybe the bytes from the sequencefile got somehow transformed and
   don't
   represent an image anymore causing OpenCV to crash the whole python
   executor.
  
   On Fri, May 29, 2015 at 2:06 AM, Davies Liu dav...@databricks.com
   wrote:
  
   Could you try to comment out some lines in
   `extract_sift_features_opencv` to find which line cause the crash?
  
   If the bytes came from sequenceFile() is broken, it's easy to
 crash a
   C library in Python (OpenCV).
  
   On Thu, May 28, 2015 at 8:33 AM, Sam Stoelinga
   sammiest...@gmail.com
   wrote:
Hi sparkers,
   
I am working on a PySpark application which uses the OpenCV
library. It
runs
fine when running the code locally but when I try to run it on
Spark on
the
same Machine it crashes the worker.
   
The code can be found here:
https://gist.github.com/samos123/885f9fe87c8fa5abf78f
   
This is the error message taken from STDERR of the worker log:
https://gist.github.com/samos123/3300191684aee7fc8013
   
Would like pointers or tips on how to debug further? Would be
 nice
to
know
the reason why the worker crashed.
   
Thanks,
Sam Stoelinga
   
   
org.apache.spark.SparkException: Python worker exited
 unexpectedly
(crashed)
at
   
   
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
at
   
   
   
 org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176)
at
org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
   
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
   
   
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203
