Logging DataFrame API pipelines

2019-04-02 Thread Magnus Nilsson
Hello all,

How do you log what is happening inside your Spark Dataframe pipelines?

I would like to collect statistics along the way, mostly counts of rows at
particular steps, to see where rows were filtered and so on. Is there any
other way to do this than calling .count on the dataframe?
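For illustration, what I do today looks roughly like the sketch below (the
input path, column names and logger setup are made up); every .count triggers
an extra job over the data, which is what I would like to avoid:

import logging

from pyspark.sql import SparkSession, functions as F

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pipeline")

spark = SparkSession.builder.appName("pipeline-logging-sketch").getOrCreate()

# Hypothetical pipeline: load, filter, aggregate, logging a row count per step.
df = spark.read.parquet("/data/events")
log.info("rows after load: %d", df.count())                    # extra job 1

filtered = df.filter(F.col("status") == "ok")
log.info("rows after filter: %d", filtered.count())            # extra job 2

aggregated = filtered.groupBy("user_id").count()
log.info("groups after aggregation: %d", aggregated.count())   # extra job 3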

 Regards,

Magnus


Re: How to extract data in parallel from RDBMS tables

2019-04-02 Thread Jason Nerothin
I can *imagine* writing some sort of DataframeReader-generation tool, but
am not aware of one that currently exists.
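Something along these lines is what I have in mind, as a rough sketch (the
connection details, the catalog query and the partitioning settings are all
placeholders and would need to be adapted per database):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-bulk-extract-sketch").getOrCreate()

jdbc_url = "jdbc:postgresql://db-host:5432/mydb"   # placeholder connection
props = {"user": "etl", "password": "secret", "driver": "org.postgresql.Driver"}

# Pull the table list from the database's own catalog (query is DB-specific).
tables = [row.table_name for row in
          spark.read.jdbc(jdbc_url,
                          "(select table_name from information_schema.tables"
                          " where table_schema = 'public') t",
                          properties=props).collect()]

for table in tables:
    df = (spark.read
          .format("jdbc")
          .option("url", jdbc_url)
          .option("dbtable", table)
          .option("user", props["user"])
          .option("password", props["password"])
          .option("numPartitions", 8)          # upper bound on parallel JDBC reads
          .option("partitionColumn", "id")     # assumes a numeric key column
          .option("lowerBound", 1)
          .option("upperBound", 1000000)
          .load())
    df.write.mode("overwrite").parquet("/warehouse/raw/{}".format(table))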

On Tue, Apr 2, 2019 at 13:08 Surendra , Manchikanti <
surendra.manchika...@gmail.com> wrote:

>
> Looking for a generic solution, not for a specific DB or number of tables.
>
>
> On Fri, Mar 29, 2019 at 5:04 AM Jason Nerothin 
> wrote:
>
>> How many tables? What DB?
>>
>> On Fri, Mar 29, 2019 at 00:50 Surendra , Manchikanti <
>> surendra.manchika...@gmail.com> wrote:
>>
>>> Hi Jason,
>>>
>>> Thanks for your reply, but I am looking for a way to extract all the
>>> tables in a database in parallel.
>>>
>>>
>>> On Thu, Mar 28, 2019 at 2:50 PM Jason Nerothin 
>>> wrote:
>>>
 Yes.

 If you use the numPartitions option, your max parallelism will be that
 number. See also: partitionColumn, lowerBound, and upperBound

 https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

 On Wed, Mar 27, 2019 at 23:06 Surendra , Manchikanti <
 surendra.manchika...@gmail.com> wrote:

> Hi All,
>
> Is there any way to copy all the tables in parallel from RDBMS using
> Spark? We are looking for a functionality similar to Sqoop.
>
> Thanks,
> Surendra
>
> --
 Thanks,
 Jason

>>> --
>> Thanks,
>> Jason
>>
> --
Thanks,
Jason


Re: How to extract data in parallel from RDBMS tables

2019-04-02 Thread Surendra , Manchikanti
Looking for a generic solution, not for a specific DB or number of tables.


On Fri, Mar 29, 2019 at 5:04 AM Jason Nerothin 
wrote:

> How many tables? What DB?
>
> On Fri, Mar 29, 2019 at 00:50 Surendra , Manchikanti <
> surendra.manchika...@gmail.com> wrote:
>
>> Hi Jason,
>>
>> Thanks for your reply, but I am looking for a way to extract all the
>> tables in a database in parallel.
>>
>>
>> On Thu, Mar 28, 2019 at 2:50 PM Jason Nerothin 
>> wrote:
>>
>>> Yes.
>>>
>>> If you use the numPartitions option, your max parallelism will be that
>>> number. See also: partitionColumn, lowerBound, and upperBound
>>>
>>> https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
>>>
>>> On Wed, Mar 27, 2019 at 23:06 Surendra , Manchikanti <
>>> surendra.manchika...@gmail.com> wrote:
>>>
 Hi All,

 Is there any way to copy all the tables in parallel from RDBMS using
 Spark? We are looking for a functionality similar to Sqoop.

 Thanks,
 Surendra

 --
>>> Thanks,
>>> Jason
>>>
>> --
> Thanks,
> Jason
>


Re: Issues with Spark Streaming checkpointing of Kafka topic content

2019-04-02 Thread Dmitry Goldenberg
To add more info, this project is on an older version of Spark, 1.5.0, and
an older version of Kafka, 0.8.2.1 (2.10-0.8.2.1).

On Tue, Apr 2, 2019 at 11:39 AM Dmitry Goldenberg 
wrote:

> Hi,
>
> I've got 3 questions/issues regarding checkpointing, was hoping someone
> could help shed some light on this.
>
> We've got a Spark Streaming consumer consuming data from a Kafka topic;
> works fine generally until I switch it to the checkpointing mode by calling
> the 'checkpoint' method on the context and pointing the checkpointing at a
> directory in HDFS.
>
> I can see that files get written to that directory however I don't see new
> Kafka content being processed.
>
> *Question 1.* Is it possible that the checkpointed consumer is off base
> in its understanding of where the offsets are on the topic and how could I
> troubleshoot that?  Is it possible that some "confusion" happens if a
> consumer is switched back and forth between checkpointed and not? How could
> we tell?
>
> *Question 2.* About spark.streaming.receiver.writeAheadLog.enable. By
> default this is false. "All the input data received through receivers
> will be saved to write ahead logs that will allow it to be recovered after
> driver failures."  So if we don't set this to true, what *will* get saved
> into checkpointing and what data *will* be recovered upon the driver
> restarting?
>
> *Question 3.* We want the RDD's to be treated as successfully processed
> only once we have done all the necessary transformations and actions on the
> data.  By default, will the Spark Streaming checkpointing simply mark the
> topic offsets as having been processed once the data has been received by
> Spark?  Or, once the data has been processed by the driver + the workers
> successfully?  If the former, how can we configure checkpointing to do the
> latter?
>
> Thanks,
> - Dmitry
>


Issues with Spark Streaming checkpointing of Kafka topic content

2019-04-02 Thread Dmitry Goldenberg
Hi,

I've got 3 questions/issues regarding checkpointing; I was hoping someone
could help shed some light on this.

We've got a Spark Streaming consumer consuming data from a Kafka topic; it
works fine generally until I switch it to checkpointing mode by calling the
'checkpoint' method on the context and pointing the checkpointing at a
directory in HDFS.
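For reference, the wiring looks roughly like the sketch below (the topic,
broker list and checkpoint path are placeholders, process() stands in for our
real transformations/actions, and the Kafka direct-stream API is assumed):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

CHECKPOINT_DIR = "hdfs:///checkpoints/my-consumer"   # placeholder path

def process(rdd):
    # Stand-in for the real per-batch transformations/actions.
    print("batch record count: %d" % rdd.count())

def create_context():
    sc = SparkContext(appName="kafka-checkpointing-sketch")
    ssc = StreamingContext(sc, 10)                   # 10-second batches
    ssc.checkpoint(CHECKPOINT_DIR)                   # the 'checkpoint' call in question
    stream = KafkaUtils.createDirectStream(
        ssc, ["my-topic"], {"metadata.broker.list": "broker1:9092"})
    stream.foreachRDD(process)
    return ssc

# On restart, rebuild the context (and offsets) from the checkpoint if one exists.
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
ssc.start()
ssc.awaitTermination()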

I can see that files get written to that directory; however, I don't see new
Kafka content being processed.

*Question 1.* Is it possible that the checkpointed consumer is off base in
its understanding of where the offsets are on the topic, and how could I
troubleshoot that?  Is it possible that some "confusion" happens if a
consumer is switched back and forth between checkpointed and not? How could
we tell?

*Question 2.* About spark.streaming.receiver.writeAheadLog.enable. By
default this is false. "All the input data received through receivers will
be saved to write ahead logs that will allow it to be recovered after
driver failures."  So if we don't set this to true, what *will* get saved
into checkpointing and what data *will* be recovered upon the driver
restarting?

*Question 3.* We want the RDDs to be treated as successfully processed
only once we have done all the necessary transformations and actions on the
data.  By default, will the Spark Streaming checkpointing simply mark the
topic offsets as having been processed once the data has been received by
Spark?  Or, once the data has been processed by the driver + the workers
successfully?  If the former, how can we configure checkpointing to do the
latter?

Thanks,
- Dmitry


[Spark ML] [Pyspark] [Scenario Beginner] [Level Beginner]

2019-04-02 Thread Steve Pruitt
I am still struggling with getting fit() to work on my dataset.
The Spark ML exception at issue is:

LAPACK.dppsv returned 6 because A is not positive definite. Is A derived from a 
singular matrix (e.g. collinear column values)?

Comparing my standardized Weight values with the tutorial's values, I see I
have some negative values.  The tutorial values are all positive.  The above
exception message mentions "not positive definite", so it's probably my issue.

The calculation for standardizing my Weight values, (Weight - Weight_Mean) /
Weight_StdDev, is producing negative values when Weight, which can range
between 1 and 72000, is small.
I have a suggestion to try using MinMaxScaler.  But it operates on a Vector
and I have a single value, so I am not sure I see how to make this work.
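What I can picture (untested) is roughly the sketch below: wrapping the single
Weight column in a vector so MinMaxScaler can handle it and then pulling the
scalar back out, plus a plain min-max arithmetic alternative.  Column names
follow my dataset; everything else is illustrative.

from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import max as sql_max, min as sql_min
from pyspark.sql.types import DoubleType

# Option 1: wrap Weight in a one-element vector for MinMaxScaler, then extract
# the scaled value back out as a plain double ('dataset' is my DataFrame).
assembler = VectorAssembler(inputCols=['Weight'], outputCol='weight_vec')
scaler = MinMaxScaler(inputCol='weight_vec', outputCol='weight_scaled_vec')
assembled = assembler.transform(dataset)
scaled = scaler.fit(assembled).transform(assembled)
first_element = udf(lambda v: float(v[0]), DoubleType())
df_minmax = scaled.withColumn('weight_scaled',
                              first_element(col('weight_scaled_vec')))

# Option 2: plain min-max arithmetic on the column, no Vector involved.
stats = dataset.select(sql_min('Weight').alias('w_min'),
                       sql_max('Weight').alias('w_max'))
df_minmax2 = (stats.crossJoin(dataset)
                   .withColumn('weight_scaled',
                               (col('Weight') - col('w_min')) /
                               (col('w_max') - col('w_min'))))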

My stats knowledge is very rusty.  Is there a way to get only positive values
when standardizing something like my Weight values above?

Thanks.

-S



From: Steve Pruitt 
Sent: Monday, April 01, 2019 12:39 PM
To: user 
Subject: [EXTERNAL] - [Spark ML] [Pyspark] [Scenario Beginner] [Level Beginner]

After following a tutorial on Recommender systems using Pyspark / Spark ML, I
decided to jump in with my own dataset.  I am specifically trying to predict
video suggestions based on an implicit feature for the time a video was
watched.  I wrote a generator to produce my dataset.  I have a total of five
videos, each 1200 seconds in length.  I randomly selected which videos a user
watched and a random time between 0 and 1200.  I generated 10k records.
Weight is the time-watched feature.  It looks like this:

UserId,VideoId,Weight
0,1,645
0,2,870
0,3,1075
0,4,486
0,5,900
1,1,353
1,2,988
1,3,152
1,4,953
1,5,641
2,3,12
2,4,444
2,5,87
3,2,658
3,4,270
3,5,530
4,2,722
4,3,255
:

After reading the dataset, I convert all columns to Integer in place.
Describing Weight produces:

   summary   Weight
0  count     30136
1  mean      597.717945314574
2  stddev    346.475684454489
3  min       0
4  max       1200

Next, I standardized the weight column by:

df = dataset.select(mean('Weight').alias('mean_weight'),
                    stddev('Weight').alias('stddev_weight')) \
            .crossJoin(dataset) \
            .withColumn('weight_scaled',
                        (col('Weight') - col('mean_weight')) / col('stddev_weight'))

df.toPandas().head() shows:

    mean_weight  stddev_weight  UserId  VideoId  Weight  weight_scaled
0   597.717945   346.475684     0       1        645     0.136466
1   597.717945   346.475684     0       2        870     0.785862
2   597.717945   346.475684     0       3        1075    1.377534
3   597.717945   346.475684     0       4        486     -0.322441
4   597.717945   346.475684     0       5        900     0.872448
:
10  597.717945   346.475684     2       3        12      -1.690502
11  597.717945   346.475684     2       4        444     -0.443662
12  597.717945   346.475684     2       5        87      -1.474037
:

After splitting df 80 / 20 to get training / testing sets
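(the split itself is just a randomSplit call; the test-set name and seed below
are arbitrary)

trainingData, testData = df.randomSplit([0.8, 0.2], seed=42)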

I defined the ALS algo with:

als = ALS(maxIter=10, regParam=0.1, userCol='UserId', itemCol='VideoId',
          implicitPrefs=True, ratingCol='weight_scaled', coldStartStrategy='drop')

and then

model = als.fit(trainingData)

Calling fit() is where I get the following error, which I don't understand:

Py4JJavaError Traceback (most recent call last)
 in 
> 1 model = als.fit(trainingData)

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit(self, dataset)
286
287 def _fit(self, dataset):
--> 288 java_model = self._fit_java(dataset)
289 model = self._create_model(java_model)
290 return self._copyValues(model)

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit_java(self, dataset)
283 """
284 self._transfer_params_to_java()
--> 285 return self._java_obj.fit(dataset._jdf)
286
287 def _fit(self, dataset):

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1158 answer = self.gateway_client.send_command(command)
   1159 return_value = get_return_value(
-> 1160 answer, self.gateway_client, self.target_id, self.name)
   1161
   1162 for temp_arg in temp_args:

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
 62 try:
---> 63 return f(*a, **kw)
 64 except 

Load Time from HDFS

2019-04-02 Thread Jack Kolokasis

Hello,

    I want to ask if there is any way to measure the HDFS data loading time
at the start of my program. I tried adding an action, e.g. count(), after the
val data = sc.textFile() call, but I notice that my program takes more time
to finish than before adding the count call. Is there any other way to do it?
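Sketched in PySpark with a made-up path, what I am doing is roughly:

import time

from pyspark import SparkContext

sc = SparkContext(appName="hdfs-load-timing-sketch")

start = time.time()
data = sc.textFile("hdfs:///data/input")   # lazy: no data is read yet
n = data.count()                           # the extra action that forces the read
print("loaded %d lines in %.1f s" % (n, time.time() - start))

# Without a cache()/persist() on 'data', later actions re-read the input from
# HDFS, so the count above adds a full extra pass over the data.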


Thanks,
--Iacovos




MLLIB , Does Spark support Canopy Clustering ?

2019-04-02 Thread Alok Bhandari
Hello All,

I am interested in using the bisecting k-means algorithm implemented in Spark.
While using bisecting k-means, I found that some of my clustering requests
on large data-sets failed with OOM issues.
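(For context, what I run is roughly the sketch below; k, the features column
and the DataFrame name are placeholders.)

from pyspark.ml.clustering import BisectingKMeans

# 'dataset' is a DataFrame with an assembled 'features' vector column; k is a
# placeholder value.
bkm = BisectingKMeans(k=50, featuresCol='features', seed=1)
model = bkm.fit(dataset)   # this fit step is where the OOM failures show up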

As the data-set size is expected to be large, I wanted to use some
pre-processing steps to reduce resource requirements. I found that Canopy
Clustering helps with that, but I could not find anything equivalent to it in
Spark. Is something available, or is it planned for some future release?

Please let me know. Thank you