Re: Looping through a series of telephone numbers
Hi Philippe,

I would like to draw your attention to this great library that saved my day in the past when parsing phone numbers in Spark: https://github.com/google/libphonenumber

If you combine it with Bjørn's suggestions you will have a good start on your linkage task.

Best regards,
Anastasios Zouzias

On Sat, Apr 1, 2023 at 8:31 PM Philippe de Rochambeau wrote:
> Hello,
> I'm looking for an efficient way in Spark to search for a series of
> telephone numbers, contained in a CSV file, in a data set column.
>
> In pseudo code,
>
> for tel in [tel1, tel2, …, tel40000]
>   search for tel in dataset using .like("%tel%")
> end for
>
> I'm using the like function because the telephone numbers in the data set
> may contain prefixes, such as "+"; e.g., "+331222".
>
> Any suggestions would be welcome.
>
> Many thanks.
>
> Philippe
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

--
Anastasios Zouzias
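To make the normalize-then-join idea concrete, here is a minimal plain-Python sketch (function names are mine, not libphonenumber's): instead of scanning the dataset once per pattern with `.like("%tel%")`, normalize both sides to a digits-only form and do a single hash-lookup pass. In Spark this would translate to a broadcast join on a normalized column.

```python
def normalize(tel):
    """Keep digits only, dropping prefixes such as '+' or leading zeros.
    libphonenumber does this properly; this is a crude stand-in."""
    digits = "".join(ch for ch in tel if ch.isdigit())
    return digits.lstrip("0")

def match_rows(rows, targets):
    # Built once, like a broadcast variable holding the 40,000 numbers.
    wanted = {normalize(t) for t in targets}
    return [r for r in rows if normalize(r) in wanted]

rows = ["+331222", "0044 7700 900123", "555-0100"]
targets = ["331222", "4477009 00123"]
print(match_rows(rows, targets))
```

Note this replaces the substring scan with an exact match on the normalized form, which is only valid if the numbers differ merely by prefixes and formatting, as the original mail suggests.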
[Structured Streaming] Robust watermarking calculation with future timestamps
Hi all,

We currently have the following issue with a Spark Structured Streaming (SS) application. The application reads messages from thousands of source systems, stores them in Kafka, and Spark aggregates them using SS and watermarking (15 minutes).

The root problem is that a few of the source systems have a wrong timezone setup that makes them emit messages from the future, i.e., +1 hour ahead of the current time (mis-configuration or winter/summer timezone change (yeah!)).

Since the watermark is calculated as (latest timestamp value over all messages) - (watermarking threshold value, 15 mins), most of the messages are dropped because they appear delayed by more than 45 minutes. In an even more extreme scenario, even a single "future" / adversarial message can make the structured streaming application report zero messages (per mini-batch).

Is there any user-exposed SS API that allows a more robust calculation of the watermark, e.g., the 95th percentile of timestamps instead of the max timestamp? I understand that such a calculation is more expensive, but it would make the application more robust. Any suggestions/ideas?

PS. Of course the best approach would be to fix the issue on all source systems, but this might take time to do (or perhaps drop future messages programmatically (yikes)).

Best regards,
Anastasios
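The robustness idea from the mail can be sketched in plain Python. Spark's built-in watermark is max(event time) - delay; a percentile-based variant would ignore a few adversarial "future" timestamps. This is NOT a Spark API (Spark does not expose the watermark calculation), just an illustration of the arithmetic:

```python
import math

def max_watermark(event_times, delay):
    # What Spark does: latest event time minus the threshold.
    return max(event_times) - delay

def percentile_watermark(event_times, delay, q=0.95):
    # Hypothetical robust variant: q-th percentile instead of the max.
    ts = sorted(event_times)
    idx = min(len(ts) - 1, math.ceil(q * len(ts)) - 1)
    return ts[idx] - delay

# 99 well-behaved timestamps around t=1000, plus one message 3600s in the future.
times = list(range(1000, 1099)) + [1000 + 3600]
print(max_watermark(times, delay=900))         # 3700: every normal message looks "late"
print(percentile_watermark(times, delay=900))  # 194: normal messages survive
```

A single future timestamp drags the max-based watermark an hour ahead, while the percentile version barely moves.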
Re: Handling of watermark in structured streaming
Hi Joe,

How often do you trigger your mini-batch? Maybe you can specify the trigger time explicitly to a low value, or even better leave it unset. See: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

Best,
Anastasios

On Tue, May 14, 2019 at 3:49 PM Joe Ammann wrote:
> Hi all
>
> I'm fairly new to Spark structured streaming and I'm only starting to
> develop an understanding for the watermark handling.
>
> Our application reads data from a Kafka input topic and, as one of the
> first steps, it has to group incoming messages. Those messages come in
> bulks, e.g. 5 messages which belong to the same "business event" (share a
> common key), with event timestamps differing by only a few millisecs. And
> then no messages for, say, 3 minutes. And after that another bulk of 3
> messages with very close event timestamps.
>
> I have set a watermark of 20 seconds on my streaming query, and a groupBy
> on the shared common key, and a window of 20 seconds (10 seconds sliding).
> So something like
>
> df = inputStream.withWatermark("eventtime", "20 seconds")
>   .groupBy("sharedId", window("eventtime", "20 seconds", "10 seconds"))
>
> The output mode is set to append, since I intend to join this stream with
> other streams later in the application.
>
> Naively, I would have expected to see any incoming bulk of messages as an
> aggregated message ~20 seconds after its eventtime on the output stream.
> But my observations indicate that the "latest bulk of events" always stays
> queued inside the query, until a new bulk of events arrives and bumps up
> the watermark. In my example above, this means that I see the first bulk of
> events only after 3 minutes, when the second bulk comes in.
>
> This does indeed make some sense, and if I understand the documentation
> correctly the watermark is only ever updated upon arrival of new inputs.
> The "real time" does not play a role in the setting of watermarks.
>
> But to me this means that any bulk of events is prohibited from being sent
> downstream until a new bulk comes in. This is not what I intended.
>
> Is my understanding more or less correct? And is there any way of bringing
> "the real time" into the calculation of the watermark (short of producing
> regular dummy messages which are then again filtered out)?
>
> --
> CU, Joe

--
Anastasios Zouzias
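Joe's observation can be reproduced with a tiny plain-Python sketch of the watermark update rule: the watermark is max(seen event time) - delay with no wall-clock component, so in append mode a window can only be emitted once *later events* push the watermark past the window's end.

```python
def advance(watermark, batch_event_times, delay):
    """One micro-batch's watermark update: event-time only, no wall clock."""
    if not batch_event_times:
        return watermark            # no new data: the watermark is frozen
    return max(watermark, max(batch_event_times) - delay)

wm = 0
wm = advance(wm, [100, 101, 102], delay=20)   # first bulk of events
print(wm)                                      # 82: window ending at 110 not closed yet
wm = advance(wm, [], delay=20)                 # "3 minutes" of silence
print(wm)                                      # still 82, no matter how long we wait
wm = advance(wm, [280, 281], delay=20)         # second bulk finally bumps it
print(wm)                                      # 261: only now is the first bulk emitted
```

This is exactly why the first bulk only appears downstream when the second bulk arrives.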
Re: Spark Structured Streaming | Highly reliable de-duplication strategy
Hi,

Have you checked the docs, i.e., https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-deduplication ?

You can generate a uuid column in your streaming DataFrame and drop duplicate messages with a single line of code.

Best,
Anastasios

On Wed, May 1, 2019 at 11:15 AM Akshay Bhardwaj <akshay.bhardwaj1...@gmail.com> wrote:
> Hi All,
>
> Floating this again. Any suggestions?
>
> Akshay Bhardwaj
> +91-97111-33849
>
> On Tue, Apr 30, 2019 at 7:30 PM Akshay Bhardwaj <akshay.bhardwaj1...@gmail.com> wrote:
>
>> Hi Experts,
>>
>> I am using spark structured streaming to read messages from Kafka, with a
>> producer that works with an at-least-once guarantee. This streaming job is
>> running on a Yarn cluster with hadoop 2.7 and spark 2.3.
>>
>> What is the most reliable strategy for avoiding duplicate data within the
>> stream in the scenarios of fail-over or job restarts/re-submits, and for
>> guaranteeing an exactly-once, non-duplicate stream?
>>
>> 1. One of the strategies I have read of other people using is to
>>    maintain an external KV store for the unique key/checksum of each
>>    incoming message, and write to a 2nd kafka topic only if the checksum
>>    is not present in the KV store.
>>    - My doubt with this approach is how to ensure a safe write to both
>>      the 2nd topic and to the KV store for storing the checksum, in the
>>      case of unwanted failures. How does that guarantee exactly-once with
>>      restarts?
>>
>> Any suggestions are highly appreciated.
>>
>> Akshay Bhardwaj
>> +91-97111-33849

--
Anastasios Zouzias
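The idea behind `dropDuplicates` combined with `withWatermark` can be sketched in plain Python: keep the ids seen so far together with their event time, and purge ids older than the watermark so state does not grow forever. This is an illustration only, not Spark's actual state-store implementation:

```python
def dedup(stream, delay):
    seen = {}          # id -> latest event_time observed for that id
    watermark = 0
    out = []
    for event_id, t in stream:
        if t >= watermark and event_id not in seen:
            out.append((event_id, t))
        seen[event_id] = max(t, seen.get(event_id, 0))
        watermark = max(watermark, t - delay)
        # Purge state that can no longer collide with future non-late events.
        seen = {k: v for k, v in seen.items() if v >= watermark}
    return out

events = [("a", 10), ("b", 12), ("a", 13), ("c", 40), ("a", 41)]
print(dedup(events, delay=5))
```

Note that "a" at t=41 is emitted *again*: its earlier occurrence had already aged out of the state. That mirrors the documented guarantee, duplicates are only reliably dropped if they arrive within the watermark delay of each other.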
Re: Packaging kafka certificates in uber jar
Hi Colin,

You can place your certificates under src/main/resources and include them in the uber JAR; see e.g.: https://stackoverflow.com/questions/40252652/access-files-in-resources-directory-in-jar-from-apache-spark-streaming-context

Best,
Anastasios

On Mon, Dec 24, 2018 at 10:29 PM Colin Williams <colin.williams.seat...@gmail.com> wrote:
> I've been trying to read from kafka via a spark streaming client. I
> found out the spark cluster doesn't have certificates deployed. Then I
> tried using the same local certificates I've been testing with, by
> packing them in an uber jar and getting a File handle from the
> Classloader resource. But I'm getting a File Not Found exception.
> These are jks certificates. Is anybody aware how to package
> certificates in a jar with a kafka client, preferably the spark one?

--
Anastasios Zouzias
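One likely cause of the File Not Found: a JKS keystore bundled inside a jar is not a file on disk, and Kafka's ssl.keystore.location wants a filesystem *path*. The common workaround is to read the packaged resource as a stream and copy it to a temp file, then pass that path along. A plain-Python sketch of the pattern (in the JVM you would start from `getClass.getResourceAsStream(...)`; the names here are illustrative):

```python
import io
import os
import shutil
import tempfile

def resource_to_path(stream, suffix=".jks"):
    """Copy an in-jar resource stream to a real temp file and return its path."""
    tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
    with tmp:
        shutil.copyfileobj(stream, tmp)
    return tmp.name

# Stand-in for the bytes you would get from the classloader resource.
fake_keystore = io.BytesIO(b"\xfe\xed\xfe\xed fake jks bytes")
path = resource_to_path(fake_keystore)
print(os.path.exists(path))   # a real path, usable as ssl.keystore.location
os.remove(path)
```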
Re: conflicting version question
Hi Nathan,

You can try to shade the dependency version that you want to use. That said, shading is a tricky technique. Good luck.

https://softwareengineering.stackexchange.com/questions/297276/what-is-a-shaded-java-dependency

See also elasticsearch's discussion on shading: https://www.elastic.co/de/blog/to-shade-or-not-to-shade

Best,
Anastasios

On Fri, 26 Oct 2018, 15:45 Nathan Kronenfeld wrote:
> Our code is currently using Gson 2.8.5. Spark, through the Hadoop API, pulls
> in Gson 2.2.4.
>
> At the moment, we just get "method X not found" exceptions because of this
> - because when we run in Spark, 2.2.4 is what gets loaded.
>
> Is there any way to have both versions exist simultaneously? To load 2.8.5
> so that our code uses it, without messing up spark?
>
> Thanks,
> -Nathan Kronenfeld
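For the Gson case specifically, shading means relocating your 2.8.5 classes under a private package inside your uber jar so they no longer clash with the 2.2.4 that Spark/Hadoop loads. A minimal Maven Shade plugin sketch — the `myapp.shaded` prefix is an arbitrary choice, and the usual `<execution>` binding to the `package` phase is omitted for brevity:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <!-- Rewrite com.google.gson.* in your classes to the shaded package -->
        <pattern>com.google.gson</pattern>
        <shadedPattern>myapp.shaded.com.google.gson</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```

After relocation your code references the shaded copy, while Spark keeps using its own 2.2.4 untouched.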
Re: Dataframe from 1.5G json (non JSONL)
Are you sure that your JSON file has the right format? spark.read.json(...) expects a file where *each line is a json object*.

My wild guess is that

val hdf = spark.read.json("/user/tmp/hugedatafile")
hdf.show(2) or hdf.take(1)

gives OOM because it tries to fetch all the data into the driver. Can you reformat your input file and try again?

Best,
Anastasios

On Tue, Jun 5, 2018 at 8:39 PM, raksja wrote:
> I have a json file which is a continuous array of objects of similar type
> [{},{}...], about 1.5GB uncompressed and 33MB gzip compressed.
>
> This hugedatafile is uploaded to hdfs, and it is not a JSONL file, it's a
> whole regular json file.
>
> [{"id":"1","entityMetadata":{"lastChange":"2018-05-11 01:09:18.0","createdDateTime":"2018-05-11 01:09:18.0","modifiedDateTime":"2018-05-11 01:09:18.0"},"type":"11"},{"id":"2","entityMetadata":{"lastChange":"2018-05-11 01:09:18.0","createdDateTime":"2018-05-11 01:09:18.0","modifiedDateTime":"2018-05-11 01:09:18.0"},"type":"11"},{"id":"3","entityMetadata":{"lastChange":"2018-05-11 01:09:18.0","createdDateTime":"2018-05-11 01:09:18.0","modifiedDateTime":"2018-05-11 01:09:18.0"},"type":"11"}..]
>
> I get OOM on executors whenever I try to load this into spark.
>
> Try 1
> val hdf = spark.read.json("/user/tmp/hugedatafile")
> hdf.show(2) or hdf.take(1) gives OOM
>
> Try 2
> Took a small sampledatafile and got its schema to avoid schema inferring
> val sampleSchema = spark.read.json("/user/tmp/sampledatafile").schema
> val hdf = spark.read.schema(sampleSchema).json("/user/tmp/hugedatafile")
> hdf.show(2) or hdf.take(1) stuck for 1.5 hrs and gives OOM
>
> Try 3
> Repartition it before performing an action
> gives OOM
>
> Try 4
> Read https://issues.apache.org/jira/browse/SPARK-20980 completely
> val hdf = spark.read.option("multiLine", true).schema(sampleSchema).json("/user/tmp/hugedatafile")
> hdf.show(1) or hdf.take(1) gives OOM
>
> Can any one help me here?
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

--
Anastasios Zouzias
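The reformatting Anastasios suggests can be done once, up front. A plain-Python, stdlib-only sketch that rewrites a single-array JSON file as JSON Lines: `json.load` still needs the decoded document in memory once, but that happens on one machine instead of blowing up every executor, and afterwards Spark can split the line-delimited file into parallel tasks. (For files too big for one machine's memory, a streaming JSON parser would be needed instead.)

```python
import json

def array_to_jsonl(src_path, dst_path):
    """Rewrite a top-level [{...},{...}] JSON array as one object per line."""
    with open(src_path) as src, open(dst_path, "w") as dst:
        for obj in json.load(src):
            dst.write(json.dumps(obj) + "\n")

# usage sketch:
# array_to_jsonl("hugedatafile.json", "hugedatafile.jsonl")
# then: spark.read.json("hugedatafile.jsonl")
```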
Re: Fastest way to drop useless columns
Hi Julien,

One quick and easy-to-implement idea is to use sampling on your dataset, i.e., sample a large enough subset of your data and test whether there are no unique values in some columns. Repeat the process a few times and then do the full test on the surviving columns. This will also allow you to load only a subset of your dataset, if it is stored in Parquet.

Best,
Anastasios

On Thu, May 31, 2018 at 10:34 AM, wrote:
> Hi there !
>
> I have a potentially large dataset (regarding number of rows and cols),
> and I want to find the fastest way to drop some cols that are useless for
> me, i.e. cols containing only a single unique value!
>
> I want to know what you think I could do to achieve this as fast as
> possible using spark.
>
> I already have a solution using distinct().count() or approxCountDistinct().
> But they may not be the best choice, as this requires going through all
> the data, even if the 2 first tested values of a col are already different
> (in which case I know that I can keep the col).
>
> Thx for your ideas !
>
> Julien

--
Anastasios Zouzias
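The sampling idea from the reply, sketched in plain Python (the full-data pass would be Spark's distinct counting; here both passes are simple scans): any column that already shows two distinct values in a random sample is certainly not constant, so only the columns surviving the sample need the expensive full check.

```python
import random

def candidate_constant_cols(rows, sample_size, seed=42):
    """Cheap pre-filter: columns that look constant in a random sample."""
    rng = random.Random(seed)
    sample = rng.sample(rows, min(sample_size, len(rows)))
    cols = range(len(rows[0]))
    return [c for c in cols if len({r[c] for r in sample}) == 1]

def constant_cols(rows, sample_size=100):
    # Full (expensive) check only on the columns that survived the sample.
    survivors = candidate_constant_cols(rows, sample_size)
    return [c for c in survivors if len({r[c] for r in rows}) == 1]

rows = [(1, "x", i) for i in range(1000)]   # cols 0 and 1 constant, col 2 not
print(constant_cols(rows))                   # [0, 1]
```

With 1000 rows and a 100-row sample, column 2 is eliminated in the cheap pass, so the full scan touches only the two genuinely constant columns.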
Re: Can spark handle this scenario?
>>>>>>> I have a use case:
>>>>>>>
>>>>>>> I want to download S stock data from Yahoo API in parallel
>>>>>>> using Spark. I have got all stock symbols as a Dataset. Then I used
>>>>>>> the below code to call the Yahoo API for each symbol:
>>>>>>>
>>>>>>> case class Symbol(symbol: String, sector: String)
>>>>>>>
>>>>>>> case class Tick(symbol: String, sector: String, open: Double, close: Double)
>>>>>>>
>>>>>>> // symbolDs is Dataset[Symbol], pullSymbolFromYahoo returns Dataset[Tick]
>>>>>>>
>>>>>>> symbolDs.map { k =>
>>>>>>>   pullSymbolFromYahoo(k.symbol, k.sector)
>>>>>>> }
>>>>>>>
>>>>>>> This statement cannot compile:
>>>>>>>
>>>>>>> Unable to find encoder for type stored in a Dataset. Primitive
>>>>>>> types (Int, String, etc) and Product types (case classes) are
>>>>>>> supported by importing spark.implicits._ Support for serializing
>>>>>>> other types will be added in future releases.
>>>>>>>
>>>>>>> My questions are:
>>>>>>>
>>>>>>> 1. As you can see, this scenario is not traditional dataset handling
>>>>>>> such as count, sql query... Instead, it is more like a UDF which applies
>>>>>>> an arbitrary operation to each record. Is Spark good at handling such a
>>>>>>> scenario?
>>>>>>>
>>>>>>> 2. Regarding the compilation error, any fix? I did not find a
>>>>>>> satisfactory solution online.
>>>>>>>
>>>>>>> Thanks for help!
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Ayan Guha

--
Anastasios Zouzias <a...@zurich.ibm.com>
Re: Several Aggregations on a window function
Hi,

You can use https://twitter.github.io/algebird/ which provides an implementation of interesting Monoids and ways to combine them into tuples (or products) of Monoids. Of course, you are not bound to use the algebird library, but it might be helpful to bootstrap.

On Mon, Dec 18, 2017 at 7:18 PM, Julien CHAMP <jch...@tellmeplus.com> wrote:
> It seems interesting, however scalding seems to require being used outside
> of spark ?
>
> On Mon, Dec 18, 2017 at 17:15, Anastasios Zouzias <zouz...@gmail.com> wrote:
>
>> Hi Julien,
>>
>> I am not sure if my answer applies to the streaming part of your
>> question. However, in batch processing, if you want to perform multiple
>> aggregations over an RDD with a single pass, a common approach is to use
>> multiple aggregators (a.k.a. tuple monoids); see below an example from
>> algebird:
>>
>> https://github.com/twitter/scalding/wiki/Aggregation-using-Algebird-Aggregators#composing-aggregators
>>
>> Best,
>> Anastasios
>>
>> On Mon, Dec 18, 2017 at 10:38 AM, Julien CHAMP <jch...@tellmeplus.com> wrote:
>>
>>> I've been looking for several solutions but I can't find something
>>> efficient to compute many window functions efficiently (optimized
>>> computation or efficient parallelism).
>>> Am I the only one interested in this ?
>>>
>>> Regards,
>>>
>>> Julien
>>>
>>> On Fri, Dec 15, 2017 at 21:34, Julien CHAMP <jch...@tellmeplus.com> wrote:
>>>
>>>> Maybe I should consider something like impala ?
>>>>
>>>> On Fri, Dec 15, 2017 at 11:32, Julien CHAMP <jch...@tellmeplus.com> wrote:
>>>>
>>>>> Hi Spark Community members !
>>>>>
>>>>> I want to do several (from 1 to 10) aggregate functions using window
>>>>> functions on something like 100 columns.
>>>>>
>>>>> Instead of doing several passes on the data to compute each aggregate
>>>>> function, is there a way to do this efficiently ?
>>>>>
>>>>> Currently it seems that doing
>>>>>
>>>>> val tw =
>>>>>   Window
>>>>>     .orderBy("date")
>>>>>     .partitionBy("id")
>>>>>     .rangeBetween(-803520L, 0)
>>>>>
>>>>> and then
>>>>>
>>>>> x
>>>>>   .withColumn("agg1", max("col").over(tw))
>>>>>   .withColumn("agg2", min("col").over(tw))
>>>>>   .withColumn("aggX", avg("col").over(tw))
>>>>>
>>>>> is not really efficient :/
>>>>> It seems that it iterates over the whole column for each aggregation ?
>>>>> Am I right ?
>>>>>
>>>>> Is there a way to compute all the required operations on a column
>>>>> with a single pass ?
>>>>> Even better, to compute all the required operations on ALL columns
>>>>> with a single pass ?
>>>>>
>>>>> Thx for your Future[Answers]
>>>>>
>>>>> Julien
>>>>>
>>>>> --
>>>>> Julien CHAMP — Data Scientist
>>>>> Web: www.tellmeplus.com — Email: jch...@tellmeplus.com
>>>>> Phone: 06 89 35 01 89 — LinkedIn: https://www.linkedin.com/in/julienchamp
>>>>> TellMePlus S.A — Predictive Objects
>>>>> Paris: 7 rue des Pommerots, 78400 Chatou
>>>>> Montpellier: 51 impasse des églantiers, 34980 St Clément de Rivière
Re: Several Aggregations on a window function
Hi Julien,

I am not sure if my answer applies to the streaming part of your question. However, in batch processing, if you want to perform multiple aggregations over an RDD with a single pass, a common approach is to use multiple aggregators (a.k.a. tuple monoids); see below an example from algebird:

https://github.com/twitter/scalding/wiki/Aggregation-using-Algebird-Aggregators#composing-aggregators

Best,
Anastasios

On Mon, Dec 18, 2017 at 10:38 AM, Julien CHAMP <jch...@tellmeplus.com> wrote:
> I've been looking for several solutions but I can't find something
> efficient to compute many window functions efficiently (optimized
> computation or efficient parallelism).
> Am I the only one interested in this ?
>
> Regards,
>
> Julien
>
> On Fri, Dec 15, 2017 at 21:34, Julien CHAMP <jch...@tellmeplus.com> wrote:
>
>> Maybe I should consider something like impala ?
>>
>> On Fri, Dec 15, 2017 at 11:32, Julien CHAMP <jch...@tellmeplus.com> wrote:
>>
>>> Hi Spark Community members !
>>>
>>> I want to do several (from 1 to 10) aggregate functions using window
>>> functions on something like 100 columns.
>>>
>>> Instead of doing several passes on the data to compute each aggregate
>>> function, is there a way to do this efficiently ?
>>>
>>> Currently it seems that doing
>>>
>>> val tw =
>>>   Window
>>>     .orderBy("date")
>>>     .partitionBy("id")
>>>     .rangeBetween(-803520L, 0)
>>>
>>> and then
>>>
>>> x
>>>   .withColumn("agg1", max("col").over(tw))
>>>   .withColumn("agg2", min("col").over(tw))
>>>   .withColumn("aggX", avg("col").over(tw))
>>>
>>> is not really efficient :/
>>> It seems that it iterates over the whole column for each aggregation ?
>>> Am I right ?
>>>
>>> Is there a way to compute all the required operations on a column with
>>> a single pass ?
>>> Even better, to compute all the required operations on ALL columns with
>>> a single pass ?
>>>
>>> Thx for your Future[Answers]
>>>
>>> Julien

--
Anastasios Zouzias <a...@zurich.ibm.com>
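The "tuple of monoids" idea behind algebird's composed Aggregators can be sketched in plain Python: pair each aggregator's (zero, plus) and apply them element-wise, so max, min, count and sum (hence avg) of a column come out of a single pass over the data. Illustration only, not algebird's API:

```python
aggregators = {
    "max":   (float("-inf"), max),
    "min":   (float("inf"),  min),
    "count": (0,   lambda acc, _: acc + 1),
    "sum":   (0.0, lambda acc, x: acc + x),
}

def aggregate_once(values):
    # Start every aggregator at its identity element ("zero").
    state = {name: zero for name, (zero, _) in aggregators.items()}
    for v in values:                      # ONE pass over the data
        for name, (_, plus) in aggregators.items():
            state[name] = plus(state[name], v)
    state["avg"] = state["sum"] / state["count"]
    return state

print(aggregate_once([3.0, 1.0, 4.0, 1.0, 5.0]))
```

Because each (zero, plus) pair is associative with an identity, the same composition works per-partition and then across partitions, which is what makes it a fit for RDD aggregation.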
Re: best spark spatial lib?
Hi,

Which spatial operations do you require exactly? Also, I don't follow what you mean by combining logical operators.

I have created a library that wraps Lucene's spatial functionality here: https://github.com/zouzias/spark-lucenerdd/wiki/Spatial-search

You could give the library a try; it supports intersections / within / etc. Ideally, I try to push all spatial Lucene features into the library. See also https://github.com/zouzias/spark-lucenerdd/wiki/Related-Work for related libraries.

Best,
Anastasios

On Tue, Oct 10, 2017 at 11:21 AM, Imran Rajjad <raj...@gmail.com> wrote:
> I need to have a location column inside my Dataframe so that I can do
> spatial queries and geometry operations. Are there any third-party packages
> that perform this kind of operation? I have seen a few, like GeoSpark and
> Magellan, but they don't support operations where spatial and logical
> operators can be combined.
>
> regards,
> Imran
>
> --
> I.R

--
Anastasios Zouzias <a...@zurich.ibm.com>
Re: Error - Spark reading from HDFS via dataframes - Java
Hi,

Set the inferSchema option to true in spark-csv. You may also want to set the mode option. See the readme below:

https://github.com/databricks/spark-csv/blob/master/README.md

Best,
Anastasios

On 01.10.2017 07:58, "Kanagha Kumar" wrote:

Hi,

I'm trying to read data from HDFS in spark as dataframes. Printing the schema, I see all columns are being read as strings. I'm converting it to RDDs and creating another dataframe by passing in the correct schema (how the rows should be interpreted finally).

I'm getting the following error:

Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of bigint

Spark read API:

Dataset<Row> hdfs_dataset = new SQLContext(spark).read().option("header", "false").csv("hdfs:/inputpath/*");
Dataset<Row> ds = new SQLContext(spark).createDataFrame(hdfs_dataset.toJavaRDD(), conversionSchema);

This is the schema to be converted to:
StructType(StructField(COL1,StringType,true), StructField(COL2,StringType,true),
StructField(COL3,LongType,true), StructField(COL4,StringType,true),
StructField(COL5,StringType,true), StructField(COL6,LongType,true))

This is the original schema obtained once the read API was invoked:
StructType(StructField(_c1,StringType,true), StructField(_c2,StringType,true),
StructField(_c3,StringType,true), StructField(_c4,StringType,true),
StructField(_c5,StringType,true), StructField(_c6,StringType,true))

My interpretation is that even when a JavaRDD is cast to a dataframe by passing in the new schema, the values are not getting type cast. This occurs because the above read API reads data as string types from HDFS.

How can I convert an RDD to a dataframe by passing in the correct schema once it is read? How can the values be type cast correctly during this RDD-to-dataframe conversion? Or how can I read data from HDFS with an input schema in Java? Any suggestions are helpful.

Thanks!
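The error itself is about the cell values, not the schema object: the rows still hold strings while the target schema promises longs. A plain-Python sketch of the conversion that has to happen (what inferSchema does for you at read time); column and type names mirror the mail:

```python
# LongType columns from the target schema; everything else stays a string.
target_types = {"COL3": int, "COL6": int}

def cast_row(row, header):
    """Cast each string cell to its target type before building typed rows."""
    return {name: target_types.get(name, str)(value)
            for name, value in zip(header, row)}

header = ["COL1", "COL2", "COL3", "COL4", "COL5", "COL6"]
print(cast_row(["a", "b", "42", "c", "d", "7"], header))
```

In the Java version this corresponds to mapping over the JavaRDD and converting the string fields to Long before calling createDataFrame, rather than relying on the schema argument to coerce them.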
Re: ConcurrentModificationException using Kafka Direct Stream
Hi,

I had a similar issue using 2.1.0, though not with Kafka. Updating to 2.1.1 solved my issue. Can you try with 2.1.1 as well and report back?

Best,
Anastasios

On 17.09.2017 16:48, "HARSH TAKKAR" wrote:

Hi

I am using spark 2.1.0 with scala 2.11.8, and while iterating over the partitions of each rdd in a dStream formed using KafkaUtils, I am getting the below exception; please suggest a fix.

I have the following config

kafka:
enable.auto.commit: "true",
auto.commit.interval.ms: "1000",
session.timeout.ms: "3",

Spark:
spark.streaming.backpressure.enabled=true
spark.streaming.kafka.maxRatePerPartition=200

Exception in task 0.2 in stage 3236.0 (TID 77795)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

--
Kind Regards
Harsh
Re: compile error: No classtag available while calling RDD.zip()
Hi there,

If it is OK with you to work with DataFrames, you can do the following (see https://gist.github.com/zouzias/44723de11222535223fe59b4b0bc228c):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, IntegerType, LongType}

val df = sc.parallelize(Seq(
  (1.0, 2.0), (0.0, -1.0),
  (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

// Append a "rowid" column of type Long to the schema
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))

// Zip on RDD level
val rddWithId = df.rdd.zipWithIndex

// Convert back to DataFrame
val dfZippedWithId = spark.createDataFrame(rddWithId.map { case (row, index) =>
  Row.fromSeq(row.toSeq ++ Array(index)) }, newSchema)

// Show results
dfZippedWithId.show

Best,
Anastasios

On Wed, Sep 13, 2017 at 5:07 PM, 沈志宏 <blue...@cnic.cn> wrote:
> Hello, since Dataset has no zip(..) method, I wrote the following code to
> zip two datasets:
>
> 1 def zipDatasets[X: Encoder, Y: Encoder](spark: SparkSession, m: Dataset[X], n: Dataset[Y]) = {
> 2   val rdd = m.rdd.zip(n.rdd);
> 3   import spark.implicits._
> 4   spark.createDataset(rdd);
> 5 }
>
> However, on the m.rdd.zip(…) call, a compile error is reported: No
> ClassTag available for Y
>
> I know this error can be corrected if I declare Y as a ClassTag like this:
>
> 1 def foo[X: Encoder, Y: ClassTag](spark: SparkSession, …
>
> But this makes line 5 report a new error:
> Unable to find encoder for type stored in a Dataset.
>
> Now I have no idea how to solve this problem. How can I declare Y as both
> an Encoder and a ClassTag?
>
> Many thanks!
>
> Best regards,
> bluejoe
Re: Spark SVD benchmark for dense matrices
Hi Jose,

Just to note that in the Databricks blog they state that they compute the top-5 singular vectors, not all singular values/vectors. Computing all of them is much more computationally intense.

Cheers,
Anastasios

On 09.08.2017 15:19, "Jose Francisco Saray Villamizar" <jsa...@gmail.com> wrote:

Hi everyone,

I am trying to invert a 5000 x 5000 dense matrix (99% non-zeros) by using SVD, with an approach similar to:
https://stackoverflow.com/questions/29969521/how-to-compute-the-inverse-of-a-rowmatrix-in-apache-spark

The time I'm getting with SVD is close to 10 minutes, which is very long for me.

A benchmark for SVD is already given here:
https://databricks.com/blog/2014/07/21/distributing-the-singular-value-decomposition-with-spark.html

However, it seems they are using sparse matrices; that's why they get short times. Has anyone of you tried to perform an SVD on a very dense big matrix? Is this time normal?

Thank you.

--
Buen dia, alegria !!
José Francisco Saray Villamizar
cel +33 6 13710693
Lyon, France
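To spell out why the two timings are not comparable, here is a short LaTeX note (standard linear-algebra facts, not taken from either link). The StackOverflow approach inverts via the full SVD:

```latex
\[
A = U \Sigma V^{\top}
\quad\Longrightarrow\quad
A^{-1} = V \, \Sigma^{-1} U^{\top},
\qquad (\Sigma^{-1})_{ii} = 1/\sigma_i,
\]
% A full SVD of a dense n x n matrix costs on the order of O(n^3) flops
% (n = 5000 here), whereas computing only the top-k singular triplets
% (the Databricks benchmark reports k = 5) is far cheaper.
```

So a full dense 5000 x 5000 SVD is doing orders of magnitude more work than the top-5 computation in the benchmark, independent of sparsity.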
Re: Slow response on Solr Cloud with Spark
Hi Imran,

It seems that you do not cache your underlying DataFrame. I would suggest forcing a cache with tweets.cache() and then tweets.count(). Let us know if your problem persists.

Best,
Anastasios

On Wed, Jul 19, 2017 at 2:49 PM, Imran Rajjad <raj...@gmail.com> wrote:
> Greetings,
>
> We are trying out Spark 2 + ThriftServer to join multiple collections from
> a Solr Cloud (6.4.x). I have followed this blog:
> https://lucidworks.com/2015/08/20/solr-spark-sql-datasource/
>
> I understand that initially spark populates the temporary table with
> 18633014 records and takes its due time; however, any following SQLs on the
> temporary table take the same amount of time. It seems the temporary table
> is not being re-used or cached. The fields in the solr collection do not
> have docValues enabled; could that be the reason? Apparently I have missed
> a trick.
>
> regards,
> Imran
>
> --
> I.R

--
Anastasios Zouzias <a...@zurich.ibm.com>
Re: Can we access files on Cluster mode
Hi Mich,

If the driver starts on the edge node with cluster mode, then I don't see the difference between client and cluster deploy mode. In cluster mode, it is the responsibility of the resource manager (yarn, etc.) to decide where to run the driver (at least for spark 1.6, this is what I have experienced).

Best,
Anastasios

On Sun, Jun 25, 2017 at 11:14 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> Hi Anastasios.
>
> Are you implying that in Yarn cluster mode, even if you submit your Spark
> application on an edge node, the driver can start on any node? I was under
> the impression that the driver starts on the edge node, and the executors
> can be on any node in the cluster (where Spark agents are running)?
>
> Thanks
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On 25 June 2017 at 09:39, Anastasios Zouzias <zouz...@gmail.com> wrote:
>
>> Just to note that in cluster mode the spark driver might run on any node
>> of the cluster, hence you need to make sure that the file exists on *all*
>> nodes. Push the file to all nodes or use client deploy-mode.
>>
>> Best,
>> Anastasios
>>
>> On 24.06.2017 23:24, "Holden Karau" <hol...@pigscanfly.ca> wrote:
>>
>>> addFile is supposed to not depend on a shared FS unless the semantics
>>> have changed recently.
>>>
>>> On Sat, Jun 24, 2017 at 11:55 AM varma dantuluri <dvsnva...@gmail.com> wrote:
>>>
>>>> Hi Sudhir,
>>>>
>>>> I believe you have to use a shared file system that is accessed by all
>>>> nodes.
>>>>
>>>> On Jun 24, 2017, at 1:30 PM, sudhir k <k.sudhi...@gmail.com> wrote:
>>>>
>>>> I am new to Spark and I need some guidance on how to fetch files from
>>>> the --files option on spark-submit.
>>>>
>>>> I read on some forums that we can fetch the files with
>>>> SparkFiles.get(fileName) and can use them in our code, and all nodes
>>>> should be able to read them.
>>>>
>>>> But I am facing some issues.
>>>>
>>>> Below is the command I am using:
>>>>
>>>> spark-submit --deploy-mode cluster --class com.check.Driver --files /home/sql/first.sql test.jar 20170619
>>>>
>>>> So when I use SparkFiles.get(first.sql), I should be able to read the
>>>> file path, but it is throwing a File Not Found exception.
>>>>
>>>> I tried SparkContext.addFile(/home/sql/first.sql) and then
>>>> SparkFiles.get(first.sql), but still the same error.
>>>>
>>>> It's working in standalone mode but not in cluster mode. Any help is
>>>> appreciated. Using Spark 2.1.0 and Scala 2.11.
>>>>
>>>> Thanks.
>>>>
>>>> Regards,
>>>> Sudhir K
>>>
>>> --
>>> Cell: 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau

--
Anastasios Zouzias <a...@zurich.ibm.com>
Re: Can we access files on Cluster mode
Just to note that in cluster mode the Spark driver might run on any node of the cluster, hence you need to make sure that the file exists on *all* nodes. Push the file to all nodes or use client deploy-mode. Best, Anastasios Am 24.06.2017 23:24 schrieb "Holden Karau": > addFile is supposed to not depend on a shared FS unless the semantics have > changed recently. > > On Sat, Jun 24, 2017 at 11:55 AM varma dantuluri > wrote: > >> Hi Sudhir, >> >> I believe you have to use a shared file system that is accessed by all >> nodes. >> >> >> On Jun 24, 2017, at 1:30 PM, sudhir k wrote: >> >> >> I am new to Spark and I need some guidance on how to fetch files from >> the --files option on spark-submit. >> >> I read on some forums that we can fetch the files from >> SparkFiles.get(fileName) and can use it in our code and all nodes should >> read it. >> >> But I am facing some issues. >> >> Below is the command I am using: >> >> spark-submit --deploy-mode cluster --class com.check.Driver --files >> /home/sql/first.sql test.jar 20170619 >> >> So when I use SparkFiles.get("first.sql"), I should be able to read the >> file path, but it is throwing a FileNotFound exception. >> >> I tried SparkContext.addFile("/home/sql/first.sql") and then >> SparkFiles.get("first.sql"), but still the same error. >> >> It's working in standalone mode but not in cluster mode. Any help is >> appreciated. Using Spark 2.1.0 and Scala 2.11 >> >> Thanks. >> >> >> Regards, >> Sudhir K >> >> >> >> -- >> Regards, >> Sudhir K >> >> >> -- > Cell : 425-233-8271 > Twitter: https://twitter.com/holdenkarau >
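A minimal sketch of the intended usage (path, class, and jar names are taken from the question; this is an illustration, not the exact fix):

```scala
// Submitted with:
//   spark-submit --deploy-mode cluster --class com.check.Driver \
//     --files /home/sql/first.sql test.jar 20170619
import org.apache.spark.SparkFiles
import scala.io.Source

// SparkFiles.get expects the bare file name as a String (note the quotes),
// not the full path that was passed to --files.
val localPath: String = SparkFiles.get("first.sql")
val sqlText: String = Source.fromFile(localPath).mkString
```

In client deploy mode the driver runs on the machine where spark-submit was invoked, so the local file only needs to exist there; in cluster mode the driver may land on any node, which is why the file must be reachable everywhere.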
Re: KMeans Clustering is not Reproducible
Hi Christoph, Take a look at this, you might end up having a similar case: http://www.spark.tc/using-sparks-cache-for-correctness-not-just-performance/ If this is not the case, then I agree with you that KMeans should be partitioning-agnostic (although I haven't checked the code yet). Best, Anastasios On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke <carabo...@gmail.com> wrote: > Hi, > > I’m trying to figure out how to use KMeans in order to achieve > reproducible results. I have found that running the same KMeans instance on > the same data, with different partitioning, will produce different > clusterings. > > A simple KMeans run with a fixed seed returns different results on the same > training data, if the training data is partitioned differently. > > Consider the following example. The same KMeans clustering setup is run on > identical data. The only difference is the partitioning of the training data > (one partition vs. four partitions). > > ``` > import org.apache.spark.sql.DataFrame > import org.apache.spark.ml.clustering.KMeans > import org.apache.spark.ml.feature.VectorAssembler > > // generate random data for clustering > val randomData = spark.range(1, 1000).withColumn("a", > rand(123)).withColumn("b", rand(321)) > > val vecAssembler = new VectorAssembler().setInputCols(Array("a", > "b")).setOutputCol("features") > > val data = vecAssembler.transform(randomData) > > // instantiate KMeans with fixed seed > val kmeans = new KMeans().setK(10).setSeed(9876L) > > // train the model with different partitioning > val dataWith1Partition = data.repartition(1) > println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost( > dataWith1Partition)) > > val dataWith4Partition = data.repartition(4) > println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost( > dataWith4Partition)) > ``` > > I get the following resulting costs > > ``` > 1 Partition: 16.028212597888057 > 4 Partition: 16.14758460544976 > ``` > > What I want to achieve is that 
repeated computations of the KMeans > clustering should yield identical results on identical training data, > regardless of the partitioning. > > Looking through the Spark source code, I guess the cause is the > initialization method of KMeans, which in turn uses the `takeSample` method, > which does not seem to be partition-agnostic. > > Is this behaviour expected? Is there anything I could do to achieve > reproducible results? > > Best, > Christoph > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- -- Anastasios Zouzias <a...@zurich.ibm.com>
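One pragmatic workaround, as a hedged sketch (not an official guarantee of the API): pin the training data to a fixed, deterministic partition layout before every fit, so the seeded, sample-based initialization always sees the same partitioning.

```scala
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.sql.functions.col

// Assumes `data` is the assembled DataFrame from the example above.
// Hash-repartition on the feature columns and sort within partitions,
// so each run presents an identical layout to the seeded initializer.
val stable = data
  .repartition(4, col("a"), col("b"))
  .sortWithinPartitions("a", "b")
  .cache()

val kmeans = new KMeans().setK(10).setSeed(9876L)
val model = kmeans.fit(stable)
```

Caching also guards against the recompute-the-random-data issue described in the linked article.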
Re: Best Practice for Enum in Spark SQL
Hi Mike, FYI: if you are using Spark 2.x, you might have issues with encoders if you use a case class with an Enumeration-type field, see https://issues.apache.org/jira/browse/SPARK-17248 For (1) and (2), I would guess Int would be better (space-wise), but I am not familiar with Parquet's internals. Best, Anastasios On Fri, May 12, 2017 at 5:07 AM, Mike Wheeler <rotationsymmetr...@gmail.com> wrote: > Hi Spark Users, > > I want to store an Enum type (such as Vehicle Type: Car, SUV, Wagon) in my > data. My storage format will be Parquet and I need to access the data from > Spark-shell, Spark SQL CLI, and Hive. My questions: > > 1) Should I store my Enum type as String or store it as a numeric encoding > (aka 1=Car, 2=SUV, 3=Wagon)? > > 2) If I choose String, is there any penalty in hard drive space or memory? > > Thank you! > > Mike > -- -- Anastasios Zouzias <a...@zurich.ibm.com>
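If the numeric encoding is chosen, a small sketch of mapping the string values to integer codes (column name and code values are made up for illustration):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, when}

// Hypothetical: encode a vehicle_type string column as a small Int code.
def encodeVehicleType(df: DataFrame): DataFrame =
  df.withColumn("vehicle_type_code",
    when(col("vehicle_type") === "Car", 1)
      .when(col("vehicle_type") === "SUV", 2)
      .when(col("vehicle_type") === "Wagon", 3)
      .otherwise(0)) // 0 = unknown / unmapped
```

The mapping then lives in code (or a small dimension table) rather than in the Parquet files themselves.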
Re: I am not sure why I am getting java.lang.NoClassDefFoundError
ask.run(Task.scala:86) > > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1142) > > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > > at java.lang.Thread.run(Thread.java:745) > > 17/02/17 17:35:33 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, > localhost): java.lang.NoClassDefFoundError: scala/runtime/ > AbstractPartialFunction$mcJL$sp > > at java.lang.ClassLoader.defineClass1(Native Method) > > at java.lang.ClassLoader.defineClass(ClassLoader.java:763) > > at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) > > at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) > > at java.net.URLClassLoader.access$100(URLClassLoader.java:73) > > at java.net.URLClassLoader$1.run(URLClassLoader.java:368) > > at java.net.URLClassLoader$1.run(URLClassLoader.java:362) > > at java.security.AccessController.doPrivileged(Native Method) > > at java.net.URLClassLoader.findClass(URLClassLoader.java:361) > > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) > > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > > at com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator( > CassandraLimit.scala:21) > > at com.datastax.spark.connector.rdd.CassandraTableScanRDD. 
> compute(CassandraTableScanRDD.scala:367) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > > at org.apache.spark.rdd.MapPartitionsRDD.compute( > MapPartitionsRDD.scala:38) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > > at org.apache.spark.rdd.MapPartitionsRDD.compute( > MapPartitionsRDD.scala:38) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > > at org.apache.spark.rdd.MapPartitionsRDD.compute( > MapPartitionsRDD.scala:38) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > > at org.apache.spark.rdd.MapPartitionsRDD.compute( > MapPartitionsRDD.scala:38) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > > at org.apache.spark.scheduler.ShuffleMapTask.runTask( > ShuffleMapTask.scala:79) > > at org.apache.spark.scheduler.ShuffleMapTask.runTask( > ShuffleMapTask.scala:47) > > at org.apache.spark.scheduler.Task.run(Task.scala:86) > > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1142) > > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > > at java.lang.Thread.run(Thread.java:745) > > Caused by: java.lang.ClassNotFoundException: scala.runtime. > AbstractPartialFunction$mcJL$sp > > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) > > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > > ... 
35 more > > > 17/02/17 17:35:33 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 > times; aborting job > > 17/02/17 17:35:33 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, > localhost): java.lang.NoClassDefFoundError: com/datastax/spark/connector/ > rdd/CassandraLimit$$anonfun$limitForIterator$1 > > at com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator( > CassandraLimit.scala:21) > > at com.datastax.spark.connector.rdd.CassandraTableScanRDD. > compute(CassandraTableScanRDD.scala:367) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > > at org.apache.spark.rdd.MapPartitionsRDD.compute( > MapPartitionsRDD.scala:38) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > > at org.apache.spark.rdd.MapPartitionsRDD.compute( > MapPartitionsRDD.scala:38) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > > at org.apache.spark.rdd.MapPartitionsRDD.compute( > MapPartitionsRDD.scala:38) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > > at org.apache.spark.rdd.MapPartitionsRDD.compute( > MapPartitionsRDD.scala:38) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > > at org.apache.spark.scheduler.ShuffleMapTask.runTask( > ShuffleMapTask.scala:79) > > at org.apache.spark.scheduler.ShuffleMapTask.runTask( > ShuffleMapTask.scala:47) > > at org.apache.spark.scheduler.Task.run(Task.scala:86) > > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1142) > > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > > at 
java.lang.Thread.run(Thread.java:745) > -- -- Anastasios Zouzias <a...@zurich.ibm.com>
Re: NoNodeAvailableException (None of the configured nodes are available) error when trying to push data to Elastic from a Spark job
> 1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > > at org.apache.spark.streaming.dstream.DStream.createRDDWithLoca > lProperties(DStream.scala:415) > > at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$ > 1.apply$mcV$sp(ForEachDStream.scala:50) > > at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$ > 1.apply(ForEachDStream.scala:50) > > at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$ > 1.apply(ForEachDStream.scala:50) > > at scala.util.Try$.apply(Try.scala:192) > > at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) > > at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler > $$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247) > > at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler > $$anonfun$run$1.apply(JobScheduler.scala:247) > > at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler > $$anonfun$run$1.apply(JobScheduler.scala:247) > > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > > at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler > .run(JobScheduler.scala:246) > > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool > Executor.java:1142) > > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo > lExecutor.java:617) > > at java.lang.Thread.run(Thread.java:745) > > Caused by: NoNodeAvailableException[None of the configured nodes are > available: [{#transport#-1}{XX.XXX.XXX.XX}{XX.XXX.XXX.XX:9300}]] > > at org.elasticsearch.client.transport.TransportClientNodesServi > ce.ensureNodesAreAvailable(TransportClientNodesService.java:290) > > at org.elasticsearch.client.transport.TransportClientNodesServi > ce.execute(TransportClientNodesService.java:207) > > at org.elasticsearch.client.transport.support.TransportProxyCli > ent.execute(TransportProxyClient.java:55) > > at org.elasticsearch.client.transport.TransportClient.doExecute > (TransportClient.java:288) > > at org.elasticsearch.client.support.AbstractClient.execute( 
> AbstractClient.java:359) > > at org.elasticsearch.client.support.AbstractClient$ClusterAdmin > .execute(AbstractClient.java:853) > > at org.elasticsearch.action.ActionRequestBuilder.execute(Action > RequestBuilder.java:86) > > at org.elasticsearch.action.ActionRequestBuilder.execute(Action > RequestBuilder.java:56) > > at org.elasticsearch.action.ActionRequestBuilder.get(ActionRequ > estBuilder.java:64) > > at com.myco.MyDriver.work() > > at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartit > ion$1.apply(JavaRDDLike.scala:218) > > at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartit > ion$1.apply(JavaRDDLike.scala:218) > > at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfu > n$apply$28.apply(RDD.scala:902) > > at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfu > n$apply$28.apply(RDD.scala:902) > > at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkC > ontext.scala:1916) > > at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkC > ontext.scala:1916) > > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.sca > la:70) > > at org.apache.spark.scheduler.Task.run(Task.scala:86) > > at org.apache.spark.executor.Executor$TaskRunner.run(Executor. > scala:274) > -- -- Anastasios Zouzias <a...@zurich.ibm.com>
Re: Equally split a RDD partition into two partition at the same node
Hi Fei, I looked at the code of CoalescedRDD and probably what I suggested will not work. Speaking of which, CoalescedRDD is private[spark]. If this was not the case, you could set balanceSlack to 1, and get what you requested, see https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala#L75 Maybe you could try to use the CoalescedRDD code to implement your requirement. Good luck! Cheers, Anastasios On Sun, Jan 15, 2017 at 5:39 PM, Fei Hu <hufe...@gmail.com> wrote: > Hi Anastasios, > > Thanks for your reply. If I just increase the numPartitions to be twice > as large, how does coalesce(numPartitions: Int, shuffle: Boolean = false) keep > the data locality? Do I need to define my own Partitioner? > > Thanks, > Fei > > On Sun, Jan 15, 2017 at 3:58 AM, Anastasios Zouzias <zouz...@gmail.com> > wrote: > >> Hi Fei, >> >> Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)? >> >> https://github.com/apache/spark/blob/branch-1.6/core/src/ >> main/scala/org/apache/spark/rdd/RDD.scala#L395 >> >> coalesce is mostly used for reducing the number of partitions before >> writing to HDFS, but it might still be a narrow dependency (satisfying your >> requirements) if you increase the # of partitions. >> >> Best, >> Anastasios >> >> On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu <hufe...@gmail.com> wrote: >> >>> Dear all, >>> >>> I want to equally divide an RDD partition into two partitions. That >>> means, the first half of elements in the partition will create a new >>> partition, and the second half of elements in the partition will generate >>> another new partition. But the two new partitions are required to be at the >>> same node with their parent partition, which can help get high data >>> locality. >>> >>> Is there anyone who knows how to implement it or any hints for it? 
>>> >>> Thanks in advance, >>> Fei >>> >>> >> >> >> -- >> -- Anastasios Zouzias >> <a...@zurich.ibm.com> >> > > -- -- Anastasios Zouzias <a...@zurich.ibm.com>
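For completeness, a rough sketch of what a custom narrow-dependency RDD along these lines might look like (an untested illustration against Spark's developer API, in the spirit of the CoalescedRDD suggestion; a real implementation would avoid buffering the parent partition and recomputing it for both halves):

```scala
import scala.reflect.ClassTag
import org.apache.spark.{NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Output partition i reads one half of parent partition i / 2.
class HalfPartition(override val index: Int) extends Partition {
  def parentIndex: Int = index / 2
  def firstHalf: Boolean = index % 2 == 0
}

class SplitRDD[T: ClassTag](parent: RDD[T])
  extends RDD[T](parent.sparkContext, Seq(new NarrowDependency[T](parent) {
    override def getParents(pid: Int): Seq[Int] = Seq(pid / 2)
  })) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate(parent.partitions.length * 2)(i => new HalfPartition(i))

  // Both halves advertise the parent's preferred locations, which is what
  // keeps them on the same node as the parent partition.
  override protected def getPreferredLocations(split: Partition): Seq[String] = {
    val p = split.asInstanceOf[HalfPartition]
    parent.preferredLocations(parent.partitions(p.parentIndex))
  }

  override def compute(split: Partition, ctx: TaskContext): Iterator[T] = {
    val p = split.asInstanceOf[HalfPartition]
    // Naive: buffer the parent partition to find its midpoint.
    val elems = parent.iterator(parent.partitions(p.parentIndex), ctx).toArray
    val mid = elems.length / 2
    if (p.firstHalf) elems.iterator.take(mid) else elems.iterator.drop(mid)
  }
}
```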
Re: Equally split a RDD partition into two partition at the same node
Hi Fei, Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)? https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L395 coalesce is mostly used for reducing the number of partitions before writing to HDFS, but it might still be a narrow dependency (satisfying your requirements) if you increase the # of partitions. Best, Anastasios On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu <hufe...@gmail.com> wrote: > Dear all, > > I want to equally divide an RDD partition into two partitions. That means, > the first half of elements in the partition will create a new partition, > and the second half of elements in the partition will generate another new > partition. But the two new partitions are required to be at the same node > with their parent partition, which can help get high data locality. > > Is there anyone who knows how to implement it or any hints for it? > > Thanks in advance, > Fei > > -- -- Anastasios Zouzias <a...@zurich.ibm.com>
Re: Broadcast destroy
Hi Bryan, I think the ContextCleaner will take care of the broadcasted variables, see, e.g., https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-service-contextcleaner.html If it is easy to spot when to clean up the broadcast variables in your case, an explicit "xBroadcasted.destroy()" call will make things more deterministic. Best, Anastasios On Mon, Jan 2, 2017 at 4:26 PM, <bryan.jeff...@gmail.com> wrote: > All, > > > > Anyone have a thought? > > > > Thank you, > > > > Bryan Jeffrey > > > > *From: *bryan.jeff...@gmail.com > *Sent: *Friday, December 30, 2016 1:20 PM > *To: *user <user@spark.apache.org> > *Subject: *Broadcast destroy > > > > All, > > > > If we are updating broadcast variables, do we need to manually destroy the > replaced broadcast, or will they be automatically pruned? > > > > Thank you, > > > > Bryan Jeffrey > > > > Sent from my Windows 10 phone > > > > > -- -- Anastasios Zouzias <a...@zurich.ibm.com>
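A hedged sketch of the explicit-destroy pattern when a broadcast variable is periodically replaced (the function and names are illustrative):

```scala
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Publish a new broadcast and release the one it replaces.
def refreshBroadcast[T: ClassTag](
    sc: SparkContext,
    old: Broadcast[T],
    newValue: T): Broadcast[T] = {
  val replacement = sc.broadcast(newValue)
  // destroy() is safe only once no running job still references `old`;
  // otherwise prefer old.unpersist(blocking = false) and let the
  // ContextCleaner reclaim it once it goes out of scope.
  old.destroy()
  replacement
}
```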
Re: Ingesting data in elasticsearch from hdfs using spark , cluster setup and usage
Hi Rohit, Since your instances have only 16G and dual cores, I would suggest using dedicated nodes for Elasticsearch, giving 8GB to the Elastic heap. This way you won't have any interference between Spark executors and Elastic. Also, if possible, you could try to use SSD disks on these 3 machines for storing the Elastic indices; this will boost your Elastic cluster's performance. Best, Anastasios On Thu, Dec 22, 2016 at 6:35 PM, Rohit Verma <rohit.ve...@rokittech.com> wrote: > I am setting up a Spark cluster. I have HDFS data nodes and Spark master > nodes on the same instances. To add Elasticsearch to this cluster, should I > spawn ES on different machines or on the same machines? I have only 12 machines: > 1 master (Spark and HDFS) > 8 Spark workers and HDFS data nodes > I can use 3 nodes dedicated to ES, or use 11 nodes running all three. > > All instances are the same, 16 gig dual core (unfortunately). > > Also, I am trying the es-hadoop / es-spark project, but I felt ingestion is > very slow with 3 dedicated nodes; it's like 0.6 million records/minute. > If anyone has experience using that project, can you please share your > thoughts about tuning? > > Regards > Rohit > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- -- Anastasios Zouzias <a...@zurich.ibm.com>
Re: Apache Spark or Spark-Cassandra-Connector doesnt look like it is reading multiple partitions in parallel.
Hi there, spark.read.json usually takes a filesystem path (usually HDFS) pointing to a file containing one JSON document per line. See also http://spark.apache.org/docs/latest/sql-programming-guide.html Hence, in your case val df4 = spark.read.json(rdd) // This line takes forever seems wrong. I guess you might want to first store rdd as a text file on HDFS and then read it using spark.read.json. Cheers, Anastasios On Sat, Nov 26, 2016 at 9:34 AM, kant kodali <kanth...@gmail.com> wrote: > Apache Spark or Spark-Cassandra-Connector doesn't look like it is reading > multiple partitions in parallel. > > Here is my code using spark-shell > > import org.apache.spark.sql._ > import org.apache.spark.sql.types.StringType > spark.sql("""CREATE TEMPORARY VIEW hello USING org.apache.spark.sql.cassandra > OPTIONS (table "hello", keyspace "db", cluster "Test Cluster", pushdown > "true")""") > val df = spark.sql("SELECT test from hello") > val df2 = df.select(df("test").cast(StringType).as("test")) > val rdd = df2.rdd.map { case Row(j: String) => j } > val df4 = spark.read.json(rdd) // This line takes forever > > I have about 700 million rows, each row is about 1KB, and this line > > val df4 = spark.read.json(rdd) takes forever, as I get the following > output after 1hr 30 mins > > Stage 1:==> (4866 + 2) / 25256] > > so at this rate it will probably take days. > > I measured the network throughput rate of the Spark worker nodes using iftop > and it is about 2.2KB/s (kilobytes per second), which is too low, so that > tells me it is not reading partitions in parallel or at the very least it is not > reading a good chunk of data, else it would be in MB/s. Any ideas on how to > fix it? > > -- -- Anastasios Zouzias <a...@zurich.ibm.com>
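The suggested workaround as a sketch (the HDFS path is made up; `rdd` is the RDD[String] of JSON documents from the question):

```scala
// Persist the JSON strings as line-delimited text on HDFS first...
rdd.saveAsTextFile("hdfs:///tmp/hello_json")

// ...then let Spark read and parse them in parallel over the HDFS blocks.
val df4 = spark.read.json("hdfs:///tmp/hello_json")
```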
Re: Apache Spark SQL is taking forever to count billion rows from Cassandra?
How fast is Cassandra without Spark on the count operation? cqlsh> SELECT COUNT(*) FROM hello (this is not equivalent to what you are doing, but it might help you find the root cause) On Thu, Nov 24, 2016 at 9:03 AM, kant kodali <kanth...@gmail.com> wrote: > I have the following code > > I invoke spark-shell as follows > > ./spark-shell --conf spark.cassandra.connection.host=170.99.99.134 > --executor-memory 15G --executor-cores 12 --conf > spark.cassandra.input.split.size_in_mb=67108864 > > code > > scala> val df = spark.sql("SELECT test from hello") // Billion rows in > hello and test column is 1KB > > df: org.apache.spark.sql.DataFrame = [test: binary] > > scala> df.count > > [Stage 0:> (0 + 2) / 13] // I don't know what these numbers mean > precisely. > > If I invoke spark-shell as follows > > ./spark-shell --conf spark.cassandra.connection.host=170.99.99.134 > > code > > > val df = spark.sql("SELECT test from hello") // This has about a billion > rows > > scala> df.count > > > [Stage 0:=> (686 + 2) / 24686] // What are these numbers precisely? > > > Neither of these versions worked; Spark keeps running forever and I have > been waiting for more than 15 mins with no response. Any ideas on what could > be wrong and how to fix this? > > I am using Spark 2.0.2 > and spark-cassandra-connector_2.11-2.0.0-M3.jar > > -- -- Anastasios Zouzias <a...@zurich.ibm.com>
Re: Broadcast big dataset
Hey, Is the driver running OOM? Try 8g for the driver memory. Speaking of which, how do you estimate that your broadcasted dataset is 500M? Best, Anastasios Am 29.09.2016 5:32 AM schrieb "WangJianfei": > First, thank you very much! > My executor memory is also 4G, but my Spark version is 1.5. Could the Spark > version be causing the trouble? > > > > > -- > View this message in context: http://apache-spark- > developers-list.1001551.n3.nabble.com/Broadcast-big- > dataset-tp19127p19143.html > Sent from the Apache Spark Developers List mailing list archive at > Nabble.com. > > - > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org > >
Re: Large-scale matrix inverse in Spark
Hi there, As Edward noted, if you ask a numerical analyst about matrix inversion, they will respond "you never invert a matrix; you solve the linear system associated with the matrix". Linear system solving is usually done with iterative methods or matrix decompositions (as noted above). The reason people avoid matrix inversion is its inherent poor numerical stability. Best, Anastasios On Tue, Sep 27, 2016 at 8:42 AM, Edward Fine <edward.f...@gmail.com> wrote: > I have not found matrix inversion algorithms in Spark and I would be > surprised to see them. Except for matrices with very special structure > (like those nearly the identity), inverting an n*n matrix is slower than > O(n^2), which does not scale. Whenever a matrix is inverted, usually a > decomposition or a low-rank approximation is used, just as Sean pointed > out. See further https://en.wikipedia.org/wiki/Computational_ > complexity_of_mathematical_operations#Matrix_algebra > or if you really want to dig into it, > Stoer and Bulirsch http://www.springer.com/us/book/9780387954523 > > On Mon, Sep 26, 2016 at 11:00 PM Sean Owen <so...@cloudera.com> wrote: > >> I don't recall any code in Spark that computes a matrix inverse. There is >> code that solves linear systems Ax = b with a decomposition. For example, >> from looking at the code recently, I think the regression implementation >> actually solves AtAx = Atb using a Cholesky decomposition. But A = n x k, >> where n is large but k is smallish (number of features), so AtA is k x k >> and can be solved in-memory with a library. >> >> On Tue, Sep 27, 2016 at 3:05 AM, Cooper <ahmad.raban...@gmail.com> wrote: >> > How is the problem of large-scale matrix inversion approached in Apache >> Spark >> > ? >> > >> > This linear algebra operation is obviously the very base of a lot of >> other >> > algorithms (regression, classification, etc). 
However, I have not been >> able >> > to find a Spark API for a parallel implementation of matrix inversion. Can >> you >> > please clarify how this operation is approached in the Spark internals? >> > >> > Here <http://ieeexplore.ieee.org/abstract/document/7562171/> is a >> paper on >> > parallelized matrix inversion in Spark; however, I am trying to use >> an >> > existing implementation instead of writing one from scratch, if available. >> > >> > >> > >> > -- >> > View this message in context: http://apache-spark-user-list. >> 1001560.n3.nabble.com/Large-scale-matrix-inverse-in-Spark-tp27796.html >> > Sent from the Apache Spark User List mailing list archive at Nabble.com. >> > >> > - >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> > >> >> -- -- Anastasios Zouzias <a...@zurich.ibm.com>
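To make the "solve, don't invert" advice concrete, a small single-machine sketch with Breeze (which ships as an MLlib dependency); for the distributed case there is likewise no inverse, only decompositions on e.g. RowMatrix:

```scala
import breeze.linalg.{DenseMatrix, DenseVector}

// Solve A x = b directly; no explicit inverse of A is ever formed.
val A = DenseMatrix((4.0, 1.0), (1.0, 3.0))
val b = DenseVector(1.0, 2.0)
val x = A \ b // LAPACK-backed solve under the hood
// x is approximately DenseVector(1.0 / 11.0, 7.0 / 11.0)
```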
Re: databricks spark-csv: linking coordinates are what?
Hi Dan, If you use Spark <= 1.6, you can also do $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.5.0 to quickly link the spark-csv jars to the Spark shell. Otherwise, as Holden suggested, you link it in your Maven/sbt dependencies. The Spark folks assume that their users have a good working knowledge of Maven/sbt; you might need to read up on these before jumping into Spark. Best, Anastasios On Fri, Sep 23, 2016 at 10:26 PM, Dan Bikle <bikle...@gmail.com> wrote: > > > > hello world-of-spark, > > I am learning spark today. > > I want to understand the spark code in this repo: > > https://github.com/databricks/spark-csv <https://github.com/databricks/spark-csv> > > In the README.md I see this info: > > Linking > > You can link against this library in your program at the following coordinates: > Scala 2.10 > > groupId: com.databricks > artifactId: spark-csv_2.10 > version: 1.5.0 > > Scala 2.11 > > groupId: com.databricks > artifactId: spark-csv_2.11 > version: 1.5.0 > > I want to know how I can use the above info. > > The people who wrote spark-csv should give some kind of example, demo, or context. > > My understanding of Linking is limited. > > I have some experience operating sbt which I learned from this URL: > > http://spark.apache.org/docs/latest/quick-start.html#self-contained-applications <http://spark.apache.org/docs/latest/quick-start.html#self-contained-applications> > > The above URL does not give me enough information so that I can link spark-csv with spark. > > Question: > How do I learn how to use the info in the Linking section of the README.md of > https://github.com/databricks/spark-csv <https://github.com/databricks/spark-csv> > ?? > -- -- Anastasios Zouzias
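For the sbt route mentioned above, linking amounts to a single line in build.sbt (a sketch; `%%` appends your project's scalaVersion, yielding spark-csv_2.10 or spark-csv_2.11 to match the coordinates in the README):

```scala
// build.sbt (fragment)
libraryDependencies += "com.databricks" %% "spark-csv" % "1.5.0"
```

With that in place, sbt resolves the artifact at build time and puts it on the application classpath.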