Re: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark Extremely Slow for Large Number of Files?

2021-03-09 Thread Pankaj Bhootra
Hi,

Could someone please respond to this?


Thanks
Pankaj Bhootra


On Sun, 7 Mar 2021, 01:22 Pankaj Bhootra,  wrote:

> Hello Team
>
> I am new to Spark and this question may be a possible duplicate of the
> issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347
>
> We have a large dataset partitioned by calendar date, and within each date
> partition, we are storing the data as *parquet* files in 128 parts.
>
> We are trying to run aggregation on this dataset for 366 dates at a time
> with Spark SQL on spark version 2.3.0, hence our Spark job is reading
> 366*128=46848 partitions, all of which are parquet files. There is
> currently no *_metadata* or *_common_metadata* file(s) available for this
> dataset.
>
> The problem we are facing is that when we try to run *spark.read.parquet* on
> the above 46848 partitions, our data reads are extremely slow. It takes a
> long time to run even a simple map task (no shuffling) without any
> aggregation or group by.
>
> I read through the above issue and I think I perhaps generally understand
> the ideas around *_common_metadata* file. But the above issue was raised
> for Spark 1.3.1 and for Spark 2.3.0, I have not found any documentation
> related to this metadata file so far.
>
> I would like to clarify:
>
>1. What's the latest, best practice for reading large number of
>parquet files efficiently?
>2. Does this involve using any additional options with
>spark.read.parquet? How would that work?
>3. Are there other possible reasons for slow data reads apart from
>reading metadata for every part? We are basically trying to migrate our
>existing spark pipeline from using csv files to parquet, but from my
>hands-on so far, it seems that parquet's read time is slower than csv? This
>seems contradictory to popular opinion that parquet performs better in
>terms of both computation and storage?
>
>
> Thanks
> Pankaj Bhootra
>
>
>
> -- Forwarded message -
> From: Takeshi Yamamuro (Jira) 
> Date: Sat, 6 Mar 2021, 20:02
> Subject: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark
> Extremely Slow for Large Number of Files?
> To: 
>
>
>
> [
> https://issues.apache.org/jira/browse/SPARK-34648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17296528#comment-17296528
> ]
>
> Takeshi Yamamuro commented on SPARK-34648:
> --
>
> Please use the mailing list (user@spark.apache.org) instead. This is not
> the right place to ask questions.
>
> > Reading Parquet Files in Spark Extremely Slow for Large Number of Files?
> > 
> >
> > Key: SPARK-34648
> >     URL: https://issues.apache.org/jira/browse/SPARK-34648
> > Project: Spark
> >  Issue Type: Question
> >  Components: SQL
> >Affects Versions: 2.3.0
> >Reporter: Pankaj Bhootra
> >Priority: Major
> >
> > Hello Team
> > I am new to Spark and this question may be a possible duplicate of the
> issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347
> > We have a large dataset partitioned by calendar date, and within each
> date partition, we are storing the data as *parquet* files in 128 parts.
> > We are trying to run aggregation on this dataset for 366 dates at a time
> with Spark SQL on spark version 2.3.0, hence our Spark job is reading
> 366*128=46848 partitions, all of which are parquet files. There is
> currently no *_metadata* or *_common_metadata* file(s) available for this
> dataset.
> > The problem we are facing is that when we try to run
> *spark.read.parquet* on the above 46848 partitions, our data reads are
> extremely slow. It takes a long time to run even a simple map task (no
> shuffling) without any aggregation or group by.
> > I read through the above issue and I think I perhaps generally
> understand the ideas around *_common_metadata* file. But the above issue
> was raised for Spark 1.3.1 and for Spark 2.3.0, I have not found any
> documentation related to this metadata file so far.
> > I would like to clarify:
> >  # What's the latest, best practice for reading large number of parquet
> files efficiently?
> >  # Does this involve using any additional options with
> spark.read.parquet? How would that work?
> >  # Are there other possible reasons for slow data reads apart from
> reading metadata for every part? We are basically trying to migrate our
> existing spark pipeline from using csv files to parquet, but from my
> hands-on so far, it seems that parquet's read time is slower than csv? This
> seems contradictory to popular opinion that parquet performs better in
> terms of both computation and storage?
>
>
>
> --
> This message was sent by Atlassian Jira
> (v8.3.4#803005)
>


Fwd: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark Extremely Slow for Large Number of Files?

2021-03-06 Thread Pankaj Bhootra
Hello Team

I am new to Spark and this question may be a possible duplicate of the
issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347

We have a large dataset partitioned by calendar date, and within each date
partition, we are storing the data as *parquet* files in 128 parts.

We are trying to run aggregation on this dataset for 366 dates at a time
with Spark SQL on spark version 2.3.0, hence our Spark job is reading
366*128=46848 partitions, all of which are parquet files. There is
currently no *_metadata* or *_common_metadata* file(s) available for this
dataset.

The problem we are facing is that when we try to run *spark.read.parquet* on
the above 46848 partitions, our data reads are extremely slow. It takes a
long time to run even a simple map task (no shuffling) without any
aggregation or group by.

I read through the above issue and I think I generally understand the ideas
around the *_common_metadata* file. But that issue was raised for Spark 1.3.1,
and for Spark 2.3.0 I have not found any documentation related to this
metadata file so far.

I would like to clarify:

   1. What is the latest best practice for reading a large number of parquet
   files efficiently?
   2. Does this involve using any additional options with
   spark.read.parquet? How would that work? (A sketch follows this list.)
   3. Are there other possible reasons for slow data reads apart from
   reading metadata for every part? We are basically trying to migrate our
   existing Spark pipeline from CSV files to parquet, but from my hands-on
   work so far, parquet's read time seems slower than CSV's. This seems to
   contradict the popular opinion that parquet performs better in terms of
   both computation and storage.
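
A minimal sketch of the kind of read options in question (Spark 2.3-era APIs; the
path, partition column and date range are illustrative). Reading the dataset root
and filtering on the partition column lets Spark prune partitions, and disabling
schema merging avoids touching the footer of every one of the ~46k files:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().appName("parquet-read-sketch").getOrCreate()

    // also settable per read via .option("mergeSchema", "false")
    spark.conf.set("spark.sql.parquet.mergeSchema", "false")

    val df = spark.read
      .option("mergeSchema", "false")                        // skip schema merging across files
      .parquet("hdfs:///warehouse/events")                   // dataset root, partitioned by dt=...
      .where(col("dt").between("2020-01-01", "2020-12-31"))  // prunes to the wanted dates

Whether this helps depends on where the time actually goes (file listing versus
footer reads), so it is only a starting point.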


Thanks
Pankaj Bhootra



-- Forwarded message -
From: Takeshi Yamamuro (Jira) 
Date: Sat, 6 Mar 2021, 20:02
Subject: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark
Extremely Slow for Large Number of Files?
To: 



[
https://issues.apache.org/jira/browse/SPARK-34648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17296528#comment-17296528
]

Takeshi Yamamuro commented on SPARK-34648:
--

Please use the mailing list (user@spark.apache.org) instead. This is not the
right place to ask questions.

> Reading Parquet Files in Spark Extremely Slow for Large Number of Files?
> 
>
> Key: SPARK-34648
> URL: https://issues.apache.org/jira/browse/SPARK-34648
> Project: Spark
>  Issue Type: Question
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Pankaj Bhootra
>Priority: Major
>
> Hello Team
> I am new to Spark and this question may be a possible duplicate of the
issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347
> We have a large dataset partitioned by calendar date, and within each
date partition, we are storing the data as *parquet* files in 128 parts.
> We are trying to run aggregation on this dataset for 366 dates at a time
with Spark SQL on spark version 2.3.0, hence our Spark job is reading
366*128=46848 partitions, all of which are parquet files. There is
currently no *_metadata* or *_common_metadata* file(s) available for this
dataset.
> The problem we are facing is that when we try to run
*spark.read.parquet* on the above 46848 partitions, our data reads are
extremely slow. It takes a long time to run even a simple map task (no
shuffling) without any aggregation or group by.
> I read through the above issue and I think I perhaps generally understand
the ideas around *_common_metadata* file. But the above issue was raised
for Spark 1.3.1 and for Spark 2.3.0, I have not found any documentation
related to this metadata file so far.
> I would like to clarify:
>  # What's the latest, best practice for reading large number of parquet
files efficiently?
>  # Does this involve using any additional options with
spark.read.parquet? How would that work?
>  # Are there other possible reasons for slow data reads apart from
reading metadata for every part? We are basically trying to migrate our
existing spark pipeline from using csv files to parquet, but from my
hands-on so far, it seems that parquet's read time is slower than csv? This
seems contradictory to popular opinion that parquet performs better in
terms of both computation and storage?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Structured Streaming to Kafka Topic

2019-03-06 Thread Pankaj Wahane
Hi,

I am using structured streaming for ETL.


val data_stream = spark
  .readStream // constantly expanding dataframe
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sms_history")
  .option("startingOffsets", "earliest") // begin from start of topic
  .option("failOnDataLoss", "false")
  .load()

I transform this into a Dataset with the following schema.

root
 |-- accountId: long (nullable = true)
 |-- countryId: long (nullable = true)
 |-- credits: double (nullable = true)
 |-- deliveryStatus: string (nullable = true)
 |-- senderId: string (nullable = true)
 |-- sentStatus: string (nullable = true)
 |-- source: integer (nullable = true)
 |-- createdOn: timestamp (nullable = true)
 |-- send_success_credits: double (nullable = true)
 |-- send_error_credits: double (nullable = true)
 |-- delivered_credits: double (nullable = true)
 |-- invalid_sd_credits: double (nullable = true)
 |-- undelivered_credits: double (nullable = true)
 |-- unknown_credits: double (nullable = true)


Now I want to write this transformed stream to another Kafka topic. I have
temporarily used a UDF that accepts all these columns as parameters and creates
a JSON string, which is added as a "value" column for writing to Kafka.

Is there an easier and cleaner way to do the same?
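
One commonly used alternative is to_json over a struct of all columns, which
avoids the hand-written UDF. A sketch (the Dataset name, output topic and
checkpoint path are illustrative):

    import org.apache.spark.sql.functions.{col, struct, to_json}

    // Serialize every column of the transformed Dataset `ds` into the single
    // string "value" column that the Kafka sink expects.
    val query = ds
      .select(to_json(struct(ds.columns.map(col): _*)).alias("value"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "sms_history_enriched")
      .option("checkpointLocation", "/tmp/checkpoints/sms_history_enriched")
      .start()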


Thanks,
Pankaj



Re: [E] How to do stop streaming before the application got killed

2017-12-22 Thread Rastogi, Pankaj
You can add a shutdown hook to your JVM and request the Spark streaming context
to stop gracefully.

  /**
   * Shutdown hook to stop the StreamingContext gracefully before the JVM exits.
   * @param ssCtx the active StreamingContext
   */
  def addShutdownHook(ssCtx: StreamingContext) = {
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run() = {
        println("In shutdown hook")
        // stop(stopSparkContext = true, stopGracefully = true)
        ssCtx.stop(true, true)
      }
    })
  }
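
A related option, if you are on Spark 1.4 or later, is to let Spark install the
hook itself via configuration; a minimal sketch (the app name is illustrative):

    import org.apache.spark.SparkConf

    // With this flag set, a SIGTERM (e.g. from `yarn application -kill`) triggers
    // a graceful stop of the StreamingContext without a hand-written shutdown hook.
    val conf = new SparkConf()
      .setAppName("my-streaming-app")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")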

Pankaj

On Fri, Dec 22, 2017 at 9:56 AM, Toy <noppani...@gmail.com> wrote:

> I'm trying to write a deployment job for Spark application. Basically the
> job will send yarn application --kill app_id to the cluster but after the
> application received the signal it dies without finishing whatever is
> processing or stopping the stream.
>
> I'm using Spark Streaming. What's the best way to stop Spark application
> so we won't lose any data.
>
>
>


Re: [E] Re: Spark Job is stuck at SUBMITTED when set Driver Memory > Executor Memory

2017-06-12 Thread Rastogi, Pankaj
Please make sure that you have enough memory available on the driver node. If 
there is not enough free memory on the driver node, then your application won't 
start.
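
For reference, a sketch of sizing the driver explicitly at submit time (the master
URL, class and paths are illustrative); in standalone cluster mode the worker
chosen to host the driver must have this much memory free:

    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master spark://207.184.161.138:7077 \
      --deploy-mode cluster \
      --driver-memory 4G \
      --executor-memory 2G \
      /path/to/examples.jar \
      1000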

Pankaj

From: vaquar khan <vaquar.k...@gmail.com>
Date: Saturday, June 10, 2017 at 5:02 PM
To: Abdulfattah Safa <fattah.s...@gmail.com>
Cc: User <user@spark.apache.org>
Subject: [E] Re: Spark Job is stuck at SUBMITTED when set Driver Memory > 
Executor Memory

You can add memory in your command make sure given memory available on your 
executor


./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000


https://spark.apache.org/docs/1.1.0/submitting-applications.html

Also try to avoid functions that pull data back to the driver and need a lot of
memory there, like collect().


Regards,
Vaquar khan


On Jun 4, 2017 5:46 AM, "Abdulfattah Safa" <fattah.s...@gmail.com> wrote:
I'm working on Spark in Standalone Cluster mode. I need to increase the Driver
Memory as I got an OOM in the driver thread. I found that when setting the
Driver Memory to > Executor Memory, the submitted job is stuck at SUBMITTED in
the driver and the application never starts.



SPARK-19547

2017-06-07 Thread Rastogi, Pankaj
)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I see that there is a Spark ticket open with the same issue
(https://issues.apache.org/jira/browse/SPARK-19547), but it has been marked as
INVALID. Can someone explain why this ticket was marked INVALID?

Thanks,
Pankaj


Re: how to add colum to dataframe

2016-12-06 Thread Pankaj Wahane
You may want to try using df2.na.fill(…)
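That said, the nulls in the quoted output most likely come from the condition
itself: when($"url".isNull, ...) only produces a value for null urls, and without
an otherwise every other row defaults to null. A sketch of the probable intent
(column names as in the original mail; the fallback value is illustrative):

    import org.apache.spark.sql.functions.{col, substring, when}

    // take 5 characters starting at position 3 of url when it is present,
    // and fall back to an explicit default otherwise
    val df2 = df.withColumn(
      "pa_bid",
      when(col("url").isNotNull, substring(col("url"), 3, 5)).otherwise("unknown")
    )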

From: lk_spark 
Date: Tuesday, 6 December 2016 at 3:05 PM
To: "user.spark" 
Subject: how to add colum to dataframe

hi, all:
   my spark version is 2.0
   I have a parquet file with one column named url of type string. I want to get
a substring from the url and add it to the dataframe:
   val df = spark.read.parquet("/parquetdata/weixin/page/month=201607")
   val df2 = df.withColumn("pa_bid",when($"url".isNull,col("url").substr(3, 5)))
   df2.select("pa_bid","url").show
   +------+----------------+
   |pa_bid|             url|
   +------+----------------+
   |  null|http://mp.weixin|
   |  null|http://mp.weixin|
   |  null|http://mp.weixin|
   |  null|http://mp.weixin|
   |  null|http://mp.weixin|
   |  null|http://mp.weixin|
   |  null|http://mp.weixin|
   |  null|http://mp.weixin|
   +------+----------------+

Why is what I got null?

2016-12-06

lk_spark


Execution error during ALS execution in spark

2016-03-31 Thread Pankaj Rawat
Hi,

While building Recommendation engine using spark MLlib (ALS) we are facing some 
issues during execution.

Details are below.

We are trying to train our model on 1.4 million sparse rating records (100,000
customers x 50,000 items). The execution DAG cycle is taking a long time and is
crashing after several hours at the model.recommendProductsForUsers() step. The
causes of the exceptions are non-uniform and vary from time to time.

The common exceptions faced during last 10 runs are

a)  Akka Timeout

b)  Out of Memory Exceptions

c)   Executor disassociation.

We have tried increasing the timeouts to 1200 seconds, but that doesn't seem to
have an impact:
   sparkConf.set("spark.network.timeout", "1200s");
   sparkConf.set("spark.rpc.askTimeout", "1200s");
   sparkConf.set("spark.rpc.lookupTimeout", "1200s");
   sparkConf.set("spark.akka.timeout", "1200s");

Our command line parameters are as follows --num-executors 5 
--executor-memory 2G --conf spark.yarn.executor.memoryOverhead=600 --conf 
spark.default.parallelism=500 --master yarn

Configuration

1.   3 node cluster,  16 GB RAM, Intel I7 processor.

2.   Spark 1.5.2

The algorithm is perfectly working for lesser number of records.

We would appreciate any help in this regard and would like to know the following:

1.   How can we handle execution over large numbers of rating records in Spark
without failures, as the rating records will increase with time? (A sketch
follows this list.)

2.   Are we missing any command line parameters that are necessary for this
type of heavy execution?

3.   Are the above cluster size and configuration adequate for processing this
many records? A large amount of time taken during execution is fine, but the
process should not fail.

4.   What exactly is meant by the Akka timeout error during ALS job execution?
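
A minimal sketch of one commonly suggested mitigation, assuming a ratings RDD is
already built (the checkpoint path and parameter values are illustrative;
setCheckpointInterval needs Spark 1.4+):

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.recommendation.{ALS, Rating}
    import org.apache.spark.rdd.RDD

    // Checkpointing truncates the long ALS lineage that often leads to OOM and
    // executor loss on larger inputs; more blocks shrink the per-task footprint.
    def trainModel(sc: SparkContext, ratings: RDD[Rating]) = {
      sc.setCheckpointDir("hdfs:///tmp/als-checkpoint")
      new ALS()
        .setRank(10)
        .setIterations(10)
        .setLambda(0.01)
        .setBlocks(200)              // split user/product factors into more blocks
        .setCheckpointInterval(5)    // checkpoint every 5 iterations
        .run(ratings)
    }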

Regards,
Pankaj Rawat


Re: Spark Streaming: java.lang.NoClassDefFoundError: org/apache/kafka/common/message/KafkaLZ4BlockOutputStream

2016-03-11 Thread Pankaj Wahane
The next thing you may want to check is whether the jar has been provided to all
the executors in your cluster. Most of the class-not-found errors got resolved
for me after making the required jars available to the SparkContext.
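
For reference, a sketch of one way to do that at submit time (the class name and
paths are illustrative):

    spark-submit --class com.example.StreamingConsumer \
      --master yarn \
      --jars /opt/libs/kafka-clients-0.8.2.0.jar \
      /opt/apps/streaming-consumer.jar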

Thanks.

From: Ted Yu >
Date: Saturday, 12 March 2016 at 7:17 AM
To: Siva >
Cc: spark users >
Subject: Re: Spark Streaming: java.lang.NoClassDefFoundError: 
org/apache/kafka/common/message/KafkaLZ4BlockOutputStream

KafkaLZ4BlockOutputStream is in kafka-clients jar :

$ jar tvf kafka-clients-0.8.2.0.jar | grep KafkaLZ4BlockOutputStream
  1609 Wed Jan 28 22:30:36 PST 2015 
org/apache/kafka/common/message/KafkaLZ4BlockOutputStream$BD.class
  2918 Wed Jan 28 22:30:36 PST 2015 
org/apache/kafka/common/message/KafkaLZ4BlockOutputStream$FLG.class
  4578 Wed Jan 28 22:30:36 PST 2015 
org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.class

Can you check whether kafka-clients jar was in the classpath of the container ?

Thanks

On Fri, Mar 11, 2016 at 5:00 PM, Siva 
> wrote:
Hi Everyone,

All of a sudden we are encountering the below error from one of the Spark
consumers. It used to work before without any issues.

When I restart the consumer with the latest offsets, it works fine for some time
(it executes a few batches) and then fails again; this issue is intermittent.

Did any one come across this issue?

16/03/11 19:44:50 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 
(TID 3, ip-172-31-32-183.us-west-2.compute.internal): 
java.lang.NoClassDefFoundError: 
org/apache/kafka/common/message/KafkaLZ4BlockOutputStream
at kafka.message.ByteBufferMessageSet$.decompress(ByteBufferMessageSet.scala:65)
at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:179)
at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:192)
at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:146)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at scala.collection.Iterator$$anon$1.hasNext(Iterator.scala:847)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:615)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:160)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.message.KafkaLZ4BlockOutputStream
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 23 more


Container id: container_1456361466298_0236_01_02
Exit code: 50
Stack trace: ExitCodeException exitCode=50:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit 

seriazable error in apache spark job

2015-12-17 Thread Pankaj Narang
I am encountering the error below. Can somebody guide?

Something similar is one this link
https://github.com/elastic/elasticsearch-hadoop/issues/298


actor.MentionCrawlActor
java.io.NotSerializableException: actor.MentionCrawlActor
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[na:1.7.0_79]



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/seriazable-error-in-apache-spark-job-tp25732.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Question on take function - Spark Java API

2015-08-26 Thread Pankaj Wahane
Thanks Sonal.. I shall try doing that..
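
For reference, a minimal sketch of the suggested wholeTextFiles approach (in
Scala; the path and row handling are illustrative):

    // wholeTextFiles keeps each file's name paired with its full content, so the
    // asset name on the first line stays tied to that file's own data rows.
    val perFile = sc.wholeTextFiles("/data/assets/*")        // RDD[(fileName, content)]

    val taggedRows = perFile.flatMap { case (_, content) =>
      val lines  = content.split("\n")
      val asset  = lines.head.trim                           // line 1: asset name
      val header = lines(1)                                  // line 2: csv header
      lines.drop(2).filter(_.nonEmpty).map(line => (asset, line))
    }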

 On 26-Aug-2015, at 1:05 pm, Sonal Goyal sonalgoy...@gmail.com wrote:
 
 You can try using wholeTextFiles, which will give you a pair RDD of (fileName,
 content). flatMap through this and manipulate the content.
 
 Best Regards,
 Sonal
 Founder, Nube Technologies http://www.nubetech.co/ 
 Check out Reifier at Spark Summit 2015 
 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/
 
  http://in.linkedin.com/in/sonalgoyal
 
 
 
 On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane pankaj.wah...@qiotec.com 
 mailto:pankaj.wah...@qiotec.com wrote:
 Hi community members,
 
 
 Apache Spark is Fantastic and very easy to learn.. Awesome work!!!
 
 Question:
 
 I have multiple files in a folder and the first line in each file is the
 name of the asset that the file belongs to. The second line is the csv header
 row and data starts from the third row.
 
 Ex: File 1
 
 TestAsset01
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,123,456,789
 11-01-2015 15:00:01,123,456,789
 . . .
 
 Ex: File 2
 
 TestAsset02
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,1230,4560,7890
 11-01-2015 15:00:01,1230,4560,7890
 . . .
 
 I have got nearly 1000 files in each folder sizing ~10G
 
 I am using apache spark Java api to read all this files.
 
 Following is code extract that I am using:
 
 try (JavaSparkContext sc = new JavaSparkContext(conf)) {
     Map<String, String> readingTypeMap = getReadingTypesMap(sc);
     // Read file
     JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
     // Get asset
     String asset = data.take(1).get(0);
     // Extract time series data
     JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
     // Strip header
     String header = actualData.take(1).get(0);
     String[] headers = header.split(DELIMERTER);
     // Extract actual data
     JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
     // Extract valid records
     JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
     // Find granularity
     Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
     // Transform to TSD objects
     JavaRDD<TimeSeriesData> tsdFlatMap =
         transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);

     // Save to Cassandra
     javaFunctions(tsdFlatMap)
         .writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
             "time_series_data",
             mapToRow(TimeSeriesData.class))
         .saveToCassandra();

     System.out.println("Total Records: " + timeSeriesLines.count());
     System.out.println("Valid Records: " + validated.count());
 }
 Within TimeSeriesData Object I need to set the asset name for the reading, 
 so I need output of data.take(1) to be different for different files.
 
 
 Thank You.
 
 Best Regards,
 Pankaj
 
 
 
 
 
 


-- 


QIO Technologies Limited is a limited company registered in England  Wales 
at 1 Curzon Street, London, England, W1J 5HD, with registered number 
09368431 

This message and the information contained within it is intended solely for 
the addressee and may contain confidential or privileged information. If 
you have received this message in error please notify QIO Technologies 
Limited immediately and then permanently delete this message. If you are 
not the intended addressee then you must not copy, transmit, disclose or 
rely on the information contained in this message or in any attachment to 
it, all such use is prohibited to maximum extent possible by law.


Question on take function - Spark Java API

2015-08-25 Thread Pankaj Wahane
Hi community members,


 Apache Spark is Fantastic and very easy to learn.. Awesome work!!!
 
 Question:
 
I have multiple files in a folder and the first line in each file is the name
of the asset that the file belongs to. The second line is the csv header row and
data starts from the third row.
 
 Ex: File 1
 
 TestAsset01
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,123,456,789
 11-01-2015 15:00:01,123,456,789
 . . .
 
 Ex: File 2
 
 TestAsset02
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,1230,4560,7890
 11-01-2015 15:00:01,1230,4560,7890
 . . .
 
 I have got nearly 1000 files in each folder sizing ~10G
 
 I am using apache spark Java api to read all this files.
 
 Following is code extract that I am using:
 
 try (JavaSparkContext sc = new JavaSparkContext(conf)) {
     Map<String, String> readingTypeMap = getReadingTypesMap(sc);
     // Read file
     JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
     // Get asset
     String asset = data.take(1).get(0);
     // Extract time series data
     JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
     // Strip header
     String header = actualData.take(1).get(0);
     String[] headers = header.split(DELIMERTER);
     // Extract actual data
     JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
     // Extract valid records
     JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
     // Find granularity
     Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
     // Transform to TSD objects
     JavaRDD<TimeSeriesData> tsdFlatMap =
         transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);

     // Save to Cassandra
     javaFunctions(tsdFlatMap)
         .writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
             "time_series_data",
             mapToRow(TimeSeriesData.class))
         .saveToCassandra();

     System.out.println("Total Records: " + timeSeriesLines.count());
     System.out.println("Valid Records: " + validated.count());
 }
 Within TimeSeriesData Object I need to set the asset name for the reading, so 
 I need output of data.take(1) to be different for different files.
 
 
 Thank You.
 
 Best Regards,
 Pankaj
 
 


-- 


QIO Technologies Limited is a limited company registered in England  Wales 
at 1 Curzon Street, London, England, W1J 5HD, with registered number 
09368431 

This message and the information contained within it is intended solely for 
the addressee and may contain confidential or privileged information. If 
you have received this message in error please notify QIO Technologies 
Limited immediately and then permanently delete this message. If you are 
not the intended addressee then you must not copy, transmit, disclose or 
rely on the information contained in this message or in any attachment to 
it, all such use is prohibited to maximum extent possible by law.


Spark Streaming Restart at scheduled intervals

2015-08-10 Thread Pankaj Narang
Hi All,

I am creating a Spark Twitter streaming connection in my app over a long period
of time. When I have new keywords I need to add them to the Spark streaming
connection, and in that case I need to stop and start the current Twitter
streaming connection.

I have tried Akka actor scheduling but could not achieve this.

Does anybody have an idea how to do that? (A sketch follows.)
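
A minimal sketch of one approach (names and batch interval are illustrative):
stop only the StreamingContext, keep the SparkContext, and build a new context
with the updated keyword list.

    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.twitter.TwitterUtils

    def startWithKeywords(sc: SparkContext, keywords: Seq[String]): StreamingContext = {
      val ssc = new StreamingContext(sc, Seconds(10))
      val tweets = TwitterUtils.createStream(ssc, None, keywords)
      tweets.map(_.getText).print()          // placeholder for the real processing
      ssc.start()
      ssc
    }

    // later, when the keyword list changes:
    // oldSsc.stop(stopSparkContext = false, stopGracefully = true)
    // val newSsc = startWithKeywords(sc, updatedKeywords)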

Regards
Pankaj



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Restart-at-scheduled-intervals-tp24192.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Out of memory with twitter spark streaming

2015-08-06 Thread Pankaj Narang
Hi 

I am running one application using Activator where I am retrieving tweets
and storing them to a MySQL database using the code below.

I get an OOM error after 5-6 hours with -Xmx1048M. If I increase the memory,
the OOM is only delayed.

Can anybody give me a clue? Here is the code:

var tweetStream = TwitterUtils.createStream(ssc, None, keywords)
var tweets = tweetStream.map(tweet => {
  var user = tweet.getUser
  var replyStatusId = tweet.getInReplyToStatusId
  var reTweetStatus = tweet.getRetweetedStatus
  var pTweetId = -1L
  var pcreatedAt = 0L
  if (reTweetStatus != null) {
    pTweetId = reTweetStatus.getId
    pcreatedAt = reTweetStatus.getCreatedAt.getTime
  }
  tweet.getCreatedAt.getTime + "|$" + tweet.getId +
    "|$" + user.getId + "|$" + user.getName + "|$" + user.getScreenName + "|$" +
    user.getDescription +
    "|$" + tweet.getText.trim + "|$" + user.getFollowersCount +
    "|$" + user.getFriendsCount + "|$" + tweet.getGeoLocation + "|$" +
    user.getLocation + "|$" + user.getBiggerProfileImageURL + "|$" +
    replyStatusId + "|$" + pTweetId + "|$" + pcreatedAt
})

tweets.foreachRDD(tweetsRDD => {
  tweetsRDD.distinct()
  val count = tweetsRDD.count
  println("*" + "%s tweets found on this RDD".format(count))
  if (count > 0) {
    var timeMs = System.currentTimeMillis
    var counter = DBQuery.getProcessedCount()
    var location = "tweets/" + counter + "/"
    tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))
    // tweetsRDD.saveAsTextFile(location + timeMs + ".txt")
    DBQuery.addTweetRDD(counter)
  }
})

// Checkpoint directory to recover from failures
println("tweets for the last stream are saved which can be processed later")
val checkpointDir = "f:/svn1/checkpoint/"
ssc.checkpoint(checkpointDir)
ssc.start()
ssc.awaitTermination()


regards
Pankaj



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Out-of-memory-with-twitter-spark-streaming-tp24162.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



AvroFiles

2015-05-05 Thread Pankaj Deshpande
Hi, I am using Spark 1.3.1 to read an avro file stored on HDFS. The avro file
was created using Avro 1.7.7, similar to the example mentioned in
http://www.infoobjects.com/spark-with-avro/
I am getting a NullPointerException on schema read. It could be an avro version
mismatch. Has anybody had a similar issue with avro?


Thanks


Re: AvroFiles

2015-05-05 Thread Pankaj Deshpande
I am not using Kryo. I was using the regular sqlcontext.avrofiles to open. The
file loads properly with the schema; the exception happens when I try to read
it. I will try the Kryo serializer and see if that helps.
On May 5, 2015 9:02 PM, Todd Nist tsind...@gmail.com wrote:

 Are you using Kryo or Java serialization? I found this post useful:


 http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist

 If using kryo, you need to register the classes with kryo, something like
 this:


 sc.registerKryoClasses(Array(
   classOf[ConfigurationProperty],
   classOf[Event]
 ))

 Or create a registrator, something like this:

 class ODSKryoRegistrator extends KryoRegistrator {
   override def registerClasses(kryo: Kryo) {
     kryo.register(classOf[ConfigurationProperty], new AvroSerializer[ConfigurationProperty]())
     kryo.register(classOf[Event], new AvroSerializer[Event]())
   }
 }

 I encountered a similar error since several of the Avor core classes are
 not marked Serializable.

 HTH.

 Todd

 On Tue, May 5, 2015 at 7:09 PM, Pankaj Deshpande ppa...@gmail.com wrote:

 Hi I am using Spark 1.3.1 to read an avro file stored on HDFS. The avro
 file was created using Avro 1.7.7. Similar to the example mentioned in
 http://www.infoobjects.com/spark-with-avro/
 I am getting a nullPointerException on Schema read. It could be a avro
 version mismatch. Has anybody had a similar issue with avro.


 Thanks





Issue with deploye Driver in cluster mode

2015-02-26 Thread pankaj
Hi,

I have 3 node spark cluster

node1 , node2 and node 3

I running below command on node 1 for deploying driver

/usr/local/spark-1.2.1-bin-hadoop2.4/bin/spark-submit --class
com.fst.firststep.aggregator.FirstStepMessageProcessor --master
spark://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:7077 --deploy-mode cluster
--supervise file:///home/xyz/sparkstreaming-0.0.1-SNAPSHOT.jar
/home/xyz/config.properties

The driver gets launched on node 2 in the cluster, but I am getting an exception
on node 2 that it is trying to bind to the node 1 IP.


2015-02-26 08:47:32 DEBUG AkkaUtils:63 - In createActorSystem, requireCookie
is: off
2015-02-26 08:47:32 INFO  Slf4jLogger:80 - Slf4jLogger started
2015-02-26 08:47:33 ERROR NettyTransport:65 - failed to bind to
ec2-xx.xx.xx.xx.compute-1.amazonaws.com/xx.xx.xx.xx:0, shutting down Netty
transport
2015-02-26 08:47:33 WARN  Utils:71 - Service 'Driver' could not bind on port
0. Attempting port 1.
2015-02-26 08:47:33 DEBUG AkkaUtils:63 - In createActorSystem, requireCookie
is: off
2015-02-26 08:47:33 ERROR Remoting:65 - Remoting error: [Startup failed] [
akka.remote.RemoteTransportException: Startup failed
at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:136)
at akka.remote.Remoting.start(Remoting.scala:201)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1765)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1756)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:33)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: ec2-xx-xx-xx.compute-1.amazonaws.com/xx.xx.xx.xx:0
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Success.map(Try.scala:206)
 

kindly suggest

Thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-deploye-Driver-in-cluster-mode-tp21821.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Loading JSON dataset with Spark Mllib

2015-02-15 Thread pankaj channe
Hi,

I am new to Spark and am planning on writing a machine learning application
with Spark MLlib. My dataset is in JSON format. Is it possible to load the data
into Spark without using any external JSON libraries? I have explored the option
of Spark SQL, but I believe that is only for interactive use or for loading data
into Hive tables.
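
For reference, a minimal sketch of loading JSON with Spark SQL and feeding it to
MLlib without any external JSON library (Spark 1.2/1.3-era API; the path and
field names are illustrative):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // Spark SQL infers the schema directly from the JSON records.
    val data = sqlContext.jsonFile("hdfs:///data/train.json")
    data.registerTempTable("train")

    // Pull out the columns needed for training and build MLlib LabeledPoints.
    val points = sqlContext.sql("SELECT label, f1, f2 FROM train").map { row =>
      LabeledPoint(row.getDouble(0), Vectors.dense(row.getDouble(1), row.getDouble(2)))
    }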

Thanks,
Pankaj


Re: Spark Team - Paco Nathan said that your team can help

2015-01-22 Thread Pankaj
http://spark.apache.org/docs/latest/


Follow this. It's easy to get started. Use a prebuilt version of Spark as of
now :D

On Thu, Jan 22, 2015 at 5:06 PM, Sudipta Banerjee 
asudipta.baner...@gmail.com wrote:



 Hi Apache-Spark team ,

 What are the system requirements installing Hadoop and Apache Spark?
 I have attached the screen shot of Gparted.


 Thanks and regards,
 Sudipta




 --
 Sudipta Banerjee
 Consultant, Business Analytics and Cloud Based Architecture
 Call me +919019578099


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: reading a csv dynamically

2015-01-21 Thread Pankaj Narang
Yes, I think you need to create one mapping first which will keep the number of
values in every line. Then you can group all the records with the same number of
values, and you will know how many types of arrays you will have.


val dataRDD = sc.textFile("file.csv")
val dataLengthRDD = dataRDD.map(line => (line.split(",").length, line))
val groupedData = dataLengthRDD.groupByKey()

now you can process the groupedData, as it will have the arrays of length x
grouped together in one RDD.

groupByKey([numTasks]): when called on a dataset of (K, V) pairs, returns a
dataset of (K, Iterable<V>) pairs.


I hope this helps

Regards
Pankaj 
Infoshore Software
India




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/reading-a-csv-dynamically-tp21304p21307.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Finding most occurrences in a JSON Nested Array

2015-01-21 Thread Pankaj Narang
send me the current code here. I will fix and send back to you



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Finding-most-occurrences-in-a-JSON-Nested-Array-tp20971p21295.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Finding most occurrences in a JSON Nested Array

2015-01-19 Thread Pankaj Narang
I just checked the post. Do you need help still?

I think getAs[Seq[String]] should help.

If you are still stuck, let me know.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Finding-most-occurrences-in-a-JSON-Nested-Array-tp20971p21252.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to compute RDD[(String, Set[String])] that include large Set

2015-01-19 Thread Pankaj Narang
Instead of counted.saveAsText("/path/to/save/dir"), what happens if you call
counted.collect?


If you still face the same issue please paste the stacktrace here.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-compute-RDD-String-Set-String-that-include-large-Set-tp21248p21250.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NoSuchMethodError: com.typesafe.config.Config.getDuration with akka-http/akka-stream

2015-01-06 Thread Pankaj Narang
Good luck. Let me know if I can assist you further.

Regards
-Pankaj 
Linkedin 
https://www.linkedin.com/profile/view?id=171566646
Skype 
pankaj.narang



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-com-typesafe-config-Config-getDuration-with-akka-http-akka-stream-tp20926p20991.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Set EXTRA_JAR environment variable for spark-jobserver

2015-01-06 Thread Pankaj Narang
I suggest creating an uber jar instead.

check my thread for the same

http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-com-typesafe-config-Config-getDuration-with-akka-http-akka-stream-td20926.html


Regards
-Pankaj 
Linkedin 
https://www.linkedin.com/profile/view?id=171566646
Skype 
pankaj.narang



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Set-EXTRA-JAR-environment-variable-for-spark-jobserver-tp20989p20992.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Finding most occurrences in a JSON Nested Array

2015-01-06 Thread Pankaj Narang
That's great. I did not have access to the developer machine, so I sent you the
pseudo code only.

Happy to see it's working. If you need any more help related to Spark, let me
know anytime.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Finding-most-occurrences-in-a-JSON-Nested-Array-tp20971p20997.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL implementation error

2015-01-06 Thread Pankaj Narang
As per our telephone call, here is how we can fetch the count:

val tweetsCount = sql("SELECT COUNT(*) FROM tweets")
println(f"\n\n\nThere are ${tweetsCount.collect.head.getLong(0)} Tweets on this Dataset\n\n")




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-implementation-error-tp20901p21008.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Finding most occurrences in a JSON Nested Array

2015-01-05 Thread Pankaj Narang
If you need more help let me know

-Pankaj
Linkedin 
https://www.linkedin.com/profile/view?id=171566646
Skype
pankaj.narang



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Finding-most-occurrences-in-a-JSON-Nested-Array-tp20971p20976.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Finding most occurrences in a JSON Nested Array

2015-01-05 Thread Pankaj Narang
Try as below:

results.map(row => row(1)).collect

Then try:

var hobbies = results.flatMap(row => row(1))

It will create all the hobbies in a simple array now.

var hbmap = hobbies.map(hobby => (hobby, 1)).reduceByKey((hobcnt1, hobcnt2) =>
hobcnt1 + hobcnt2)

It will aggregate the hobbies as below:

{swimming,2}, {hiking,1}


Now hbmap.map { case (hobby, count) => (count, hobby) }.sortByKey(ascending =
false).collect

will give you the hobbies sorted in descending order by their count.

This is pseudo code and should help you.

Regards
Pankaj






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Finding-most-occurrences-in-a-JSON-Nested-Array-tp20971p20975.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Finding most occurrences in a JSON Nested Array

2015-01-05 Thread Pankaj Narang
Yes, row(1).collect would be wrong, as it is not a transformation on the RDD; try
getString(1) to fetch the value.

I already said this is pseudo code. If it does not help, let me know and I will
run the code and send it to you.


get/getAs should work for you, for example:

   var hashTagsList = popularHashTags.flatMap(x => x.getAs[Seq[String]](0))


If you want, I can even take remote access of your machine to fix that.
Regards
Pankaj
Linkedin 
https://www.linkedin.com/profile/view?id=171566646
Skype 
pankaj.narang






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Finding-most-occurrences-in-a-JSON-Nested-Array-tp20971p20985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsTextFile

2015-01-03 Thread Pankaj Narang
If you can paste the code here I can certainly help.

Also confirm the version of spark you are using

Regards
Pankaj 
Infoshore Software 
India



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-tp20951p20953.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NoClassDefFoundError when trying to run spark application

2015-01-02 Thread Pankaj Narang
Do you assemble the uber jar?

You can use sbt assembly to build the jar and then run it. That should fix the
issue.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NoClassDefFoundError-when-trying-to-run-spark-application-tp20707p20944.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



(send this email to subscribe)

2015-01-02 Thread Pankaj



Re: NoSuchMethodError: com.typesafe.config.Config.getDuration with akka-http/akka-stream

2015-01-02 Thread Pankaj Narang
Like before I get a java.lang.NoClassDefFoundError:
akka/stream/FlowMaterializer$

This can be solved using the assembly plugin. You need to enable the assembly
plugin in the global plugins directory

C:\Users\infoshore\.sbt\0.13\plugins

and add a line in plugins.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.0")



 and then add the following lines in build.sbt:

import AssemblyKeys._ // put this at the top of the file

seq(assemblySettings: _*)

Also at the bottom don't forget to add

assemblySettings

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")           => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")       => MergeStrategy.discard
  case "log4j.properties"                                   => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/")  => MergeStrategy.filterDistinctLines
  case "reference.conf"                                     => MergeStrategy.concat
  case _                                                    => MergeStrategy.first
}


Now run sbt assembly; that will create a jar which can be run without the
--jars option, as it is an uber jar containing all the jars.


Also, a NoSuchMethodError is thrown when there is a difference between the
compiled and runtime versions.

What is the version of Spark you are using? You need to use the same version in
build.sbt.


Here is your build.sbt


libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"
// exclude("com.typesafe", "config")

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.1.1"

libraryDependencies += "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.3"

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" withSources() withJavadoc()

libraryDependencies += "org.apache.cassandra" % "cassandra-thrift" % "2.0.5"

libraryDependencies += "joda-time" % "joda-time" % "2.6"


and your error is Exception in thread main java.lang.NoSuchMethodError:
com.typesafe.config.Config.getDuration(Ljava/lang/String;Ljava/util/concurrent/TimeUnit;)J
 
at
akka.stream.StreamSubscriptionTimeoutSettings$.apply(FlowMaterializer.scala:256)
 
 I think there is version mismatch on the jars you use at runtime


 If you need more help add me on skype pankaj.narang


---Pankaj





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-com-typesafe-config-Config-getDuration-with-akka-http-akka-stream-tp20926p20950.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Publishing streaming results to web interface

2015-01-02 Thread Pankaj Narang
Thomus,

Spark does not provide any web interface directly. There might be third-party
apps providing dashboards, but I am not aware of any for this purpose.

*You can use some methods so that this data is saved on the file system instead
of being printed on screen.

Some of the methods you can use on an RDD are saveAsObjectFile and
saveAsTextFile.*


Now you can read these files to show them on a web interface in any language
of your choice.

Regards
Pankaj






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Publishing-streaming-results-to-web-interface-tp20948p20949.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Reading nested JSON data with Spark SQL

2015-01-01 Thread Pankaj Narang
Hi,

I am having a similar problem and tried your solution with the Spark 1.2 build
with Hadoop.

I am saving objects to parquet files where some fields are of type Array.

When I fetch them as below I get

 java.lang.ClassCastException: [B cannot be cast to java.lang.CharSequence



def fetchTags(rows: SchemaRDD) = {
  rows.flatMap(x => x.getAs[Buffer[CharSequence]](0).map(_.toString()))
}



The values I am fetching have been stored as an Array of Strings. I have tried
replacing Buffer[CharSequence] with Array[String], Seq[String], and Seq[Seq[Char]],
but I still got errors.

Can you provide a clue?

Pankaj



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reading-nested-JSON-data-with-Spark-SQL-tp19310p20933.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Reading nested JSON data with Spark SQL

2015-01-01 Thread Pankaj Narang
Also, it looks like when I store the Strings in parquet and try to fetch them
using Spark code, I get a ClassCastException.


Below is how my arrays of strings are saved: each character's ASCII value is
present in an array of ints.
res25: Array[Seq[String]] r= Array(ArrayBuffer(Array(104, 116, 116, 112, 58,
47, 47, 102, 98, 46, 109, 101, 47, 51, 67, 111, 72, 108, 99, 101, 77, 103)),
ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(Array(104, 116,
116, 112, 58, 47, 47, 105, 110, 115, 116, 97, 103, 114, 97, 109, 46, 99,
111, 109, 47, 112, 47, 120, 84, 50, 51, 78, 76, 105, 85, 55, 102, 47)),
ArrayBuffer(), ArrayBuffer(Array(104, 116, 116, 112, 58, 47, 47, 105, 110,
115, 116, 97, 103, 114, 97, 109, 46, 99, 111, 109, 47, 112, 47, 120, 84, 50,
53, 72, 52, 111, 90, 95, 114, 47)), ArrayBuffer(Array(104, 116, 116, 112,
58, 47, 47, 101, 122, 101, 101, 99, 108, 97, 115, 115, 105, 102, 105, 101,
100, 97, 100, 115, 46, 99, 111, 109, 47, 47, 100, 101, 115, 99, 47, 106, 97,
105, 112, 117, 114, 47, 49, 48, 51, 54, 50, 50,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reading-nested-JSON-data-with-Spark-SQL-tp19310p20935.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Reading nested JSON data with Spark SQL

2015-01-01 Thread Pankaj Narang
Oops:

  sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")

This solved the issue; important for everyone.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reading-nested-JSON-data-with-Spark-SQL-tp19310p20936.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Time based aggregation in Real time Spark Streaming

2014-12-01 Thread pankaj
Hi,

My incoming message has a timestamp as one field, and I have to perform
aggregation over 3-minute time slices.

Message sample

Item ID Item Type timeStamp
1  X   1-12-2014:12:01
1  X   1-12-2014:12:02
1  X   1-12-2014:12:03
1  y   1-12-2014:12:04
1  y   1-12-2014:12:05
1  y   1-12-2014:12:06

Aggregation Result
ItemId  ItemType  count  aggregationStartTime  aggrEndTime
1       X         3      1-12-2014:12:01       1-12-2014:12:03
1       y         3      1-12-2014:12:04       1-12-2014:12:06

What is the best way to perform time-based aggregation in Spark? Kindly
suggest. (A sketch follows.)
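
A minimal sketch of one common approach, assuming the records are parsed into a
case class with a millisecond timestamp (names are illustrative): bucket each
record by its own timestamp into 3-minute windows and aggregate per (item,
bucket) key, so the interval boundaries never depend on batch timing.

    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    case class Event(itemId: Long, itemType: String, timestampMs: Long)

    def aggregateBy3Min(events: RDD[Event]) = {
      val windowMs = 3 * 60 * 1000L
      events
        .map { e =>
          val bucketStart = (e.timestampMs / windowMs) * windowMs   // window this record falls in
          ((e.itemId, e.itemType, bucketStart), 1L)
        }
        .reduceByKey(_ + _)
        .map { case ((id, typ, start), count) =>
          (id, typ, count, start, start + windowMs - 1)             // id, type, count, window start/end
        }
    }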

Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Time-based-aggregation-in-Real-time-Spark-Streaming-tp20102.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Time based aggregation in Real time Spark Streaming

2014-12-01 Thread pankaj
Hi ,

Suppose I keep a batch size of 3 minutes. In one batch there can be incoming
records with any timestamp, so it is difficult to keep track of when the
3-minute interval started and ended. I am doing the output operation on worker
nodes in foreachPartition, not in the driver (foreachRDD), so I cannot use any
shared variable to store the start/end time, because shared variables like
accumulators are write-only in tasks.

Is there any solution for this?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Time-based-aggregation-in-Real-time-Spark-Streaming-tp20102p20111.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming job failing after some time.

2014-11-24 Thread pankaj channe
I have figured out the problem here. It turned out that there was a problem
with my SparkConf when I was running my application on YARN in cluster mode.
I was setting the master to local[4] inside my application, whereas I was
setting it to yarn-cluster with spark-submit. I have now changed the
SparkConf in my application to not hardcode the master, and it works.
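
For reference, a minimal sketch of the change (the app name and batch interval
are placeholders): leave the master out of the SparkConf so that whatever
spark-submit passes with --master (yarn-cluster here) takes effect.

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  // Before: new SparkConf().setMaster("local[4]").setAppName("MyStreamingApp")
  // After: no setMaster; the --master value given to spark-submit is used.
  val conf = new SparkConf().setAppName("MyStreamingApp")
  val ssc = new StreamingContext(conf, Seconds(10))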

The application kept running for some time because the YARN application master
retries up to maxNumTries and waits between retries before it fails
completely. I was getting the expected results from my streaming job during
this time.

Now, I can't figure out why it ran successfully during this time even though
it could not find the SparkContext. I am sure there is a good reason behind
this behavior. Does anyone have any idea?

Thanks,
Pankaj Channe




Re: Spark streaming job failing after some time.

2014-11-22 Thread pankaj channe
Thanks, Akhil, for your input.

I have already tried with 3 executors and it still results in the same
problem. So, as Sean mentioned, the problem does not seem to be related to
that.


On Sat, Nov 22, 2014 at 11:00 AM, Sean Owen so...@cloudera.com wrote:

 That doesn't seem to be the problem though. It processes but then stops.
 Presumably there are many executors.
 On Nov 22, 2014 9:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

 For Spark streaming, you must always set *--executor-cores* to a value
 which is >= 2. Or else it will not do any processing.

 Thanks
 Best Regards






Spark streaming job failing after some time.

2014-11-21 Thread pankaj channe
I have seen similar posts on this issue but could not find a solution.
Apologies if this has been discussed here before.

I am running a Spark Streaming job with YARN on a 5-node cluster. I am
using the following command to submit my streaming job.

spark-submit --class class_name --master yarn-cluster --num-executors 1
--driver-memory 1g --executor-memory 1g --executor-cores 1 my_app.jar


After running for some time, the job stops. The application log shows the
following two errors:

14/11/21 22:05:04 WARN yarn.ApplicationMaster: Unable to retrieve
SparkContext in spite of waiting for 10, maxNumTries = 10
Exception in thread "main" java.lang.NullPointerException
at
org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:218)
at
org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:107)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:410)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:53)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:52)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1594)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:52)
at
org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:409)
at
org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)


and later...

Failed to list files for dir:
/data2/hadoop/yarn/local/usercache/user_name/appcache/application_1416332002106_0009/spark-local-20141121220325-b529/20
at org.apache.spark.util.Utils$.listFilesSafely(Utils.scala:673)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:685)
at
org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:686)
at
org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:685)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:685)
at
org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:181)
at
org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:178)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:178)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:171)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:169)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:169)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
at
org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:169)


Note: I am building my jar locally, with the Spark dependency added in
pom.xml, and running it on a cluster that runs Spark.


-Pankaj