Re: Broadcast join data reuse

2020-06-11 Thread Ankur Srivastava
Hi Tyson, The broadcast variable should remain in the memory of the executors and be reused unless you unpersist it, destroy it, or it goes out of context. Hope this helps. Thanks Ankur On Wed, Jun 10, 2020 at 5:28 PM wrote: > We have a case where the data is small enough to be broadcast in joins >
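
A minimal sketch of that lifecycle (assuming an existing JavaSparkContext jsc; the lookup list and values are illustrative):

    import org.apache.spark.broadcast.Broadcast;
    import java.util.Arrays;
    import java.util.List;

    List<Integer> lookup = Arrays.asList(1, 2, 3);
    Broadcast<List<Integer>> bc = jsc.broadcast(lookup);

    // Both jobs reuse the same executor-side copy; it is fetched only once per executor.
    long first  = jsc.parallelize(Arrays.asList(1, 2, 3, 4)).filter(x -> bc.value().contains(x)).count();
    long second = jsc.parallelize(Arrays.asList(3, 4, 5, 6)).filter(x -> bc.value().contains(x)).count();

    bc.unpersist();  // drop the cached copies on the executors (the value can still be re-broadcast lazily)
    bc.destroy();    // release everything; the variable cannot be used after this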

Cross Join in Spark

2019-02-14 Thread Ankur Srivastava
Hello, We have a use case where we need to do a Cartesian join and for some reason we are not able to get it to work with the Dataset APIs. We have a similar use case implemented and working with RDDs. We have two datasets: - one dataset with 2 string columns, say c1 and c2. It is a small dataset with ~1
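
A minimal sketch of a Cartesian join with the Dataset API (Spark 2.1+), assuming an existing SparkSession spark and two Datasets ds1 and ds2; the names are illustrative:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Explicit Cartesian product of the two Datasets
    Dataset<Row> cartesian = ds1.crossJoin(ds2);

    // Alternatively, let the planner accept implicit cross joins
    // (e.g. a join whose condition is trivially true):
    spark.conf().set("spark.sql.crossJoin.enabled", "true");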

Re: partitionBy causing OOM

2017-09-25 Thread Ankur Srivastava
Hi Amit, Spark keeps the partition that it is working on in memory (and does not spill to disk even if it is running OOM). Also, since you are getting OOM when using partitionBy (and not when you just use flatMap), there should be one (or a few) dates on which your partition size is bigger than the

Re: KMeans Clustering is not Reproducible

2017-05-24 Thread Ankur Srivastava
Hi Christoph, I am not an expert in ML and have not used Spark KMeans, but your problem seems to be an issue of a local minimum vs the global minimum. You should run K-means multiple times with random starting points and also try multiple values of K (unless you are already sure). Hope this helps.

Re: Parameter in FlatMap function

2017-04-14 Thread Ankur Srivastava
You should instead broadcast your list and then use the broadcast variable in the flatMap function. Thanks Ankur On Fri, Apr 14, 2017 at 4:32 AM, Soheila S. wrote: > Hello all, > Can someone help me to solve the following fundamental problem? > > > I have a JavaRDD and as
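
A minimal sketch of that pattern (assuming Spark 2.x, an existing JavaSparkContext jsc and a JavaRDD<String> lines; the keyword list is illustrative):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.broadcast.Broadcast;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    List<String> keywords = Arrays.asList("error", "warn");
    Broadcast<List<String>> bcKeywords = jsc.broadcast(keywords);

    // The flatMap function reads the list through the broadcast variable, so the
    // list is shipped to each executor once instead of with every task.
    JavaRDD<String> matches = lines.flatMap(line -> {
        List<String> result = new ArrayList<>();
        for (String kw : bcKeywords.value()) {
            if (line.contains(kw)) {
                result.add(kw);
            }
        }
        return result.iterator();  // the Spark 2.x Java FlatMapFunction returns an Iterator
    });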

Re: create column with map function apply to dataframe

2017-04-14 Thread Ankur Srivastava
If I understand your question correctly, you should look at withColumn in the DataFrame API: df.withColumn("len", len("l")) Thanks Ankur On Fri, Apr 14, 2017 at 6:07 AM, issues solution wrote: > Hi , > how can you create a column inside a map function > > > like that : > >
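
The len(...) above is shorthand for the built-in length function; a minimal sketch, assuming a Dataset<Row> df with a string column "l":

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.length;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Adds a new column "len" holding the length of column "l"; existing columns are kept.
    Dataset<Row> withLen = df.withColumn("len", length(col("l")));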

Re: Assigning a unique row ID

2017-04-07 Thread Ankur Srivastava
You can use zipWithIndex or the approach Tim suggested, or even the one you are using, but I believe the issue is that tableA is being materialized every time you run the new transformations. Are you caching/persisting table A? If you do that you should not see this behavior. Thanks Ankur On
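
A minimal sketch of two common routes (zipWithIndex on the RDD, or a generated id column on the cached Dataset; monotonically_increasing_id is one option, not necessarily the approach Tim suggested), assuming an existing JavaRDD<String> rdd and Dataset<Row> df:

    import static org.apache.spark.sql.functions.monotonically_increasing_id;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // RDD route: a stable, contiguous index per element
    JavaPairRDD<String, Long> withIndex = rdd.zipWithIndex();

    // Dataset route: unique (but not contiguous) ids; cache first so the ids
    // are not recomputed differently when later transformations re-evaluate the plan
    Dataset<Row> withId = df.cache().withColumn("row_id", monotonically_increasing_id());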

Re: reducebykey

2017-04-07 Thread Ankur Srivastava
Hi Stephen, If you use aggregate functions or reduceGroups on KeyValueGroupedDataset it behaves like reduceByKey on RDDs. Only if you use flatMapGroups or mapGroups does it behave like groupByKey on RDDs, and if you read the API documentation it warns about using that API. Hope this helps. Thanks Ankur On
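
A minimal sketch of the distinction, assuming an existing Dataset<Tuple2<String, Integer>> ds of (key, value) pairs:

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.api.java.function.ReduceFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.KeyValueGroupedDataset;
    import scala.Tuple2;

    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped =
        ds.groupByKey((MapFunction<Tuple2<String, Integer>, String>) t -> t._1(), Encoders.STRING());

    // reduceGroups combines values with partial (map-side) aggregation, like reduceByKey on RDDs.
    Dataset<Tuple2<String, Tuple2<String, Integer>>> reduced =
        grouped.reduceGroups((ReduceFunction<Tuple2<String, Integer>>)
            (a, b) -> new Tuple2<>(a._1(), a._2() + b._2()));

    // mapGroups/flatMapGroups pull all values for a key into one task, like groupByKey
    // on RDDs -- use them only when the whole group really is needed at once.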

Re: org.apache.spark.SparkException: Task not serializable

2017-03-06 Thread Ankur Srivastava
The fix for this is to make your class Serializable. The reason is that the closures you have defined in the class need to be serialized and copied over to all executor nodes. Hope this helps. Thanks Ankur On Mon, Mar 6, 2017 at 1:06 PM, Mina Aslani wrote: > Hi, > > I am trying
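
A minimal sketch of the fix, with hypothetical names; the lambda captures the enclosing instance, so the class itself has to be Serializable (or the lambda must avoid referencing instance state):

    import java.io.Serializable;
    import org.apache.spark.api.java.JavaRDD;

    public class WordFilter implements Serializable {  // the fix: implement Serializable

        private final String keyword = "spark";

        public JavaRDD<String> apply(JavaRDD<String> lines) {
            // The lambda references this.keyword, so the whole WordFilter instance
            // is serialized and shipped to the executors.
            return lines.filter(line -> line.contains(keyword));
        }
    }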

Re: Spark 2.0 issue with left_outer join

2017-03-04 Thread Ankur Srivastava
the error using sql api? > >> On Sat, 4 Mar 2017 at 2:32 pm, Ankur Srivastava <ankur.srivast...@gmail.com> >> wrote: >> Adding DEV. >> >> Or is there any other way to do subtractByKey using Dataset APIs? >> >> Thanks >> Ankur

Re: Spark 2.0 issue with left_outer join

2017-03-03 Thread Ankur Srivastava
Adding DEV. Or is there any other way to do subtractByKey using Dataset APIs? Thanks Ankur On Wed, Mar 1, 2017 at 1:28 PM, Ankur Srivastava <ankur.srivast...@gmail.com > wrote: > Hi Users, > > We are facing an issue with left_outer join using the Spark Dataset API in 2.0

Spark 2.0 issue with left_outer join

2017-03-01 Thread Ankur Srivastava
Hi Users, We are facing an issue with left_outer join using the Spark Dataset API in 2.0 (Java API). Below is the code we have: Dataset<Row> badIds = filteredDS.groupBy(col("id").alias("bid")).count() .filter((FilterFunction<Row>) row -> (Long) row.getAs("count") > 75000); _logger.info("Id count with
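
On the subtractByKey question raised later in this thread: a left_anti join is the usual Dataset-API equivalent. A minimal sketch, assuming a Dataset<Row> events with an "id" column and the badIds Dataset aliased as above:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Keep only the rows of events whose id does NOT appear in badIds,
    // i.e. the Dataset equivalent of RDD subtractByKey.
    Dataset<Row> remaining =
        events.join(badIds, events.col("id").equalTo(badIds.col("bid")), "left_anti");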

Re: Examples in graphx

2017-01-30 Thread Ankur Srivastava
The one issue with using Neo4j is that you need to persist the whole graph on one single machine, i.e. you cannot shard the graph. I am not sure what the size of your graph is, but if it is huge, one way to shard could be to use the component ID. You can generate component IDs by running

Re: Issue creating row with java.util.Map type

2017-01-27 Thread Ankur Srivastava
function. Thanks Ankur On Fri, Jan 27, 2017 at 12:15 PM, Richard Xin <richardxin...@yahoo.com> wrote: > try > Row newRow = RowFactory.create(row.getString(0), row.getString(1), > row.getMap(2)); > > > > On Friday, January 27, 2017 10:52 AM, Ankur Srivastava < >

Re: Issue creating row with java.util.Map type

2017-01-27 Thread Ankur Srivastava
+ DEV Mailing List On Thu, Jan 26, 2017 at 5:12 PM, Ankur Srivastava < ankur.srivast...@gmail.com> wrote: > Hi, > > I am trying to map a Dataset with rows which have a map attribute. When I > try to create a Row with the map attribute I get cast errors. I am able to >

Issue creating row with java.util.Map type

2017-01-26 Thread Ankur Srivastava
Hi, I am trying to map a Dataset whose rows have a map attribute. When I try to create a Row with the map attribute I get cast errors. I am able to reproduce the issue with the below sample code. The surprising thing is that with the same schema I am able to create a dataset from the List of rows. I

Re: Issue returning Map from UDAF

2017-01-26 Thread Ankur Srivastava
on line 70. > So, you need to at least return `MapType` instead of `StructType`. > The stacktrace you showed explicitly says this type mismatch. > > // maropu > > >> On Thu, Jan 26, 2017 at 12:07 PM, Ankur Srivastava >> <ankur.srivast...@gmail.com> wrote: >

Issue returning Map from UDAF

2017-01-25 Thread Ankur Srivastava
Hi, I have a dataset with tuples of ID and timestamp. I want to do a group by on ID and then create a map with the frequency per hour for the ID. Input: 1| 20160106061005 1| 20160106061515 1| 20160106064010 1| 20160106050402 1| 20160106040101 2| 20160106040101 3| 20160106051451 Expected Output:
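
A hedged sketch of a UserDefinedAggregateFunction for this (the class name and the yyyyMMddHHmmss timestamp layout are assumptions), declaring a MapType result as suggested in the reply above:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.expressions.MutableAggregationBuffer;
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
    import org.apache.spark.sql.types.DataType;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // Aggregates timestamps such as "20160106061005" into a map of hour -> count.
    public class HourlyFrequency extends UserDefinedAggregateFunction {

        @Override
        public StructType inputSchema() {
            return new StructType().add("ts", DataTypes.StringType);
        }

        @Override
        public StructType bufferSchema() {
            return new StructType().add("freq",
                DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType));
        }

        // The key point from this thread: the result type is a MapType, not a StructType.
        @Override
        public DataType dataType() {
            return DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType);
        }

        @Override
        public boolean deterministic() { return true; }

        @Override
        public void initialize(MutableAggregationBuffer buffer) {
            buffer.update(0, new HashMap<String, Long>());
        }

        @Override
        public void update(MutableAggregationBuffer buffer, Row input) {
            String hour = input.getString(0).substring(8, 10);  // HH of yyyyMMddHHmmss
            Map<String, Long> freq = new HashMap<>(buffer.<String, Long>getJavaMap(0));
            freq.merge(hour, 1L, Long::sum);
            buffer.update(0, freq);
        }

        @Override
        public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
            Map<String, Long> merged = new HashMap<>(buffer1.<String, Long>getJavaMap(0));
            buffer2.<String, Long>getJavaMap(0).forEach((h, c) -> merged.merge(h, c, Long::sum));
            buffer1.update(0, merged);
        }

        @Override
        public Map<String, Long> evaluate(Row buffer) {
            return buffer.<String, Long>getJavaMap(0);
        }
    }

It would be registered with spark.udf().register("hourly_freq", new HourlyFrequency()) and then used inside groupBy("id").agg(...).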

Re: Spark GraphFrame ConnectedComponents

2017-01-09 Thread Ankur Srivastava
AM, Steve Loughran <ste...@hortonworks.com> wrote: > > On 5 Jan 2017, at 21:10, Ankur Srivastava <ankur.srivast...@gmail.com> > wrote: > > Yes I did try it out and it chooses the local file system as my checkpoint > location starts with s3n:// > > I am no

Re: Spark GraphFrame ConnectedComponents

2017-01-05 Thread Ankur Srivastava
Adding the DEV mailing list to see if this is a defect with ConnectedComponents or if they can recommend any solution. Thanks Ankur On Thu, Jan 5, 2017 at 1:10 PM, Ankur Srivastava <ankur.srivast...@gmail.com > wrote: > Yes I did try it out and it chooses the local file system as my c

Re: Spark GraphFrame ConnectedComponents

2017-01-05 Thread Ankur Srivastava
th delete. > > Could you by chance run just the delete to see if it fails > > FileSystem.get(sc.hadoopConfiguration) > .delete(new Path(somepath), true) > ------ > *From:* Ankur Srivastava <ankur.srivast...@gmail.com> > *Sent:* Thursday, January 5

Re: Spark GraphFrame ConnectedComponents

2017-01-05 Thread Ankur Srivastava
to read from s3 from Spark? > > _________ > From: Ankur Srivastava <ankur.srivast...@gmail.com> > Sent: Wednesday, January 4, 2017 9:23 PM > Subject: Re: Spark GraphFrame ConnectedComponents > To: Felix Cheung <felixcheun...@hotmail.com> > Cc: &

Re: Spark GraphFrame ConnectedComponents

2017-01-04 Thread Ankur Srivastava
parkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) And I am running spark v 1.6.2 and graphframes v 0.3.0-spark1.6-s_2.10 Thanks Ankur On Wed, Jan 4, 2017 at 8:03 PM, Ankur

Re: Spark GraphFrame ConnectedComponents

2017-01-04 Thread Ankur Srivastava
g <felixcheun...@hotmail.com> wrote: > Do you have more of the exception stack? > > > -- > *From:* Ankur Srivastava <ankur.srivast...@gmail.com> > *Sent:* Wednesday, January 4, 2017 4:40:02 PM > *To:* user@spark.apache.

Spark GraphFrame ConnectedComponents

2017-01-04 Thread Ankur Srivastava
Hi, I am trying to use the ConnectedComponents algorithm of GraphFrames, but by default it needs a checkpoint directory. As I am running my spark cluster with S3 as the DFS and do not have access to an HDFS file system, I tried using an s3 directory as the checkpoint directory, but I run into the below
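
The checkpoint directory itself is plain SparkContext configuration; a minimal sketch, assuming an existing JavaSparkContext jsc (the bucket path is illustrative, and whether an s3n:// path is accepted there is exactly the question of this thread):

    // Must point at a Hadoop-compatible file system; GraphFrames' ConnectedComponents
    // writes its intermediate results under this directory.
    jsc.setCheckpointDir("s3n://my-bucket/spark-checkpoints");
    // ...then run the ConnectedComponents algorithm as usual.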

Spark GraphFrames generic question

2016-12-16 Thread Ankur Srivastava
Hi, I am working on two different use cases where the basic problem is the same but the scale is very different. In case 1 we have two entities that can have a many-to-many relation and we would want to identify all subgraphs in the full graph and then further prune the graph to find the best relation.

Re: Spark Shuffle Issue

2016-10-12 Thread Ankur Srivastava
to cores? Thanks Ankur On Tue, Oct 11, 2016 at 11:16 PM, Ankur Srivastava < ankur.srivast...@gmail.com> wrote: > Hi, > > I am upgrading my jobs to Spark 1.6 and am running into shuffle issues. I > have tried all options and now am falling back to legacy memory model but > st

Spark Shuffle Issue

2016-10-12 Thread Ankur Srivastava
Hi, I am upgrading my jobs to Spark 1.6 and am running into shuffle issues. I have tried all options and am now falling back to the legacy memory model, but I am still running into the same issue. I have set spark.shuffle.blockTransferService to nio. 16/10/12 06:00:10 INFO MapOutputTrackerMaster: Size of

FetchFailed exception with Spark 1.6

2016-09-29 Thread Ankur Srivastava
Hi, I am running a simple job on Spark 1.6 in which I am trying to leftOuterJoin a big RDD with a smaller one. I am not broadcasting the smaller RDD yet, but I am still running into FetchFailed errors, with the job finally getting killed. I have already partitioned the data to 5000

Re: OOM exception during Broadcast

2016-03-07 Thread Ankur Srivastava
Hi, We have a use case where we broadcast ~4GB of data and we are on m3.2xlarge, so your object size is not an issue. Also, based on your explanation it does not look like a broadcast issue, as it works when your partition size is small. Are you caching any other data? Because broadcast variables use

Re: Get variable into Spark's foreachRDD function

2015-09-28 Thread Ankur Srivastava
Hi, You are creating a logger instance on the driver and then trying to use that instance in a transformation function which is executed on the executor. You should create the logger instance in the transformation function itself, but then the logs will go to separate files on each worker node. Hope
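
A minimal sketch of that suggestion (names are illustrative); the logger is obtained inside the function, so it is created on the executor instead of being serialized from the driver:

    import org.apache.log4j.Logger;
    import org.apache.spark.api.java.JavaRDD;

    JavaRDD<String> cleaned = lines.map(line -> {
        // Created on the executor that runs the task; writes to that worker's log files.
        Logger log = Logger.getLogger("my.app.executor");
        log.info("processing line of length " + line.length());
        return line.trim();
    });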

Re: Networking issues with Spark on EC2

2015-09-24 Thread Ankur Srivastava
Hi Suraj, Spark uses a lot of ports to communicate between nodes. Probably your security group is restrictive and does not allow instances to communicate on all networks. The easiest way to resolve it is to add a Rule to allow all Inbound traffic on all ports (0-65535) to instances in same

Re: How to get RDD from PairRDD<key,value> in Java

2015-09-23 Thread Ankur Srivastava
PairRdd.values is what you need. Ankur On Tue, Sep 22, 2015, 11:25 PM Zhang, Jingyu wrote: > Hi All, > > I want to extract the "value" RDD from PairRDD in Java > > Please let me know how can I get it easily. > > Thanks > > Jingyu > > > This message and its

Re: mappartition's FlatMapFunction help

2015-09-16 Thread Ankur Srivastava
Good to know it worked for you. CC'ed the user group so that the thread reaches closure. Thanks Ankur On Wed, Sep 16, 2015 at 6:13 AM, Thiago Diniz <dinizthiag...@gmail.com> wrote: > Nailed it. > > Thank you! > > 2015-09-15 14:39 GMT-03:00 Ankur Srivastava <ankur.srivas

Re: mappartition's FlatMapFunction help

2015-09-15 Thread Ankur Srivastava
Hi, The signatures are perfect. I also tried the same code in Eclipse, and for some reason Eclipse did not import java.util.Iterator. Once I imported it, it is fine. It might be the same issue with NetBeans. Thanks Ankur On Tue, Sep 15, 2015 at 10:11 AM, dinizthiagobr wrote: >

Re: JavaRDD using Reflection

2015-09-14 Thread Ankur Srivastava
Hi Rachana, I didn't get your question fully, but as the error says, you cannot perform an RDD transformation or action inside another transformation. In your example you are performing an action, "rdd2.values.count()", inside the "map" transformation. It is not allowed and in any case this will be
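
A hedged sketch of the usual workaround, assuming a JavaRDD<String> rdd1 and a JavaPairRDD<String, String> rdd2 (the computation itself is illustrative): run the inner action on the driver first, then reference the plain result inside the transformation.

    import org.apache.spark.api.java.JavaRDD;

    // Not allowed: an action on rdd2 inside rdd1's map -- RDDs cannot be used
    // inside another RDD's transformation.
    // JavaRDD<Long> bad = rdd1.map(x -> rdd2.values().count());

    // Workaround: run the action on the driver and capture the plain value.
    long rdd2Count = rdd2.values().count();
    JavaRDD<Long> scores = rdd1.map(x -> x.length() * rdd2Count);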

Re: JavaRDD using Reflection

2015-09-14 Thread Ankur Srivastava
… > > } > > }); > > return featureScoreRDD; > > } > > > > } > > > > Thanks again for all your help and advice. > > > > Regards, > > > > Rach

Re: Spark partitions from CassandraRDD

2015-09-04 Thread Ankur Srivastava
ba...@pdf.com> wrote: > Thanks Ankur, > > But I grabbed some keys from the Spark results and ran "nodetool -h > getendpoints " and it showed the data is coming from at least 2 nodes? > Regards, > Alaa > > On Thu, Sep 3, 2015 at 12:06 PM, Ankur Srivastava &

Re: Spark partitions from CassandraRDD

2015-09-03 Thread Ankur Srivastava
Hi Alaa, Partitioning when using a CassandraRDD depends on your partition key in the Cassandra table. If you see only 1 partition in the RDD it means all the rows you have selected have the same partition_key in C*. Thanks Ankur On Thu, Sep 3, 2015 at 11:54 AM, Alaa Zubaidi (PDF)

Re: Understanding Spark Memory distribution

2015-03-30 Thread Ankur Srivastava
. To debug this, please type ps auxw | grep org.apache.spark.deploy.master.[M]aster in master machine. You can see the Xmx and Xms option. Wisely Chen On Mon, Mar 30, 2015 at 3:55 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi Wisely, I am running on Amazon EC2

Re: Understanding Spark Memory distribution

2015-03-29 Thread Ankur Srivastava
2015-03-28 15:39 GMT+08:00 Ankur Srivastava ankur.srivast...@gmail.com: Hi Wisely, I have 26gb for the driver and the master is running on m3.2xlarge machines. I see OOM errors on workers even though they are running with 26gb of memory. Thanks On Fri, Mar 27, 2015, 11:43 PM Wisely Chen wiselyc

Understanding Spark Memory distribution

2015-03-27 Thread Ankur Srivastava
Hi All, I am running a spark cluster on EC2 instances of type: m3.2xlarge. I have given 26gb of memory with all 8 cores to my executors. I can see that in the logs too: 15/03/27 21:31:06 INFO AppClient$ClientActor: Executor added: app-20150327213106-/0 on

Re: Understanding Spark Memory distribution

2015-03-27 Thread Ankur Srivastava
(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) Thanks Ankur On Fri, Mar 27, 2015 at 2:52 PM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi All, I am running a spark cluster on EC2 instances of type: m3.2xlarge. I have

Re: Joining data using Latitude, Longitude

2015-03-13 Thread Ankur Srivastava
of 3-4 gb? Thanks Ankur On Wed, Mar 11, 2015 at 8:58 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Thank you everyone!! I have started implementing the join using the geohash and using the first 4 alphabets of the HASH as the key. Can I assign a Confidence factor in terms

Re: Joining data using Latitude, Longitude

2015-03-11 Thread Ankur Srivastava
Thank you everyone!! I have started implementing the join using the geohash, using the first 4 characters of the hash as the key. Can I assign a confidence factor in terms of distance based on the number of matching characters in the hash? I will also look at the other options listed here.

Joining data using Latitude, Longitude

2015-03-09 Thread Ankur Srivastava
Hi, I am trying to join data based on the latitude and longitude. I have reference data which has city information with their latitude and longitude. I have a data source with user information with their latitude and longitude. I want to find the nearest city to the user's latitude and

Re: Issue using S3 bucket from Spark 1.2.1 with hadoop 2.4

2015-03-03 Thread Ankur Srivastava
Thanks a lot Ted!! On Tue, Mar 3, 2015 at 9:53 AM, Ted Yu yuzhih...@gmail.com wrote: If you can use hadoop 2.6.0 binary, you can use s3a s3a is being polished in the upcoming 2.7.0 release: https://issues.apache.org/jira/browse/HADOOP-11571 Cheers On Tue, Mar 3, 2015 at 9:44 AM, Ankur

Issue using S3 bucket from Spark 1.2.1 with hadoop 2.4

2015-03-03 Thread Ankur Srivastava
Hi, We recently upgraded to the Spark 1.2.1 - Hadoop 2.4 binary. We do not have any other dependency on hadoop jars, except for reading our source files from S3. Since we upgraded to the latest version our reads from S3 have slowed down considerably. For some jobs we see the read from S3 is

Re: how to debug this kind of error, e.g. lost executor?

2015-02-05 Thread Ankur Srivastava
Li, I cannot tell you the reason for this exception, but I have seen these kinds of errors when using the HASH based shuffle manager (which is the default until v 1.2). Try the SORT shuffle manager. Hopefully that will help. Thanks Ankur Anyone has idea on where I can find the detailed log of that lost

Re: Large # of tasks in groupby on single table

2015-02-04 Thread Ankur Srivastava
Hi Manoj, You can set the number of partitions you want your sql query to use. By default it is 200 and thus you see that number. You can update it using the spark.sql.shuffle.partitions property: spark.sql.shuffle.partitions (default: 200) - Configures the number of partitions to use when shuffling data for
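
A minimal sketch of setting it (assuming an existing SQLContext sqlContext; the value 64 is illustrative):

    // Programmatically:
    sqlContext.setConf("spark.sql.shuffle.partitions", "64");

    // Or directly in SQL:
    sqlContext.sql("SET spark.sql.shuffle.partitions=64");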

Re: Error when running spark in debug mode

2015-02-01 Thread Ankur Srivastava
I am running on m3.xlarge instances on AWS with 12 gb worker memory and 10 gb executor memory. On Sun, Feb 1, 2015, 12:41 PM Arush Kharbanda ar...@sigmoidanalytics.com wrote: What is the machine configuration you are running it on? On Mon, Feb 2, 2015 at 1:46 AM, Ankur Srivastava

Re: Error when running spark in debug mode

2015-01-30 Thread Ankur Srivastava
be some issue it would be in log4j and not in spark. Thanks Arush On Fri, Jan 30, 2015 at 4:24 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi, When ever I enable DEBUG level logs for my spark cluster, on running a job all the executors die with the below exception. On disabling

Error when running spark in debug mode

2015-01-29 Thread Ankur Srivastava
Hi, Whenever I enable DEBUG level logs for my spark cluster, on running a job all the executors die with the below exception. On disabling the DEBUG logs my jobs move to the next step. I am on spark-1.1.0. Is this a known issue with spark? Thanks Ankur 2015-01-29 22:27:42,467 [main] INFO

Re: Connecting Cassandra by unknow host

2015-01-29 Thread Ankur Srivastava
Hi, I am no expert but I have a small application working with Spark and Cassandra. I faced these issues when we were deploying our cluster on EC2 instances with some machines on a public network and some on a private one. This seems to be a similar issue, as you are trying to connect to 10.34.224.249

Would Join on PairRDD's result in co-locating data by keys?

2015-01-22 Thread Ankur Srivastava
Hi, I wanted to understand how the join on two pair RDDs works. Would it result in shuffling data from both RDDs so that rows with the same key land in the same partition? If that is the case, would it be better to use the partitionBy function to partition the RDDs (by the join attribute) at creation for lesser
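
A minimal sketch of the partitionBy idea, assuming two existing JavaPairRDDs leftRaw and rightRaw; giving both sides the same partitioner (and persisting them) means rows with the same key are already co-located, so the join avoids a full re-shuffle:

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.storage.StorageLevel;
    import scala.Tuple2;

    HashPartitioner partitioner = new HashPartitioner(200);  // illustrative partition count

    JavaPairRDD<String, String> left = leftRaw.partitionBy(partitioner)
                                              .persist(StorageLevel.MEMORY_AND_DISK());
    JavaPairRDD<String, Integer> right = rightRaw.partitionBy(partitioner)
                                                 .persist(StorageLevel.MEMORY_AND_DISK());

    // Both sides share the same partitioner, so the join can use the existing layout.
    JavaPairRDD<String, Tuple2<String, Integer>> joined = left.join(right);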

Re: Issue writing to Cassandra from Spark

2015-01-13 Thread Ankur Srivastava
fixed the issue for us. Thanks Ankur On Mon, Jan 12, 2015 at 9:04 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi Akhil, Thank you for the pointers. Below is how we are saving data to Cassandra. javaFunctions(rddToSave).writerBuilder(datapipelineKeyspace

Re: Issue writing to Cassandra from Spark

2015-01-12 Thread Ankur Srivastava
Regards On Mon, Jan 12, 2015 at 7:45 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi Akhil, thank you for your response. Actually we are first reading from cassandra and then writing back after doing some processing. All the reader stages succeed with no error and many writer

Re: Issue writing to Cassandra from Spark

2015-01-11 Thread Ankur Srivastava
, Jan 10, 2015 at 8:44 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi, We are currently using spark to join data in Cassandra and then write the results back into Cassandra. While reads happen with out any error during the writes we see many exceptions like below. Our environment

Issue writing to Cassandra from Spark

2015-01-09 Thread Ankur Srivastava
Hi, We are currently using spark to join data in Cassandra and then write the results back into Cassandra. While reads happen without any error, during the writes we see many exceptions like the ones below. Our environment details are: - Spark v 1.1.0 - spark-cassandra-connector-java_2.10 v 1.1.0 We are

Spark with Cassandra - Shuffle opening too many files

2015-01-07 Thread Ankur Srivastava
Hello, We are currently running our data pipeline on spark, which uses Cassandra as the data source. We are currently facing an issue in the step where we create an rdd on data in a cassandra table and then try to run flatMapToPair to transform the data, but we are running into "Too many open files". I

Re: Spark with Cassandra - Shuffle opening too many files

2015-01-07 Thread Ankur Srivastava
is actually being set, especially if you're on mesos (because of https://issues.apache.org/jira/browse/MESOS-123 ) Find the pid of the executor process, and cat /proc/pid/limits set spark.shuffle.consolidateFiles = true try spark.shuffle.manager = sort On Wed, Jan 7, 2015 at 3:06 PM, Ankur

Spark Cassandra connector issue

2014-10-21 Thread Ankur Srivastava
Hi, I am creating a Cassandra java rdd and transforming it using the where clause. It works fine when I run it outside the mapValues, but when I put the code in mapValues I get an error while creating the transformation. Below is my sample code: CassandraJavaRDD<ReferenceData>

Re: Spark Cassandra connector issue

2014-10-21 Thread Ankur Srivastava
Is this because I am calling a transformation function on an rdd from inside another transformation function? Is it not allowed? Thanks Ankur On Oct 21, 2014 1:59 PM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi Gerard, this is the code that may be helpful. public class

Re: Join with large data set

2014-10-17 Thread Ankur Srivastava
partitioner based on keys so that you can avoid shuffling and optimize join performance. HTH Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Fri, Oct 17, 2014 at 4:27 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi, I have a rdd

Join with large data set

2014-10-16 Thread Ankur Srivastava
Hi, I have an rdd which is my application data and is huge. I want to join this with reference data which is also too huge to fit in memory, and thus I do not want to use a Broadcast variable. What other options do I have to perform such joins? I am using Cassandra as my data store, so should I just

Re: Spark Standalone on EC2

2014-10-09 Thread Ankur Srivastava
are not able to access it using the hostnames then you won't be able to access it with the IP address either, I believe. What are you trying to do here? Running your eclipse locally and connecting to your ec2 cluster? Thanks Best Regards On Tue, Oct 7, 2014 at 3:36 AM, Ankur Srivastava

Spark Standalone on EC2

2014-10-06 Thread Ankur Srivastava
Hi, I have started a Spark cluster on EC2 using the Spark Standalone cluster manager, but spark is trying to identify the worker threads using hostnames which are not accessible publicly. So when I try to submit jobs from eclipse it is failing; is there some way spark can use IP addresses instead

Issue with Partitioning

2014-10-01 Thread Ankur Srivastava
Hi, I am using custom partitioner to partition my JavaPairRDD where key is a String. I use hashCode of the sub-string of the key to derive the partition index but I have noticed that my partition contains keys which have a different partitionIndex returned by the partitioner. Another issue I am