Re: Spark streaming from Kafka best fit

2016-03-07 Thread pratik khadloya
Would using mapPartitions instead of map help here?
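For illustration, a minimal sketch of what that could look like on one of the
filtered streams, reusing ParseStringToLogRecord and isGoodRecord from the
original post (the stream name lineStream here is just a placeholder):

// Processes each Kafka partition's iterator in a single pass instead of
// going record-by-record through separate map/filter closures.
val urls = lineStream.mapPartitions { records =>
  records
    .map(ParseStringToLogRecord)
    .filter(record => isGoodRecord(record))
    .map(_.url)
}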

~Pratik

On Tue, Mar 1, 2016 at 10:07 AM Cody Koeninger  wrote:

> You don't need as many executor cores as partitions.  An
> executor can and will work on multiple partitions within a batch, one after
> the other.  The real issue is whether you are able to keep your processing
> time under your batch time, so that delay doesn't increase.
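For reference, if processing time does creep above the batch interval with the
direct stream, one knob sometimes used to keep the delay from growing is the
per-partition rate cap; the value below is purely illustrative:

// Caps each Kafka partition at 1000 records/sec for the direct stream,
// so a single slow batch cannot snowball into ever-growing scheduling delay.
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000")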
>
> On Tue, Mar 1, 2016 at 11:59 AM, Jatin Kumar 
> wrote:
>
>> Thanks Cody!
>>
>> I understand what you said, and if I am correct it will be using 224
>> executor cores just for fetching + stage-1 processing of 224 partitions. I
>> will obviously need more cores for processing further stages and fetching the
>> next batch.
>>
>> I will start with a higher number of executor cores and see how it goes.
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger 
>> wrote:
>>
>>> > "How do I keep a balance of executors which receive data from Kafka
>>> and which process data"
>>>
>>> I think you're misunderstanding how the direct stream works.  The
>>> executor which receives data is also the executor which processes data,
>>> there aren't separate receivers.  If it's a single stage worth of work
>>> (e.g. straight map / filter), the processing of a given partition is going
>>> to be done by the executor that read it from kafka.  If you do something
>>> involving a shuffle (e.g. reduceByKey), other executors will do additional
>>> processing.  The question of which executor works on which tasks is up to
>>> the scheduler (and getPreferredLocations, which only matters if you're
>>> running spark on the same nodes as kafka)
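For reference, a minimal direct-stream sketch of the API being discussed; the
broker address and topic name are placeholders, and the parsing helpers are the
ones from the original post:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String]("metadata.broker.list" -> "kafka.server.ip:9092")

// One RDD partition per Kafka partition; the executor that reads a partition
// also runs the narrow transformations (map/filter) on it.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))

val urls = stream.map(_._2)                    // message value
  .map(ParseStringToLogRecord)
  .filter(record => isGoodRecord(record))
  .map(_.url)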
>>>
>>> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
>>> jku...@rocketfuelinc.com.invalid> wrote:
>>>
 Hello all,

I see that there are, as of today, 3 ways one can read from Kafka in
spark streaming:
1. KafkaUtils.createStream()
2. KafkaUtils.createDirectStream()
3. Kafka-spark-consumer

My spark streaming application has to read from 1 kafka topic with
around 224 partitions, consuming data at around 150MB/s (~90,000
messages/sec), which reduces to around 3MB/s (~1400 messages/sec) after
filtering. After filtering I need to maintain the top URL counts. I don't
really care about exactly-once semantics as I am only interested in a rough
estimate.

 Code:

 sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
 sparkConf.setAppName("KafkaReader")
 val ssc = StreamingContext.getOrCreate(kCheckPointDir, 
 createStreamingContext)

 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
 val kafkaParams = Map[String, String](
   "metadata.broker.list" -> "kafka.server.ip:9092",
   "group.id" -> consumer_group
 )

 val lineStreams = (1 to N).map{ _ =>
   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
 ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
 }

 ssc.union(
   lineStreams.map(stream => {
   stream.map(ParseStringToLogRecord)
 .filter(record => isGoodRecord(record))
 .map(record => record.url)
   })
 ).window(Seconds(120), Seconds(120))  // 2 Minute window
   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute 
 moving window, 28 will probably help in parallelism
   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
   .mapPartitions(iter => {
 iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 
 1000).iterator
   }, true)
   .foreachRDD((latestRDD, rddTime) => {
   printTopFromRDD(rddTime, latestRDD.map(record => (record._2, 
 record._1)).sortByKey(false).take(1000))
   })

 ssc.start()
 ssc.awaitTermination()

 Questions:

a) I used #2 but I found that I couldn't control how many executors
would actually be fetching from Kafka. How do I keep a balance between executors
which receive data from Kafka and executors which process data? Do they keep
changing for every batch?

b) Now I am trying to use #1, creating multiple DStreams, filtering them
and then doing a union. I don't understand why the number of events
processed per 120-second batch changes so drastically. PFA the events/sec
graph while running with 1 receiver. How do I debug this?

c) Which of the above 3 methods is most suitable for integrating with Kafka?
Any recommendations for getting maximum performance and running the
streaming application reliably in production?

Spark job stuck with 0 input records

2015-11-14 Thread pratik khadloya
Hello,

We are running Spark 1.4.1 on YARN:
java.vendor=Oracle Corporation
java.runtime.version=1.7.0_40-b43
datanucleus-core-3.2.10.jar
datanucleus-api-jdo-3.2.6.jar
datanucleus-rdbms-3.2.9.jar

Spark UI task table excerpts (columns: Index, ID, Attempt, Status, Locality Level,
Executor ID / Host, Launch Time, Duration, GC Time, Input Size / Records, Write Time,
Shuffle Write Size / Records, Errors):

- RUNNING, RACK_LOCAL, executor 56 / foo1.net, launched 2015/11/14 15:28:56,
  duration 5.1 h, input 0.0 B (hadoop) / 0, shuffle write 0.0 B / 0

- RUNNING, RACK_LOCAL, executor 19 / foo2.net, launched 2015/11/14 03:26:22,
  duration 16.8 h, GC time 82 ms, input 0.0 B (hadoop) / 1659204, shuffle write 0.0 B / 0


Our spark jobs have been running fine till now; suddenly we saw some lone
executors which got 0 records to process and got stuck indefinitely. We killed
some jobs which had been running for 16+ hours.

This seems like a spark bug; is anyone aware of any issue in this version
of spark?


Re: Querying nested struct fields

2015-11-10 Thread pratik khadloya
I tried the same, didn't work :(

scala> hc.sql("select _1.item_id from agg_imps_df limit 10").collect()
15/11/10 14:30:41 INFO parse.ParseDriver: Parsing command: select
_1.item_id from agg_imps_df limit 10
org.apache.spark.sql.AnalysisException: missing \' at 'from' near '';
line 1 pos 23
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:289)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at
scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at
scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)

On Tue, Nov 10, 2015 at 11:25 AM Michael Armbrust <mich...@databricks.com>
wrote:

> Use a `.`:
>
> hc.sql("select _1.item_id from agg_imps_df limit 10").collect()
>
> On Tue, Nov 10, 2015 at 11:24 AM, pratik khadloya <tispra...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I just saved a PairRDD as a table, but I am not able to query it
>> correctly. The below and other variations do not seem to work.
>>
>> scala> hc.sql("select * from agg_imps_df").printSchema()
>>  |-- _1: struct (nullable = true)
>>  |    |-- item_id: long (nullable = true)
>>  |    |-- flight_id: long (nullable = true)
>>  |-- _2: struct (nullable = true)
>>  |    |-- day_hour: string (nullable = true)
>>  |    |-- imps: long (nullable = true)
>>  |    |-- revenue: double (nullable = true)
>>
>>
>> scala> hc.sql("select _1:item_id from agg_imps_df limit 10").collect()
>>
>>
>> Can anyone please suggest the correct way to get the list of item_ids in
>> the query?
>>
>> Thanks,
>> ~Pratik
>>
>
>


Querying nested struct fields

2015-11-10 Thread pratik khadloya
Hello,

I just saved a PairRDD as a table, but I am not able to query it correctly.
The below and other variations do not seem to work.

scala> hc.sql("select * from agg_imps_df").printSchema()
 |-- _1: struct (nullable = true)
 |    |-- item_id: long (nullable = true)
 |    |-- flight_id: long (nullable = true)
 |-- _2: struct (nullable = true)
 |    |-- day_hour: string (nullable = true)
 |    |-- imps: long (nullable = true)
 |    |-- revenue: double (nullable = true)


scala> hc.sql("select _1:item_id from agg_imps_df limit 10").collect()


Can anyone please suggest the correct way to get the list of item_ids in
the query?

Thanks,
~Pratik


Re: Querying nested struct fields

2015-11-10 Thread pratik khadloya
That worked!! Thanks a lot Michael.
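
Applying the backtick escaping to the original table, the working query was
presumably along these lines:

scala> hc.sql("select `_1`.`item_id` from agg_imps_df limit 10").collect()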

~Pratik

On Tue, Nov 10, 2015 at 12:02 PM Michael Armbrust <mich...@databricks.com>
wrote:

> Oh sorry, _1 is not a valid hive identifier; you need to use backticks to
> escape it:
>
> Seq(((1, 2), 2)).toDF().registerTempTable("test")
> sql("SELECT `_1`.`_1` FROM test")
>
> On Tue, Nov 10, 2015 at 11:31 AM, pratik khadloya <tispra...@gmail.com>
> wrote:
>
>> I tried the same, didn't work :(
>>
>> scala> hc.sql("select _1.item_id from agg_imps_df limit 10").collect()
>> 15/11/10 14:30:41 INFO parse.ParseDriver: Parsing command: select
>> _1.item_id from agg_imps_df limit 10
>> org.apache.spark.sql.AnalysisException: missing \' at 'from' near
>> ''; line 1 pos 23
>> at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:289)
>> at
>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
>> at
>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>>
>> On Tue, Nov 10, 2015 at 11:25 AM Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> Use a `.`:
>>>
>>> hc.sql("select _1.item_id from agg_imps_df limit 10").collect()
>>>
>>> On Tue, Nov 10, 2015 at 11:24 AM, pratik khadloya <tispra...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I just saved a PairRDD as a table, but I am not able to query it
>>>> correctly. The below and other variations do not seem to work.
>>>>
>>>> scala> hc.sql("select * from agg_imps_df").printSchema()
>>>>  |-- _1: struct (nullable = true)
>>>>  |    |-- item_id: long (nullable = true)
>>>>  |    |-- flight_id: long (nullable = true)
>>>>  |-- _2: struct (nullable = true)
>>>>  |    |-- day_hour: string (nullable = true)
>>>>  |    |-- imps: long (nullable = true)
>>>>  |    |-- revenue: double (nullable = true)
>>>>
>>>>
>>>> scala> hc.sql("select _1:item_id from agg_imps_df limit 10").collect()
>>>>
>>>>
>>>> Can anyone please suggest the correct way to get the list of item_ids
>>>> in the query?
>>>>
>>>> Thanks,
>>>> ~Pratik
>>>>
>>>
>>>
>


PairRDD from SQL

2015-11-04 Thread pratik khadloya
Hello,

Is it possible to have a pair RDD from the below SQL query?
The pair being ((item_id, flight_id), metric1).

item_id and flight_id are part of the group by.

SELECT
  item_id,
  flight_id,
  SUM(metric1) AS metric1
FROM mytable
GROUP BY
  item_id,
  flight_id


Thanks,
Pratik
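
For illustration, a hedged sketch of one way to do this, assuming a HiveContext
hc as in the other threads and that metric1 sums to a double (adjust the getters
to the actual column types):

// Run the aggregation in SQL, then key the resulting rows by (item_id, flight_id).
val pairs = hc.sql(
  """SELECT item_id, flight_id, SUM(metric1) AS metric1
    |FROM mytable
    |GROUP BY item_id, flight_id""".stripMargin)
  .rdd
  .map(row => ((row.getLong(0), row.getLong(1)), row.getDouble(2)))
// pairs now has type RDD[((Long, Long), Double)], a pair RDD keyed on (item_id, flight_id)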


Re: Huge shuffle data size

2015-10-23 Thread pratik khadloya
Sorry, I sent the wrong join code snippet; the actual snippet is

aggImpsDf.join(
    aggRevenueDf,
    aggImpsDf("id_1") <=> aggRevenueDf("id_1")
      && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
      && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
      && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
    "inner")
  .select(
    aggImpsDf("id_1"), aggImpsDf("id_2"), aggImpsDf("day_hour"),
    aggImpsDf("day_hour_2"), aggImpsDf("metric1"), aggRevenueDf("metric2"))
  .coalesce(200)
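
One low-cost way to see where the 614 GB of shuffle comes from is to print the
plan for the join before running it and check which join strategy and how many
Exchange (shuffle) steps are planned; for example, with the names from the
snippet above:

val joined = aggImpsDf.join(
    aggRevenueDf,
    aggImpsDf("id_1") <=> aggRevenueDf("id_1")
      && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
      && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
      && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
    "inner")
joined.explain(true)  // prints the logical and physical plans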


On Fri, Oct 23, 2015 at 11:16 AM pratik khadloya <tispra...@gmail.com>
wrote:

> Hello,
>
> Data about my spark job is below. My source data is only 916MB (stage 0)
> and 231MB (stage 1), but when I join the two data sets (stage 2) it takes a
> very long time and, as I can see, the shuffled data is 614GB. Is this something
> expected? Both the data sets produce 200 partitions.
>
> Stage Id | Description | Submitted | Duration | Tasks: Succeeded/Total | Input | Shuffle Read/Write
> 2 | saveAsTable at Driver.scala:269 | 2015/10/22 18:48:12 | 2.3 h   | 200/200 |          | 614.6 GB
> 1 | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 2.1 min | 8/8     | 916.2 MB | 3.9 MB
> 0 | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 35 s    | 3/3     | 231.2 MB | 4.8 MB
>
> Am running Spark 1.4.1 and my code snippet which joins the
> two data sets is:
>
> hc.sql(query).
> mapPartitions(iter => {
>   iter.map {
> case Row(
>  ...
>  ...
>  ...
> )
>   }
> }
> ).toDF()
> .groupBy("id_1", "id_2", "day_hour", "day_hour_2")
> .agg($"id_1", $"id_2", $"day_hour", $"day_hour_2",
>   sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))
>
>
> Please advise on how to reduce the shuffle and speed this up.
>
>
> ~Pratik
>
>


Re: Saprk error:- Not a valid DFS File name

2015-10-23 Thread pratik khadloya
Check what you have at SimpleMktDataFlow.scala:106

~Pratik

On Fri, Oct 23, 2015 at 11:47 AM kali.tumm...@gmail.com <
kali.tumm...@gmail.com> wrote:

> Full Error:-
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:195)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:104)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:831)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:827)
> at
>
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:827)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:820)
> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
> at
>
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:305)
> at
>
> org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:131)
> at
> org.apache.spark.SparkHadoopWriter.preSetup(SparkHadoopWriter.scala:64)
> at
>
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1046)
> at
>
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:941)
> at
>
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:850)
> at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1164)
> at
> com.citi.ocean.spark.SimpleMktDataFlow$.main(SimpleMktDataFlow.scala:106)
> at
> com.citi.ocean.spark.SimpleMktDataFlow.main(SimpleMktDataFlow.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
>
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Saprk-error-Not-a-valid-DFS-File-name-tp25186p25188.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>


Re: Saprk error:- Not a valid DFS File name

2015-10-23 Thread pratik khadloya
I had faced a similar issue. The actual problem was not in the file name.
We run Spark on YARN, and the actual error was seen in the logs by running
the command:
$ yarn logs -applicationId 

Scroll from the beginning to find the actual error.

~Pratik

On Fri, Oct 23, 2015 at 11:40 AM kali.tumm...@gmail.com <
kali.tumm...@gmail.com> wrote:

> Hi All,
>
> I got this weird error when I tried to run spark in YARN-CLUSTER mode. I
> have 33 files and I am looping spark in bash over them one by one; most of
> them worked ok except a few files.
>
> Is the below error an HDFS error or a spark error?
>
> Exception in thread "Driver" java.lang.IllegalArgumentException: Pathname
> /user/myid/-u/12:51/_temporary/0 from
> hdfs://dev/user/myid/-u/12:51/_temporary/0 is not a valid DFS filename.
>
> This is the file name which I passed to spark; does the file name cause the issue?
>
>
> hdfs://dev/data/20151019/sipmktdata.ColorDataArchive.UTD.P4_M-P.v5.2015-09-18.txt.20150918
>
> Thanks
> Sri
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Saprk-error-Not-a-valid-DFS-File-name-tp25186.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>


Re: Huge shuffle data size

2015-10-23 Thread pratik khadloya
Actually the groupBy is not taking a lot of time.
The join that I do later takes most (95%) of the time.
Also, the grouping I am doing is based on the DataFrame API, which does not
contain any function for reduceBy... I guess the DF automatically does a
reduce-by-key style aggregation when we do a group by.
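
For reference, the distinction Kartik is pointing at, shown on toy RDD data
(assumes a SparkContext sc; keys and values are made up):

// groupByKey ships every raw value across the network before summing;
// reduceByKey combines values map-side first and only shuffles partial sums.
val byKey = sc.parallelize(Seq((("a", 1), 10L), (("a", 1), 5L), (("b", 2), 7L)))
val grouped = byKey.groupByKey().mapValues(_.sum)   // full shuffle of raw values
val reduced = byKey.reduceByKey(_ + _)              // map-side combine, then shuffle

// The DataFrame groupBy(...).agg(sum(...)) used here also plans a partial
// aggregation before the shuffle; calling .explain() on the aggregated
// DataFrame should show it.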

~Pratik

On Fri, Oct 23, 2015 at 1:38 PM Kartik Mathur <kar...@bluedata.com> wrote:

> Don't use groupBy; use reduceByKey instead. groupBy should always be
> avoided as it leads to a lot of shuffle reads/writes.
>
> On Fri, Oct 23, 2015 at 11:39 AM, pratik khadloya <tispra...@gmail.com>
> wrote:
>
>> Sorry, I sent the wrong join code snippet; the actual snippet is
>>
>> aggImpsDf.join(
>>     aggRevenueDf,
>>     aggImpsDf("id_1") <=> aggRevenueDf("id_1")
>>       && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
>>       && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
>>       && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
>>     "inner")
>>   .select(
>>     aggImpsDf("id_1"), aggImpsDf("id_2"), aggImpsDf("day_hour"),
>>     aggImpsDf("day_hour_2"), aggImpsDf("metric1"), aggRevenueDf("metric2"))
>>   .coalesce(200)
>>
>>
>> On Fri, Oct 23, 2015 at 11:16 AM pratik khadloya <tispra...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> Data about my spark job is below. My source data is only 916MB (stage 0)
>>> and 231MB (stage 1), but when I join the two data sets (stage 2) it takes a
>>> very long time and, as I can see, the shuffled data is 614GB. Is this something
>>> expected? Both the data sets produce 200 partitions.
>>>
>>> Stage Id | Description | Submitted | Duration | Tasks: Succeeded/Total | Input | Shuffle Read/Write
>>> 2 | saveAsTable at Driver.scala:269 | 2015/10/22 18:48:12 | 2.3 h   | 200/200 |          | 614.6 GB
>>> 1 | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 2.1 min | 8/8     | 916.2 MB | 3.9 MB
>>> 0 | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 35 s    | 3/3     | 231.2 MB | 4.8 MB
>>>
>>> Am running Spark 1.4.1 and my code snippet which joins
>>> the two data sets is:
>>>
>>> hc.sql(query).
>>> mapPartitions(iter => {
>>>   iter.map {
>>> case Row(
>>>  ...
>>>  ...
>>>  ...
>>> )
>>>   }
>>> }
>>> ).toDF()
>>> .groupBy("id_1", "id_2", "day_hour", "day_hour_2")
>>> .agg($"id_1", $"id_2", $"day_hour", $"day_hour_2",
>>>   sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))
>>>
>>>
>>> Please advise on how to reduce the shuffle and speed this up.
>>>
>>>
>>> ~Pratik
>>>
>>>
>


Huge shuffle data size

2015-10-23 Thread pratik khadloya
Hello,

Data about my spark job is below. My source data is only 916MB (stage 0)
and 231MB (stage 1), but when I join the two data sets (stage 2) it takes a
very long time and, as I can see, the shuffled data is 614GB. Is this something
expected? Both the data sets produce 200 partitions.

Stage Id | Description | Submitted | Duration | Tasks: Succeeded/Total | Input | Shuffle Read/Write
2 | saveAsTable at Driver.scala:269 | 2015/10/22 18:48:12 | 2.3 h   | 200/200 |          | 614.6 GB
1 | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 2.1 min | 8/8     | 916.2 MB | 3.9 MB
0 | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 35 s    | 3/3     | 231.2 MB | 4.8 MB

Am running Spark 1.4.1 and my code snippet which joins the
two data sets is:

hc.sql(query).
mapPartitions(iter => {
  iter.map {
case Row(
 ...
 ...
 ...
)
  }
}
).toDF()
.groupBy("id_1", "id_2", "day_hour", "day_hour_2")
.agg($"id_1", $"id_2", $"day_hour", $"day_hour_2",
  sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))


Please advise on how to reduce the shuffle and speed this up.


~Pratik


Re: Stream are not serializable

2015-10-23 Thread pratik khadloya
You might be referring to some class-level variables from your code.
I got to see the actual field which caused the error when I marked the
class as serializable and ran it on the cluster.

class MyClass extends java.io.Serializable

The following resources will also help:
https://youtu.be/mHF3UPqLOL8?t=54m57s
http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou
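
For illustration, a minimal sketch of the usual pattern behind this error and
one common fix; the class and field names are made up:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Referencing a field from inside a DStream closure compiles to this.suffix,
// so the whole (non-serializable) enclosing class gets pulled into the task.
class MyStreamingJob(ssc: StreamingContext) {
  val suffix = "-processed"
  def run(lines: DStream[String]): DStream[String] =
    lines.map(l => l + suffix)               // captures `this`, triggers NotSerializableException
}

// Fix: copy the value into a local val so only that value is captured,
// or mark the enclosing class Serializable as described above.
class MyStreamingJobFixed(ssc: StreamingContext) {
  val suffix = "-processed"
  def run(lines: DStream[String]): DStream[String] = {
    val localSuffix = suffix                 // local copy, serializable on its own
    lines.map(l => l + localSuffix)
  }
}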

~Pratik

On Fri, Oct 23, 2015 at 10:30 AM Ted Yu  wrote:

> Mind sharing your code, if possible ?
>
> Thanks
>
> On Fri, Oct 23, 2015 at 9:49 AM, crakjie  wrote:
>
>> Hello.
>>
>> I have activated file checkpointing for a DStream to unleash the
>> updateStateByKey.
>> My unit test worked with no problem, but when I integrated this in my
>> full stream I got this exception:
>>
>> java.io.NotSerializableException: DStream checkpointing has been enabled
>> but
>> the DStreams with their functions are not serializable
>> Serialization stack:
>>
>> at
>>
>> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
>> at
>>
>> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>> at
>>
>> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>> at com.misterbell.shiva.StreamingApp$.main(StreamingApp.scala:196)
>> at com.misterbell.shiva.StreamingApp.main(StreamingApp.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>>
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>> at
>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>> at
>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>> But this exception is not very clear about what part of my stream is not
>> serializable.
>>
>> I tried to add
>>
>>
>> .set("spark.driver.extraJavaOptions","-Dsun.io.serialization.extendedDebugInfo=true")
>>
>> .set("spark.executor.extraJavaOptions","-Dsun.io.serialization.extendedDebugInfo=true")
>>
>> to my spark conf to get more information, but it changes nothing (it
>> should).
>>
>> So how can I find which function or part of my stream is not serializable?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Stream-are-not-serializable-tp25185.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>