Re: spark persistence doubt

2016-09-29 Thread Bedrytski Aliaksandr
Hi,

the 4th step should contain "transformrdd2", right?

Considering that transformations are chained and executed only when
there is an action (also known as lazy execution), I would say that
adding persist() to step 1 would not do any good (and may even be
harmful, as you may lose the optimisations gained by pipelining the 3
steps into one operation).

If a second action is executed on any of the transformations,
persisting the last common transformation (rdd1 in this case) would be
a good idea.
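
For reference, here is a minimal sketch (Scala) of the case where persist()
does pay off: two actions that both depend on rdd1, so rdd1 is computed only
once. The source and the transformations below are placeholders, not taken
from your code:

    import org.apache.spark.storage.StorageLevel

    val rdd1 = sc.textFile("hdfs:///some/path").map(_.trim)   // step 1 (placeholder source)
    val transformedRdd1 = rdd1.filter(_.nonEmpty)             // step 2
    val transformRdd2 = rdd1.map(_.length)                    // step 3

    rdd1.persist(StorageLevel.MEMORY_ONLY)  // worth it only because of the 2 actions below
    val count1 = transformedRdd1.count()    // action 1: computes and caches rdd1
    val count2 = transformRdd2.count()      // action 2: reuses the cached rdd1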

Regards,
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Thu, Sep 29, 2016, at 07:09, Shushant Arora wrote:
> Hi
>
> I have a flow like below
>
> 1.rdd1=some source.transform();
> 2.tranformedrdd1 = rdd1.transform(..);
> 3.transformrdd2 = rdd1.transform(..);
>
> 4.tranformrdd1.action();
>
> Do I need to persist rdd1 to optimise steps 2 and 3? Or, since there
> is no lineage breakage, will it work without persist?
>
> Thanks
>


Re: Issue with rogue data in csv file used in Spark application

2016-09-28 Thread Bedrytski Aliaksandr
Hi Mich,

if I understood you correctly, you may cast the value to float; it will yield
null if the value is not a valid float:

val df = Seq(("-", 5), ("1", 6), (",", 7), ("8.6", 7)).toDF("value", "id")
df.createOrReplaceTempView("lines")

spark.sql("SELECT cast(value AS FLOAT) AS value FROM lines").show()

+-----+
|value|
+-----+
| null|
|  1.0|
| null|
|  8.6|
+-----+

After that you may filter the DataFrame for the rows where the cast value is null.
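
For example, a quick sketch (Scala, Spark 2.0) of that filtering step, reusing
the "lines" view registered above:

    import spark.implicits._   // for the $ column syntax

    val casted = spark.sql("SELECT id, cast(value AS FLOAT) AS value FROM lines")
    val rogueRows = casted.filter($"value".isNull)     // values that were not valid floats
    val cleanRows = casted.filter($"value".isNotNull)  // values that cast correctly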

Regards,
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Wed, Sep 28, 2016, at 10:11, Mich Talebzadeh wrote:
> Thanks all.
>
> This is the csv schema all columns mapped to String
>
> scala> df2.printSchema
> root
>  |-- Stock: string (nullable = true)
>  |-- Ticker: string (nullable = true)
>  |-- TradeDate: string (nullable = true)
>  |-- Open: string (nullable = true)
>  |-- High: string (nullable = true)
>  |-- Low: string (nullable = true)
>  |-- Close: string (nullable = true)
>  |-- Volume: string (nullable = true)
>
> The issue I have can be shown as below
>
> df2.filter($"Open" === "-")
>   .select(changeToDate("TradeDate").as("TradeDate"), 'Open, 'High, 'Low,
>           'Close, 'Volume).show
>
> +----------+----+----+---+-----+------+
> | TradeDate|Open|High|Low|Close|Volume|
> +----------+----+----+---+-----+------+
> |2011-12-23|   -|   -|  -|40.56|     0|
> |2011-04-21|   -|   -|  -|45.85|     0|
> |2010-12-30|   -|   -|  -|38.10|     0|
> |2010-12-23|   -|   -|  -|38.36|     0|
> |2008-04-30|   -|   -|  -|32.39|     0|
> |2008-04-29|   -|   -|  -|33.05|     0|
> |2008-04-28|   -|   -|  -|32.60|     0|
> +----------+----+----+---+-----+------+
> Now there are ways of dealing with this. However, the solution has to
> be generic! Checking for a column == "-" is not generic. How about if
> that column was "," etc.
>
> This is an issue in most databases, specifically if a field is NaN
> (*NaN*, standing for "not a number", is a numeric data type value
> representing an undefined or unrepresentable value, especially in
> floating-point calculations).
>
> Spark handles this[1]. I am on Spark 2.0.1, in class
> DataFrameNaFunctions. The simplest option is to drop these rogue rows:
>
> df2.filter($"Open" === "-").drop()
>
> However, a better approach would be to use the replace method or to test
> any column for NaN.
>
>
>
>
> There is a method called isnan(). However, it does not return
> correct values!
>
> df2.filter(isnan($"Open")).show
>
> +-----+------+---------+----+----+---+-----+------+
> |Stock|Ticker|TradeDate|Open|High|Low|Close|Volume|
> +-----+------+---------+----+----+---+-----+------+
> +-----+------+---------+----+----+---+-----+------+
>
>
> Any suggestions?
>
> Thanks
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn *
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which
> may arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary
> damages arising from such loss, damage or destruction.
>
>
>
>
> On 28 September 2016 at 04:07, Mike Metzger
>  wrote:
>> Hi Mich -
>>
>> Can you run a filter command on df1 prior to your map, keeping only rows
>> where p(3).toString != '-', and then run your map command?
>>
>> Thanks
>>
>>
>> Mike
>>
>>
>> On Tue, Sep 27, 2016 at 5:06 PM, Mich Talebzadeh
>>  wrote:
>>> Thanks guys
>>>
>>> Actually these are the 7 rogue rows. The column with 0 is the Volume
>>> column, which means there were no trades on those days
>>>
>>> *cat stock.csv | grep ",0"*
>>> SAP SE,SAP, 23-Dec-11,-,-,-,40.56,0
>>> SAP SE,SAP, 21-Apr-11,-,-,-,45.85,0
>>> SAP SE,SAP, 30-Dec-10,-,-,-,38.10,0
>>> SAP SE,SAP, 23-Dec-10,-,-,-,38.36,0
>>> SAP SE,SAP, 30-Apr-08,-,-,-,32.39,0
>>> SAP SE,SAP, 29-Apr-08,-,-,-,33.05,0
>>> SAP SE,SAP, 28-Apr-08,-,-,-,32.60,0
>>>
>>> So one way would be to exclude the rows where there was no trade volume
>>> that day when cleaning up the csv file
>>>
>>> *cat stock.csv|grep -v ",0"*
>>>
>>> and that works. Bearing in mind that putting 0s in place of "-" will
>>> skew the price plot.
>>>
>>> BTW I am using Spark csv as well
>>>
>

Re: how to find NaN values of each row of a spark dataframe to decide whether the row is dropped or not

2016-09-26 Thread Bedrytski Aliaksandr
Hi Muhammet,

Python also supports SQL queries:
http://spark.apache.org/docs/latest/sql-programming-guide.html#running-sql-queries-programmatically

Regards,
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Mon, Sep 26, 2016, at 10:01, muhammet pakyürek wrote:
>
>
>
> but my request is related to python because I have designed a preprocess
> for the data which looks for rows including NaN values. If the number of
> NaNs is above the threshold, the row is deleted, otherwise it is filled with a
> predictive value. Therefore I need a python version of this process.
>
>
>
> *From:* Bedrytski Aliaksandr  *Sent:* Monday,
> September 26, 2016 7:53 AM *To:* muhammet pakyürek *Cc:*
> user@spark.apache.org *Subject:* Re: how to find NaN values of each
> row of spark dataframe to decide whether the rows is dropeed or not
>
> Hi Muhammet,
>
> have you tried to use sql queries?
>
>> spark.sql("""
>>   SELECT
>>     field1,
>>     field2,
>>     field3
>>   FROM table1
>>   WHERE
>>     field1 != 'Nan' AND
>>     field2 != 'Nan' AND
>>     field3 != 'Nan'
>> """)
>
This query filters out the rows containing Nan, for a table with 3 columns.
>
> Regards,
> --
>   Bedrytski Aliaksandr
>   sp...@bedryt.ski
>
>
>
> On Mon, Sep 26, 2016, at 09:30, muhammet pakyürek wrote:
>>
>> is there any way to do this directly? if not, is there any way to do
>> this indirectly using other data structures of spark?
>>
>


Re: how to find NaN values of each row of a spark dataframe to decide whether the row is dropped or not

2016-09-26 Thread Bedrytski Aliaksandr
Hi Muhammet,

have you tried to use sql queries?

> spark.sql("""
>   SELECT
>     field1,
>     field2,
>     field3
>   FROM table1
>   WHERE
>     field1 != 'Nan' AND
>     field2 != 'Nan' AND
>     field3 != 'Nan'
> """)

This query filters out the rows containing Nan, for a table with 3 columns.

Regards,
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Mon, Sep 26, 2016, at 09:30, muhammet pakyürek wrote:
>
> is there any way to do this directly? if not, is there any way to do
> this indirectly using other data structures of spark?
>


Re: udf forces usage of Row for complex types?

2016-09-25 Thread Bedrytski Aliaksandr
Hi Koert,

these case classes you are talking about should be serializable to be
efficient (e.g. kryo or plain java serialization).

A DataFrame is not simply a collection of Rows (which are serializable by
default); it also contains a schema with a different type for each column.
This way any columnar data may be represented without creating a custom
case class each time.

If you want to manipulate a collection of case classes, why not use good
old RDDs? (Or Datasets if you are using Spark 2.0.)
If you want to use sql against that collection, you will need to explain
to your application how to read it as a table (by transforming it into a
DataFrame).
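
As a rough sketch (Scala, Spark 2.0) of the Dataset alternative, reusing the
Person case class from your example below (the rest of the names are
illustrative):

    import spark.implicits._

    case class Person(name: String, age: Int)

    val ds = Seq((Person("john", 33), 5), (Person("mike", 30), 6)).toDS()
    // map works on the typed case class directly, no Row manipulation needed
    val bumped = ds.map { case (p, id) => (p.copy(age = p.age + 1), id) }
    bumped.show()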

Regards
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Sun, Sep 25, 2016, at 23:41, Koert Kuipers wrote:
> after having gotten used to have case classes represent complex
> structures in Datasets, i am surprised to find out that when i work in
> DataFrames with udfs no such magic exists, and i have to fall back to
> manipulating Row objects, which is error prone and somewhat ugly.
> for example:
> case class Person(name: String, age: Int)
>
> val df = Seq((Person("john", 33), 5), (Person("mike", 30), 6))
>   .toDF("person", "id")
> val df1 = df.withColumn("person",
>   udf({ (p: Person) => p.copy(age = p.age + 1) }).apply(col("person")))
> df1.printSchema
> df1.show
> leads to:
> java.lang.ClassCastException:
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot
> be cast to Person


Re: Spark Application Log

2016-09-22 Thread Bedrytski Aliaksandr
Hi Divya,

Have you tried the command *yarn logs -applicationId application_x_*?
(where application_x_ is the id of the application and may be found in the
output of the 'spark-submit' command or in YARN's web UI)
It will collect the logs from all the executors into one output.

Regards,
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Thu, Sep 22, 2016, at 06:06, Divya Gehlot wrote:
> Hi,
> I have initialised the logging in my spark App
> /* Initialize Logging */
> val log = Logger.getLogger(getClass.getName)
>
> Logger.getLogger("org").setLevel(Level.OFF)
>
> Logger.getLogger("akka").setLevel(Level.OFF)
>
> log.warn("Some text" + Somemap.size)
>
> When I run my spark job using spark-submit as below:
>
> spark-submit \
>   --master yarn-client \
>   --driver-memory 1G \
>   --executor-memory 1G \
>   --executor-cores 1 \
>   --num-executors 2 \
>   --class MainClass /home/hadoop/Spark-assembly-1.0.jar
>
> I could see the log in the terminal itself:
> 16/09/22 03:45:31 WARN MainClass$: SomeText  : 10
>
> When I set up this job in a scheduler,
> where can I see these logs?
>
> Thanks,
> Divya
>


Re: Get profile from sbt

2016-09-21 Thread Bedrytski Aliaksandr
Hi Saurabh,

you may use BuildInfo[1] sbt plugin to access values defined in
build.sbt
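
A minimal sketch of what that could look like (the "env" key, its default
value and the package name are my own illustrative assumptions, not something
imposed by the plugin):

    // project/plugins.sbt
    addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.6.1")  // or the current version

    // build.sbt
    lazy val root = (project in file("."))
      .enablePlugins(BuildInfoPlugin)
      .settings(
        buildInfoKeys := Seq[BuildInfoKey](
          name, version,
          "env" -> sys.props.getOrElse("env", "dev")   // e.g. sbt -Denv=stage package
        ),
        buildInfoPackage := "com.example.build"
      )

    // then, from Scala code:
    // val endpoint = if (com.example.build.BuildInfo.env == "stage") ... else ...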

Regards,
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Mon, Sep 19, 2016, at 18:28, Saurabh Malviya (samalviy) wrote:
> Hi,
>
> Is there anything in sbt equivalent to profiles in maven? I want the spark
> build to pick up endpoints based on the environment the jar is built for.
>
> In build.sbt we are ingesting a variable (dev, stage, etc.) and picking up
> all dependencies. In a similar way I need to pick up the config for external
> dependencies like endpoints.
>
> Or, as another approach, is there any way I can access a variable defined in
> build.sbt from scala code?
>
> -Saurabh


Links:

  1. https://github.com/sbt/sbt-buildinfo


Re: SparkR error: reference is ambiguous.

2016-09-09 Thread Bedrytski Aliaksandr
Hi,

Can you use full-string queries in SparkR?
Like (in Scala):

df1.registerTempTable("df1")
df2.registerTempTable("df2")
val df3 = sqlContext.sql("SELECT * FROM df1 JOIN df2 ON df1.ra = df2.ra")

explicitly mentioning table names in the query often solves
ambiguity problems.

Regards
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Fri, Sep 9, 2016, at 19:33, xingye wrote:
> Not sure whether this is the right distribution list that I can ask
> questions. If not, can someone give a distribution list that can find
> someone to help?
>
> I kept getting error of reference is ambiguous when implementing some
> sparkR code.
>
> 1. when i tried to assign values to a column using the existing
>column:
> df$c_mon<- df$ra*0


> 16/09/09 15:11:28 ERROR RBackendHandler: col on 3101 failed
> Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
>   org.apache.spark.sql.AnalysisException: Reference 'ra' is
>   ambiguous, could be: ra#8146, ra#13501.;
> 2. when I joined two spark dataframes using the key:
> df3<-join(df1, df2, df1$ra == df2$ra, "left")


> 16/09/09 14:48:07 WARN Column: Constructing trivially true equals
> predicate, 'ra#8146 = ra#8146'. Perhaps you need to use aliases.
> Actually column "ra" is the column name, I don't know why sparkR keeps
> having errors about ra#8146 or ra#13501..
> Can someone help?
> Thanks


Re: Why does spark take so much time for simple task without calculation?

2016-09-09 Thread Bedrytski Aliaksandr
Hi xiefeng,

Even if your RDDs are tiny and reduced to one partition, there is always
orchestration overhead (sending tasks to executor(s), reducing results,
etc., these things are not free).

If you need fast, [near] real-time processing, look towards
spark-streaming.

Regards,
-- 
  Bedrytski Aliaksandr
  sp...@bedryt.ski

On Mon, Sep 5, 2016, at 04:36, xiefeng wrote:
> The spark context will be reused, so the spark context initialization
> won't
> affect the throughput test.
> 
> 
> 
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-does-spark-take-so-much-time-for-simple-task-without-calculation-tp27628p27657.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 



Re: Why does spark take so much time for simple task without calculation?

2016-08-31 Thread Bedrytski Aliaksandr
Hi xiefeng,

Spark Context initialization takes some time and the tool does not
really shine for small data computations:
http://aadrake.com/command-line-tools-can-be-235x-faster-than-your-hadoop-cluster.html

But, when working with terabytes (petabytes) of data, those 35 seconds
of initialization don't really matter. 

Regards,

-- 
  Bedrytski Aliaksandr
  sp...@bedryt.ski

On Wed, Aug 31, 2016, at 11:45, xiefeng wrote:
> I installed a Spark standalone cluster (one master and one worker) on a
> Windows 2008 server with 16 cores and 24GB of memory.
>
> I have done a simple test: just create a string RDD and simply return it.
> I use JMeter to test throughput, but the highest is around 35/sec. I think
> Spark is powerful at distributed calculation, but why is the throughput so
> limited in such a simple test scenario, which only contains simple task
> dispatch and no calculation?
>
> 1. In JMeter I tested both 10 threads and 100 threads; there is little
> difference, around 2-3/sec.
> 2. I tested both caching and not caching the RDD; there is little difference.
> 3. During the test, the CPU and memory usage are low.
> 
> Below is my test code:
>
> @RestController
> public class SimpleTest {
>     @RequestMapping(value = "/SimpleTest", method = RequestMethod.GET)
>     @ResponseBody
>     public String testProcessTransaction() {
>         return SparkShardTest.simpleRDDTest();
>     }
> }
>
> final static Map<String, JavaRDD<String>> simpleRDDs = initSimpleRDDs();
>
> public static Map<String, JavaRDD<String>> initSimpleRDDs()
> {
>     Map<String, JavaRDD<String>> result =
>         new ConcurrentHashMap<String, JavaRDD<String>>();
>     JavaRDD<String> rddData = JavaSC.parallelize(data);
>     rddData.cache().count();  // this cache will improve 1-2/sec
>     result.put("MyRDD", rddData);
>     return result;
> }
>
> public static String simpleRDDTest()
> {
>     JavaRDD<String> rddData = simpleRDDs.get("MyRDD");
>     return rddData.first();
> }
> 
> 
> 
> 
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-does-spark-take-so-much-time-for-simple-task-without-calculation-tp27628.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 



Re: How to access the WrappedArray

2016-08-29 Thread Bedrytski Aliaksandr
Hi,

It depends on how you see the "elements from the WrappedArray" represented.
Is it a List[Any], or do you need a special case class for each line? Or do
you want to create a DataFrame that will hold the type for each column?

Will the json file always be < 100mb, so that you can pre-treat it with a
*sed* command?
If that's the case, I would recommend transforming this file into a csv (as
it is a more structured type of file) using bash tools and then reading it
with spark, while casting the column types to the ones that are expected (or
leaving the inferred types if they are sufficient).

Or (if the file is expected to be larger than bash tools can handle) you
could iterate over the resulting WrappedArray and create a case class for
each line (see the sketch below).

PS: I wonder where the *meta* object from the json goes.
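
A rough Scala sketch of that last idea (assuming a SparkSession named spark and
the "salaries" temporary view from your code; the Salary class and the field
positions are guesses based on the sample output in your message):

    import scala.collection.mutable.WrappedArray

    case class Salary(id: String, name: String, jobTitle: String, annualRate: String)

    val rows = spark.sql("SELECT * FROM salaries").collect()
    val salaries = rows.flatMap { r =>
      // the first column prints as a WrappedArray of WrappedArrays, one inner array per line
      r.getAs[WrappedArray[WrappedArray[Any]]](0).map { line =>
        Salary(line(1).toString, line(8).toString, line(9).toString, line(13).toString)
      }
    }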

--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Mon, Aug 29, 2016, at 11:27, Sreeharsha wrote:
> Here is the snippet of code :
>
> // The entry point into all functionality in Spark is the SparkSession class.
> // To create a basic SparkSession, just use SparkSession.builder():
> SparkSession spark = SparkSession.builder()
>     .appName("Java Spark SQL Example")
>     .master("local")
>     .getOrCreate();
>
> // With a SparkSession, applications can create DataFrames from an
> // existing RDD, from a Hive table, or from Spark data sources.
> Dataset<Row> rows_salaries =
>     spark.read().json("/Users/sreeharsha/Downloads/rows_salaries.json");
>
> // Register the DataFrame as a SQL temporary view
> rows_salaries.createOrReplaceTempView("salaries");
>
> // SQL statements can be run by using the sql methods provided by spark
> List<Row> df = spark.sql("select * from salaries").collectAsList();
> for (Row r : df) {
>     if (r.get(0) != null)
>         System.out.println(r.get(0).toString());
> }
>
>
> Actual Output:
> WrappedArray(WrappedArray(1, B9B42DE1-E810-4489-9735-B365A47A4012, 1,
> 1467358044, 697390, 1467358044, 697390, null, Aaron,Patricia G,
> Facilities/Office Services II, A03031, OED-Employment Dev (031), 1979-10-
> 24T00:00:00, 56705.00, 54135.44))
> Expecting Output:
> Need elements from the WrappedArray
> Below you can find the attachment of .json file
>
>   *rows_salaries.json* (4M) Download Attachment[1]
>
> View this message in context:How to acess the WrappedArray[2]
>  Sent from the Apache Spark User List mailing list archive[3] at
>  Nabble.com.


Links:

  1. 
http://apache-spark-user-list.1001560.n3.nabble.com/attachment/27615/0/rows_salaries.json
  2. 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-acess-the-WrappedArray-tp27615.html
  3. http://apache-spark-user-list.1001560.n3.nabble.com/


Re: Best way to calculate intermediate column statistics

2016-08-26 Thread Bedrytski Aliaksandr
Hi Mich,

I was wondering, what are the advantages of using helper methods instead
of one multiline SQL string?
(I rarely, if ever, use helper methods, but maybe I'm missing something.)
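
For reference, a sketch of how the second aggregation from the quoted message
below might look as one multiline SQL string (assuming df is registered as a
temporary table named "transactions"; the column names are taken from that
snippet):

    df.registerTempTable("transactions")

    val newDF = sqlContext.sql("""
      SELECT DISTINCT
        substring(transactiondate, 1, 4) AS Year,
        hashtag                          AS Retailer,
        round(sum(debitamount) OVER (PARTITION BY hashtag
                                     ORDER BY substring(transactiondate, 1, 4)), 2) AS Spent
      FROM transactions
      WHERE transactiontype = 'DEB'
        AND transactiondescription IS NOT NULL
    """)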

Regards
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Thu, Aug 25, 2016, at 11:39, Mich Talebzadeh wrote:
> Hi Richard,
>
> Windowing/Analytics for stats are pretty simple. Example
>
> import org.apache.spark.sql.expressions.Window
>
> val wSpec = Window.partitionBy('transactiontype).orderBy(desc("transactiondate"))
> df.filter('transactiondescription.contains(HASHTAG))
>   .select('transactiondate, 'transactiondescription, rank().over(wSpec).as("rank"))
>   .filter($"rank" === 1).show(1)
>
> val wSpec5 = Window.partitionBy('hashtag).orderBy(substring('transactiondate, 1, 4))
> val newDF = df.where('transactiontype === "DEB" && ('transactiondescription).isNotNull)
>   .select(substring('transactiondate, 1, 4).as("Year"),
>           'hashtag.as("Retailer"),
>           round(sum('debitamount).over(wSpec5), 2).as("Spent"))
> newDF.distinct.orderBy('year, 'Retailer).collect.foreach(println)
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn *
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which
> may arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary
> damages arising from such loss, damage or destruction.
>
>
>
>
> On 25 August 2016 at 08:24, Richard Siebeling
>  wrote:
>> Hi Mich,
>>
>> thanks for the suggestion, I hadn't thought of that. We'll need to
>> gather the statistics in two ways, incremental when new data arrives
>> and over the complete set when aggregating or filtering (because I
>> think it's difficult to gather statistics while aggregating or
>> filtering).
>> The analytic functions could help when gathering the statistics over
>> the whole set,
>>
>> kind regards,
>> Richard
>>
>>
>>
>> On Wed, Aug 24, 2016 at 10:54 PM, Mich Talebzadeh
>>  wrote:
>>> Hi Richard,
>>>
>>> can you use analytics functions for this purpose on DF
>>>
>>> HTH
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn *
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>> for any loss, damage or destruction of data or any other property
>>> which may arise from relying on this email's technical content is
>>> explicitly disclaimed. The author will in no case be liable for any
>>> monetary damages arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On 24 August 2016 at 21:37, Richard Siebeling 
>>> wrote:
>>>> Hi Mich,
>>>>
>>>> I'd like to gather several statistics per column in order to make
>>>> analysing data easier. These two statistics are some examples,
>>>> other statistics I'd like to gather are the variance, the median,
>>>> several percentiles, etc.  We are building a data analysis platform
>>>> based on Spark,
>>>>
>>>> kind regards,
>>>> Richard
>>>>
>>>> On Wed, Aug 24, 2016 at 6:52 PM, Mich Talebzadeh
>>>>  wrote:
>>>>> Hi Richard,
>>>>>
>>>>> What is the business use case for such statistics?
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn *
>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>> for any loss, damage or destruction of data or any other property
>>>>> which may arise from relyi

Re: Best way to calculate intermediate column statistics

2016-08-24 Thread Bedrytski Aliaksandr
Hi Richard,

Should these intermediate statistics be calculated from the result of
the calculation or during the aggregation itself?
If they can be derived from the resulting dataframe, why not cache
(persist) that result just after the calculation?
Then you may aggregate the statistics from the cached dataframe.
This way it won't hit performance too much.
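
A minimal sketch (Scala) of that idea, where "result" stands for the dataframe
produced by the aggregation/filter step and "col1" is a placeholder column
name:

    import org.apache.spark.sql.functions._

    val cached = result.cache()
    val actualOutput = cached.collect()   // the first pass materializes the cache

    val colStats = cached.agg(
      countDistinct(col("col1")).as("distinct_col1"),
      sum(when(col("col1").isNull, 1).otherwise(0)).as("empty_col1")
    ).collect()                           // computed from the cached data, no second scan of the source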

Regards
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Wed, Aug 24, 2016, at 16:42, Richard Siebeling wrote:
> Hi,
>
> what is the best way to calculate intermediate column statistics, like
> the number of empty values and the number of distinct values of each
> column in a dataset, when aggregating or filtering data, next to the
> actual result of the aggregate or the filtered data?
>
> We are developing an application in which the user can slice-and-dice
> through the data and we would like to, next to the actual resulting
> data, get column statistics of each column in the resulting dataset.
> We prefer to calculate the column statistics on the same pass over the
> data as the actual aggregation or filtering, is that possible?
>
> We could sacrifice a little bit of performance (but not too much),
> that's why we prefer one pass...
>
> Is this possible in the standard Spark or would this mean
> modifying the source a little bit and recompiling? Is that
> feasible / wise to do?
>
> thanks in advance,
> Richard
>
>
>
>


Re: DataFrame Data Manipulation - Based on a timestamp column Not Working

2016-08-24 Thread Bedrytski Aliaksandr
Hi Subhajit,

you may try to use sql queries instead of helper methods:

> sales_order_base_dataFrame.registerTempTable("sales_orders")
>
> val result = sqlContext.sql(s"""
>     SELECT *
>     FROM sales_orders
>     WHERE unix_timestamp(SCHEDULE_SHIP_DATE, 'yyyy-MM-dd') >=
>           unix_timestamp('$demand_timefence_end_date', 'yyyy-MM-dd')
> """)

This is assuming demand_timefence_end_date has the 'yyyy-MM-dd' date format
(which is what ISODateTimeFormat.yearMonthDay() produces).

Regards,
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Wed, Aug 24, 2016, at 00:46, Subhajit Purkayastha wrote:
> Using spark 2.0  & scala 2.11.8, I have a DataFrame with a
> timestamp column
>
> root
> |-- ORG_ID: integer (nullable = true)
> |-- HEADER_ID: integer (nullable = true)
> |-- ORDER_NUMBER: integer (nullable = true)
> |-- LINE_ID: integer (nullable = true)
> |-- LINE_NUMBER: integer (nullable = true)
> |-- ITEM_TYPE_CODE: string (nullable = true)
> |-- ORGANIZATION_ID: integer (nullable = true)
> |-- INVENTORY_ITEM_ID: integer (nullable = true)
> |-- SCHEDULE_SHIP_DATE: timestamp (nullable = true)
> |-- ORDER_QUANTITY_UOM: string (nullable = true)
> |-- UNIT_SELLING_PRICE: double (nullable = true)
> |-- OPEN_QUANTITY: double (nullable = true)
>
> [204,94468,56721,197328,1,STANDARD,207,149,2004-01-08
> 23:59:59.0,Ea,1599.0,28.0]
> [204,94468,56721,197331,2,STANDARD,207,151,2004-01-08
> 23:59:59.0,Ea,1899.05,40.0]
> [204,94468,56721,197332,3,STANDARD,207,436,2004-01-08
> 23:59:59.0,Ea,300.0,24.0]
> [204,94468,56721,197335,4,STANDARD,207,3751,2004-01-08
> 23:59:59.0,Ea,380.0,24.0]
>
> I want to manipulate the dataframe data based on a parameter =
> demand_time_fence_date
>
> var demand_timefence_end_date_instance = new MutableDateTime(planning_start_date)
> var demand_timefence_days = demand_timefence_end_date_instance.addDays(demand_time_fence)
> val demand_timefence_end_date =
>   ISODateTimeFormat.yearMonthDay().print(demand_timefence_end_date_instance)
>
> var filter_stmt = "from_unixtime(SCHEDULE_SHIP_DATE,'yyyy-MM-dd') >= " + demand_timefence_end_date
>
> val sales_order_dataFrame = sales_order_base_dataFrame.filter(filter_stmt).limit(10)
>
> What is the correct syntax to pass the parameter value?
>
> The above filter statement is not working to restrict the dataset
>
> Thanks,
>
> Subhajit
>
>


Re: Plans for improved Spark DataFrame/Dataset unit testing?

2016-08-22 Thread Bedrytski Aliaksandr
Hi Everett,

The HiveContext is initialized only once as a lazy val, so if you mean
initializing different JVMs for each test (or group of tests), then in
that case the context will not, obviously, be shared.

But specs2 (by default) launches specs (inside of test classes) in
parallel threads, and in that case the context is shared.

To sum up, test classes are launched sequentially, but the specs inside of
them are launched in parallel. We don't have anything specific in our .sbt
file in regards to parallel test execution, and the hive context is
initialized only once.

In my opinion (correct me if I'm wrong), if you already have more than one
spec per test class, the CPU is already saturated, so fully parallel
execution of test classes would not give additional gains.

Regards
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Sun, Aug 21, 2016, at 18:30, Everett Anderson wrote:
>
>
> On Sun, Aug 21, 2016 at 3:08 AM, Bedrytski Aliaksandr
>  wrote:
>> __
>> Hi,
>>
>> we share the same spark/hive context between tests (executed in
>> parallel), so the main problem is that the temporary tables are
>> overwritten each time they are created, this may create race
>> conditions
>> as these tempTables may be seen as global mutable shared state.
>>
>> So each time we create a temporary table, we add an unique,
>> incremented,
>> thread safe id (AtomicInteger) to its name so that there are only
>> specific, non-shared temporary tables used for a test.
>
> Makes sense.
>
> But when you say you're sharing the same spark/hive context between
> tests, I'm assuming that's between the same tests within one test
> class, but you're not sharing across test classes (which a build tool
> like Maven or Gradle might have executed in separate JVMs).
>
> Is that right?
>
>
>
>>
>>
>> --
>>   Bedrytski Aliaksandr
>>   sp...@bedryt.ski
>>
>>
>>
>>> On Sat, Aug 20, 2016, at 01:25, Everett Anderson wrote:
>>> Hi!
>>>
>>> Just following up on this --
>>>
>>> When people talk about a shared session/context for testing
>>> like this,
>>> I assume it's still within one test class. So it's still the
>>> case that
>>> if you have a lot of test classes that test Spark-related
>>> things, you
>>> must configure your build system to not run in them in parallel.
>>> You'll get the benefit of not creating and tearing down a Spark
>>> session/context between test cases with a test class, though.
>>>
>>> Is that right?
>>>
>>> Or have people figured out a way to have sbt (or Maven/Gradle/etc)
>>> share Spark sessions/contexts across integration tests in a
>>> safe way?
>>>
>>>
>>> On Mon, Aug 1, 2016 at 3:23 PM, Holden Karau
>>>  wrote:
>>> Thats a good point - there is an open issue for spark-testing-
>>> base to
>>> support this shared sparksession approach - but I haven't had the
>>> time ( https://github.com/holdenk/spark-testing-base/issues/123 ).
>>> I'll try and include this in the next release :)
>>>
>>> On Mon, Aug 1, 2016 at 9:22 AM, Koert Kuipers
>>>  wrote:
>>> we share a single single sparksession across tests, and they can run
>>> in parallel. is pretty fast
>>>
>>> On Mon, Aug 1, 2016 at 12:02 PM, Everett Anderson
>>>  wrote:
>>> Hi,
>>>
>>> Right now, if any code uses DataFrame/Dataset, I need a test setup
>>> that brings up a local master as in this article[1].
>>>
>>>
>>> That's a lot of overhead for unit testing and the tests can't run
>>> in parallel, so testing is slow -- this is more like what I'd call
>>> an integration test.
>>>
>>> Do people have any tricks to get around this? Maybe using spy mocks
>>> on fake DataFrame/Datasets?
>>>
>>> Anyone know if there are plans to make more traditional unit
>>> testing possible with Spark SQL, perhaps with a stripped down in-
>>> memory implementation? (I admit this does seem quite hard since
>>> there's so much functionality in these classes!)
>>>
>>> Thanks!
>>>
>>>
>>> - Everett
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau
>>>


Re: Plans for improved Spark DataFrame/Dataset unit testing?

2016-08-21 Thread Bedrytski Aliaksandr
Hi,

we share the same spark/hive context between tests (executed in
parallel), so the main problem is that temporary tables are
overwritten each time they are created; this may create race conditions,
as these tempTables may be seen as global mutable shared state.

So each time we create a temporary table, we add a unique, incremented,
thread-safe id (AtomicInteger) to its name, so that only specific,
non-shared temporary tables are used by a test.
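
A rough sketch (Scala) of that setup; the object and method names are
illustrative, not taken from our actual code:

    import java.util.concurrent.atomic.AtomicInteger
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    object SharedTestContext {
      // initialized once, shared by all specs running in the same JVM
      lazy val sc = new SparkContext(
        new SparkConf().setMaster("local[*]").setAppName("tests"))
      lazy val hiveContext = new HiveContext(sc)

      private val counter = new AtomicInteger(0)

      // e.g. uniqueTableName("users") => "users_42", never reused by another spec
      def uniqueTableName(prefix: String): String =
        s"${prefix}_${counter.incrementAndGet()}"
    }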

--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



> On Sat, Aug 20, 2016, at 01:25, Everett Anderson wrote:
> Hi!
>
> Just following up on this --
>
> When people talk about a shared session/context for testing like this,
> I assume it's still within one test class. So it's still the case that
> if you have a lot of test classes that test Spark-related things, you
> must configure your build system to not run in them in parallel.
> You'll get the benefit of not creating and tearing down a Spark
> session/context between test cases with a test class, though.
>
> Is that right?
>
> Or have people figured out a way to have sbt (or Maven/Gradle/etc)
> share Spark sessions/contexts across integration tests in a safe way?
>
>
> On Mon, Aug 1, 2016 at 3:23 PM, Holden Karau
>  wrote:
> Thats a good point - there is an open issue for spark-testing-base to
> support this shared sparksession approach - but I haven't had the
> time ( https://github.com/holdenk/spark-testing-base/issues/123 ).
> I'll try and include this in the next release :)
>
> On Mon, Aug 1, 2016 at 9:22 AM, Koert Kuipers
>  wrote:
> we share a single single sparksession across tests, and they can run
> in parallel. is pretty fast
>
> On Mon, Aug 1, 2016 at 12:02 PM, Everett Anderson
>  wrote:
> Hi,
>
> Right now, if any code uses DataFrame/Dataset, I need a test setup
> that brings up a local master as in this article[1].
>
> That's a lot of overhead for unit testing and the tests can't run
> in parallel, so testing is slow -- this is more like what I'd call
> an integration test.
>
> Do people have any tricks to get around this? Maybe using spy mocks
> on fake DataFrame/Datasets?
>
> Anyone know if there are plans to make more traditional unit
> testing possible with Spark SQL, perhaps with a stripped down in-
> memory implementation? (I admit this does seem quite hard since
> there's so much functionality in these classes!)
>
> Thanks!
>
>
> - Everett
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau


Re: Losing executors due to memory problems

2016-08-12 Thread Bedrytski Aliaksandr
Hi Vinay,

just out of curiosity, why are you converting your DataFrames into RDDs
before the join? Join works quite well with DataFrames.

As for your problem, it looks like you gave your executors more
memory than you physically have. As an example of an executor
configuration:

> Cluster of 6 nodes, 16 cores/node, 64 GB RAM/node => gives: 17 executors,
> 19 GB/executor, 5 cores/executor
> No more than 5 cores per executor
> Leave some cores/RAM for the driver

More on the matter here
http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications

--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Fri, Aug 12, 2016, at 01:41, Muttineni, Vinay wrote:
> Hello,
> I have a spark job that basically reads data from two tables into two
> Dataframes which are subsequently converted to RDD's. I, then, join
> them based on a common key.
> Each table is about 10 TB in size but after filtering, the two RDD’s
> are about 500GB each.
> I have 800 executors with 8GB memory per executor.
> Everything works fine until the join stage. But, the join stage is
> throwing the below error.
> I tried increasing the partitions before the join stage but it doesn’t
> change anything.
> Any ideas, how I can fix this and what I might be doing wrong?
> Thanks,
> Vinay
>
> ExecutorLostFailure (executor 208 exited caused by one of the running
> tasks) Reason: Container marked as failed:
> container_1469773002212_96618_01_000246 on host:. Exit status: 143.
> Diagnostics: Container
> [pid=31872,containerID=container_1469773002212_96618_01_000246] is
> running beyond physical memory limits. Current usage: 15.2 GB of 15.1
> GB physical memory used; 15.9 GB of 31.8 GB virtual memory used.
> Killing container.
> Dump of the process-tree for container_1469773002212_96618_01_000246 :
>  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>  |  SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
>  |  FULL_CMD_LINE
>  |- 31883 31872 31872 31872 (java) 519517 41888 17040175104
>  |  3987193 /usr/java/latest/bin/java -server -
>  |  XX:OnOutOfMemoryError=kill %p -Xms14336m -Xmx14336m -
>  |  Djava.io.tmpdir=/hadoop/11/scratch/local/usercacheappcach-
>  |  e/application_1469773002212_96618/container_1469773002212-
>  |  _96618_01_000246/tmp -Dspark.driver.port=32988 -
>  |  Dspark.ui.port=0 -Dspark.akka.frameSize=256 -
>  |  Dspark.yarn.app.container.log.dir=/hadoop/12/scratch/logs-
>  |  /application_1469773002212_96618/container_1469773002212_-
>  |  96618_01_000246 -XX:MaxPermSize=256m
>  |  org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-
>  |  url spark://CoarseGrainedScheduler@10.12.7.4:32988 --executor-
>  |  id 208 –hostname x.com --cores 11 --app-id
>  |  application_1469773002212_96618 --user-class-path
>  |  file:/hadoop/11/scratch/local/usercache /appcache/applica-
>  |  tion_1469773002212_96618/container_1469773002212_96618_01-
>  |  _000246/__app__.jar --user-class-path
>  |  file:/hadoop/11/scratch/local/usercache/ appcache/applica-
>  |  tion_1469773002212_96618/container_1469773002212_96618_01-
>  |  _000246/mysql-connector-java-5.0.8-bin.jar --user-class-
>  |  path file:/hadoop/11/scratch/local/usercache/appcache/app-
>  |  lication_1469773002212_96618/container_1469773002212_9661-
>  |  8_01_000246/datanucleus-core-3.2.10.jar --user-class-path
>  |  file:/hadoop/11/scratch/local/usercache/appcache/applicat-
>  |  ion_1469773002212_96618/container_1469773002212_96618_01_-
>  |  000246/datanucleus-api-jdo-3.2.6.jar --user-class-path fi-
>  |  le:/hadoop/11/scratch/local/usercache/appcache/applicatio-
>  |  n_1469773002212_96618/container_1469773002212_96618_01_00-
>  |  0246/datanucleus-rdbms-3.2.9.jar
>  |- 31872 16580 31872 31872 (bash) 0 0 9146368 267 /bin/bash
>  |  -c LD_LIBRARY_PATH=/apache/hadoop/lib/native:/apache/hado-
>  |  op/lib/native/Linux-amd64-64: /usr/java/latest/bin/java
>  |  -server -XX:OnOutOfMemoryError='kill %p' -Xms14336m -
>  |  Xmx14336m -
>  |  Djava.io.tmpdir=/hadoop/11/scratch/local/usercache/ appca-
>  |  che/application_1469773002212_96618/container_14697730022-
>  |  12_96618_01_000246/tmp '-Dspark.driver.port=32988' '-
>  |  Dspark.ui.port=0' '-Dspark.akka.frameSize=256' -
>  |  Dspark.yarn.app.container.log.dir=/hadoop/12/scratch/logs-
>  |  /application_1469773002212_96618/container_1469773002212_-
>  |  96618_01_000246 -XX:MaxPermSize=256m

Re: Random forest binary classification H20 difference Spark

2016-08-10 Thread Bedrytski Aliaksandr
Hi Samir,

either use the *dataframe.na.fill()* method or the *nvl()* function when
selecting the features:

val train = sqlContext.sql("SELECT ... nvl(Field, 1.0) AS Field ...
FROM test")
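
The DataFrame API equivalent would be something along these lines ("Field"
standing for whichever feature columns contain nulls):

    val train = df.na.fill(1.0, Seq("Field"))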

--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Wed, Aug 10, 2016, at 11:19, Yanbo Liang wrote:
> Hi Samir,
>
> Did you use VectorAssembler to assemble some columns into the feature
> column? If there are NULLs in your dataset, VectorAssembler will throw
> this exception. You can use DataFrame.drop() or DataFrame.replace() to
> drop/substitute NULL values.
>
> Thanks
> Yanbo
>
> 2016-08-07 19:51 GMT-07:00 Javier Rey :
>> Hi everybody.
>> I have executed RF on H2O and I didn't have troubles with null values, but
>> in contrast, in Spark, using dataframes and the ML library, I obtain this
>> error. I know my dataframe contains nulls, but I understood that
>> Random Forest supports null values:
>>
>> "Values to assemble cannot be null"
>>
>> Any advice, that framework can handle this issue?.
>>
>> Regards,
>> Samir