Logging DataFrame API pipelines
Hello all,

How do you log what is happening inside your Spark DataFrame pipelines? I would like to collect statistics along the way, mostly row counts at particular steps, to see where rows were filtered and so on. Is there any way to do this other than calling .count on the DataFrame?

Regards,
Magnus
Re: How to extract data in parallel from RDBMS tables
I can *imagine* writing some sort of DataframeReader-generation tool, but am not aware of one that currently exists.

On Tue, Apr 2, 2019 at 13:08 Surendra Manchikanti <surendra.manchika...@gmail.com> wrote:
> Looking for a generic solution, not for a specific DB or number of tables.
>
> On Fri, Mar 29, 2019 at 5:04 AM Jason Nerothin wrote:
>> How many tables? What DB?
>>
>> On Fri, Mar 29, 2019 at 00:50 Surendra Manchikanti wrote:
>>> Hi Jason,
>>>
>>> Thanks for your reply, but I am looking for a way to extract all the
>>> tables in a database in parallel.
>>>
>>> On Thu, Mar 28, 2019 at 2:50 PM Jason Nerothin wrote:
>>>> Yes. If you use the numPartitions option, your max parallelism will
>>>> be that number. See also: partitionColumn, lowerBound, and upperBound
>>>> https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
>>>>
>>>> On Wed, Mar 27, 2019 at 23:06 Surendra Manchikanti wrote:
>>>>> Hi All,
>>>>> Is there any way to copy all the tables in parallel from an RDBMS
>>>>> using Spark? We are looking for functionality similar to Sqoop.
>>>>> Thanks, Surendra

--
Thanks,
Jason
Re: How to extract data in parallel from RDBMS tables
Looking for a generic solution, not for a specific DB or number of tables.

On Fri, Mar 29, 2019 at 5:04 AM Jason Nerothin wrote:
> How many tables? What DB?
Re: Issues with Spark Streaming checkpointing of Kafka topic content
To add more info: this project is on an older version of Spark, 1.5.0, and an older version of Kafka, 0.8.2.1 (2.10-0.8.2.1).
Issues with Spark Streaming checkpointing of Kafka topic content
Hi,

I've got three questions/issues regarding checkpointing; I was hoping someone could help shed some light on them.

We've got a Spark Streaming consumer reading data from a Kafka topic. It generally works fine until I switch it to checkpointing mode by calling the 'checkpoint' method on the context and pointing it at a directory in HDFS. I can see that files get written to that directory, but I don't see new Kafka content being processed.

*Question 1.* Is it possible that the checkpointed consumer is off base in its understanding of where the offsets are on the topic, and how could I troubleshoot that? Is it possible that some "confusion" happens if a consumer is switched back and forth between checkpointed and non-checkpointed modes? How could we tell?

*Question 2.* About spark.streaming.receiver.writeAheadLog.enable, which is false by default: "All the input data received through receivers will be saved to write ahead logs that will allow it to be recovered after driver failures." If we don't set this to true, what *will* get saved into the checkpoint, and what data *will* be recovered when the driver restarts?

*Question 3.* We want the RDDs to be treated as successfully processed only once we have done all the necessary transformations and actions on the data. By default, will Spark Streaming checkpointing simply mark the topic offsets as processed once the data has been received by Spark, or only once the data has been processed successfully by the driver and the workers? If the former, how can we configure checkpointing to do the latter?

Thanks,
- Dmitry
[Spark ML] [Pyspark] [Scenario Beginner] [Level Beginner]
I am still struggling with getting fit() to work on my dataset. The Spark ML exception at issue is:

    LAPACK.dppsv returned 6 because A is not positive definite. Is A derived from a singular matrix (e.g. collinear column values)?

Comparing my standardized Weight values with the tutorial's values, I see I have some negative values, while the tutorial's values are all positive. The exception message mentions a non-positive value, so that is probably my issue. The calculation for standardizing my Weight values, (Weight - Weight_Mean) / Weight_StdDev, produces negative values when the Weight (which can be between 1 and 72000) is small.

I have a suggestion to try using MinMaxScaler, but it operates on a Vector and I have a single value. I don't see how to make this work. My stats is very old. Is there a way to get only positive values when standardizing something like my Weight values above?

Thanks.
-S

From: Steve Pruitt
Sent: Monday, April 01, 2019 12:39 PM
To: user
Subject: [EXTERNAL] - [Spark ML] [Pyspark] [Scenario Beginner] [Level Beginner]

After following a tutorial on recommender systems using Pyspark / Spark ML, I decided to jump in with my own dataset. I am specifically trying to predict video suggestions based on an implicit feature: the time a video was watched. I wrote a generator to produce my dataset. I have a total of five videos, each 1200 seconds in length. I randomly selected which videos a user watched and a random watch time between 0 and 1200, generating 10k records. Weight is the time-watched feature. It looks like this:

    UserId,VideoId,Weight
    0,1,645
    0,2,870
    0,3,1075
    0,4,486
    0,5,900
    1,1,353
    1,2,988
    1,3,152
    1,4,953
    1,5,641
    2,3,12
    2,4,444
    2,5,87
    3,2,658
    3,4,270
    3,5,530
    4,2,722
    4,3,255
    :

After reading the dataset, I convert all columns to Integer in place.
Describing Weight produces:

    summary   Weight
    count     30136
    mean      597.717945314574
    stddev    346.475684454489
    min       0
    max       1200

Next, I standardized the Weight column with:

    df = dataset.select(mean('Weight').alias('mean_weight'),
                        stddev('Weight').alias('stddev_weight')) \
                .crossJoin(dataset) \
                .withColumn('weight_scaled',
                            (col('Weight') - col('mean_weight')) / col('stddev_weight'))

df.toPandas().head() shows:

        mean_weight  stddev_weight  UserId  VideoId  Weight  weight_scaled
    0   597.717945   346.475684     0       1        645     0.136466
    1   597.717945   346.475684     0       2        870     0.785862
    2   597.717945   346.475684     0       3        1075    1.377534
    3   597.717945   346.475684     0       4        486    -0.322441
    4   597.717945   346.475684     0       5        900     0.872448
    :
    10  597.717945   346.475684     2       3        12     -1.690502
    11  597.717945   346.475684     2       4        444    -0.443662
    12  597.717945   346.475684     2       5        87     -1.474037
    :

After splitting df 80/20 into training/testing sets, I defined the ALS algorithm with:

    als = ALS(maxIter=10, regParam=0.1, userCol='UserId', itemCol='VideoId',
              implicitPrefs=True, ratingCol='weight_scaled',
              coldStartStrategy='drop')

and then:

    model = als.fit(trainingData)

Calling fit() is where I get the following error, which I don't understand.
    Py4JJavaError                             Traceback (most recent call last)
    ----> 1 model = als.fit(trainingData)

    C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\base.py in fit(self, dataset, params)
        130                 return self.copy(params)._fit(dataset)
        131             else:
    --> 132                 return self._fit(dataset)
        133         else:
        134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

    C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit(self, dataset)
        286
        287     def _fit(self, dataset):
    --> 288         java_model = self._fit_java(dataset)
        289         model = self._create_model(java_model)
        290         return self._copyValues(model)

    C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit_java(self, dataset)
        283         """
        284         self._transfer_params_to_java()
    --> 285         return self._java_obj.fit(dataset._jdf)
        286
        287     def _fit(self, dataset):

    C:\Executables\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
       1158         answer = self.gateway_client.send_command(command)
       1159         return_value = get_return_value(
    -> 1160             answer, self.gateway_client, self.target_id, self.name)
       1161
       1162         for temp_arg in temp_args:

    C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
         61     def deco(*a, **kw):
         62         try:
    ---> 63             return f(*a, **kw)
         64         except
Load Time from HDFS
Hello,

I want to ask if there is any way to measure HDFS data-loading time at the start of my program. I tried adding an action, e.g. count(), after the val data = sc.textFile() call, but I noticed that my program takes more time to finish than before adding the count call. Is there any other way to do it?

Thanks,
--Iacovos
MLLIB , Does Spark support Canopy Clustering ?
Hello All,

I am interested in using the bisecting k-means algorithm implemented in Spark. While using bisecting k-means I found that some of my clustering requests on large datasets failed with OOM issues. Since the dataset size is expected to be large, I wanted to use some pre-processing steps to reduce resource requirements, and I found that Canopy Clustering helps with that. However, I could not find anything equivalent in Spark. Is something available, or is it planned for a future release? Please let me know.

Thank you