Re: [spark-streaming] New directStream API reads topic's partitions sequentially. Why?

2015-09-05 Thread Понькин Алексей
Hi Cody,
Thank you for the quick response.
The problem was that my application did not have enough resources (all executors
were busy), so Spark decided to run these tasks sequentially. When I added more
executors to the application, everything went fine.
Thank you anyway.
P.S. BTW, thank you for the great video lecture about directStream:
https://youtu.be/fXnNEq1v3VA
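
For reference, a minimal sketch of the kind of setup discussed in this thread -- a
Spark 1.x direct Kafka stream, which gets one Spark partition per Kafka partition.
The broker address, topic name, and batch interval below are illustrative
assumptions, not values taken from this thread:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("direct-stream-example")
val ssc = new StreamingContext(conf, Seconds(10))

// Illustrative values; the topic in this thread has 8 partitions.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val topics = Set("my-topic")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// One Spark partition (and hence one task) per Kafka partition; with 8 Kafka
// partitions and 8 one-core executors, the 8 tasks per batch only run in
// parallel if all 8 executors actually have free slots.
stream.foreachRDD(rdd => println(s"partitions = ${rdd.partitions.size}"))

ssc.start()
ssc.awaitTermination()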



04.09.2015, 17:03, "Cody Koeninger" :
> The direct stream just makes a spark partition per kafka partition, so if 
> those partitions are not getting evenly distributed among executors, 
> something else is probably wrong with your configuration.
>
> If you replace the kafka stream with a dummy rdd created with e.g. 
> sc.parallelize, what happens?
>
> Also, are you running kafka on one of the yarn executors, or on a different 
> machine?
>
> On Fri, Sep 4, 2015 at 5:17 AM, ponkin  wrote:
>> Hi,
>> I am trying to read a Kafka topic with the new directStream method in KafkaUtils.
>> I have a Kafka topic with 8 partitions.
>> I am running the streaming job on YARN with 8 executors, 1 core each.
>> I noticed that Spark reads all of the topic's partitions sequentially in one
>> executor - this is obviously not what I want.
>> I want Spark to read all partitions in parallel.
>> How can I achieve that?
>>
>> Thank you in advance.
>>
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-New-directStream-API-reads-topic-s-partitions-sequentially-Why-tp24577.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Effects of Driver Memory, Executor Memory, Driver Memory Overhead and Executor Memory Overhead on success of job runs

2015-09-05 Thread Timothy Sum Hon Mun
Hi Krishna,

Thanks for your reply. I will definitely take a look at it to understand
the configuration details.

Best Regards,
Tim

On Tue, Sep 1, 2015 at 6:17 PM, Krishna Sangeeth KS <
kskrishnasange...@gmail.com> wrote:

> Hi Timothy,
>
> I think the driver memory in all your examples is more than what is
> necessary in usual cases, and the executor memory is quite low.
>
> I found this devops talk [1] from Spark Summit to be super useful for
> understanding a few of these configuration details.
>
> [1] https://www.youtube.com/watch?v=l4ZYUfZuRbU
>
> Cheers,
> Sangeeth
> On Aug 30, 2015 7:28 AM, "timothy22000"  wrote:
>
>> I am doing some memory tuning on my Spark job on YARN and I noticed that
>> different settings give different results and affect the outcome of the
>> Spark job run. However, I am confused and do not completely understand why
>> this happens, and I would appreciate it if someone could provide me with
>> some guidance and explanation.
>>
>> I will provide some background information and describe the cases that I
>> have experienced and post my questions after them below.
>>
>> *My environment setting were as below:*
>>
>>  - Memory 20G, 20 VCores per node (3 nodes in total)
>>  - Hadoop 2.6.0
>>  - Spark 1.4.0
>>
>> My code recursively filters an RDD to make it smaller (removing examples
>> as
>> part of an algorithm), then does mapToPair and collect to gather the
>> results
>> and save them within a list.
>>
>>  First Case
>>
>> /bin/spark-submit --class  --master yarn-cluster
>> --driver-memory 7g --executor-memory 1g --num-executors 3
>> --executor-cores 1
>> --jars 
>> If I run my program with any driver memory less than 11g, I get the error
>> below, which is the SparkContext being stopped, or a similar error about a
>> method being called on a stopped SparkContext. From what I have gathered,
>> this is related to there not being enough memory.
>>
>>
>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n24507/EKxQD.png>
>>
>> Second Case
>>
>>
>> /bin/spark-submit --class  --master yarn-cluster
>> --driver-memory 7g --executor-memory 3g --num-executors 3
>> --executor-cores 1
>> --jars 
>>
>> If I run the program with the same driver memory but higher executor memory,
>> the job runs longer (about 3-4 minutes) than in the first case and then
>> encounters a different error: a container requesting/using more memory than
>> allowed and being killed because of that. I find it odd, since the executor
>> memory was increased and yet this error occurs instead of the error in the
>> first case.
>>
>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n24507/tr24f.png>
>>
>> Third Case
>>
>>
>> /bin/spark-submit --class  --master yarn-cluster
>> --driver-memory 11g --executor-memory 1g --num-executors 3
>> --executor-cores 1 --jars 
>>
>> Any setting with driver memory greater than 10g allows the job to run
>> successfully.
>>
>> Fourth Case
>>
>>
>> /bin/spark-submit --class  --master yarn-cluster
>> --driver-memory 2g --executor-memory 1g --conf
>> spark.yarn.executor.memoryOverhead=1024 --conf
>> spark.yarn.driver.memoryOverhead=1024 --num-executors 3 --executor-cores 1
>> --jars 
>> The job runs successfully with this setting (driver memory 2g and executor
>> memory 1g, but with the driver memory overhead and the executor memory
>> overhead both increased to 1g).
>>
>> Questions
>>
>>
>>  1. Why is a different error thrown, and why does the job run longer, in
>> the second case compared to the first, when only the executor memory is
>> increased? Are the two errors linked in some way?
>>
>>  2. Both the third and fourth cases succeed, and I understand that it is
>> because I am giving more memory, which solves the memory problems. However,
>> in the third case,
>>
>> spark.driver.memory + spark.yarn.driver.memoryOverhead = the memory of the
>> container in which YARN will create the JVM
>> = 11g + (driverMemory * 0.07, with a minimum of 384m)
>> = 11g + 1.154g
>> = 12.154g
>>
>> So, from the formula, I can see that my job requires a MEMORY_TOTAL of
>> around 12.154g to run successfully, which explains why I need more than 10g
>> for the driver memory setting.
>>
>> But for the fourth case,
>>
>> spark.driver.memory + spark.yarn.driver.memoryOverhead = the memory of the
>> container in which YARN will create the JVM
>> = 2g + (driverMemory * 0.07, with a minimum of 384m)
>> = 2g + 0.524g
>> = 2.524g
>>
>> It seems that just increasing the memory overhead by a small amount of
>> 1024m (1g) leads to a successful run of the job with a driver memory of
>> only 2g and a MEMORY_TOTAL of only 2.524g! Whereas without the overhead
>> configuration, any driver memory less than 11g fails, which doesn't make
>> sense from the formula, and that is why I am confused.
>>
>> Why increasing the memory overhead (for both driver and executor) allows
>> my
>> job to complete successfully with a lower MEMORY_TOTAL 
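
For reference, a small sketch of the container-sizing rule that these cases revolve
around, assuming the default overhead of max(0.07 * memory, 384 MB) quoted in the
formula above; the helper name and the values passed to it are illustrative:

// Requested YARN container = heap + memory overhead, where the overhead
// defaults to max(0.07 * heap, 384 MB) unless spark.yarn.driver.memoryOverhead
// or spark.yarn.executor.memoryOverhead overrides it.
def yarnContainerMb(heapMb: Int, overheadOverrideMb: Option[Int] = None): Int = {
  val overheadMb = overheadOverrideMb.getOrElse(math.max((heapMb * 0.07).toInt, 384))
  heapMb + overheadMb
}

yarnContainerMb(11 * 1024)             // third case: 11g driver heap, default overhead
yarnContainerMb(2 * 1024, Some(1024))  // fourth case: 2g driver heap, 1g overhead override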

Re: Failing to include multiple JDBC drivers

2015-09-05 Thread Yana Kadiyska
If memory serves me correctly in 1.3.1 at least there was a problem with
when the driver was added -- the right classloader wasn't picking it up.
You can try searching the archives, but the issue is similar to these
threads:
http://stackoverflow.com/questions/30940566/connecting-from-spark-pyspark-to-postgresql
http://stackoverflow.com/questions/30221677/spark-sql-postgresql-jdbc-classpath-issues

I thought this was fixed in 1.4.1...but in any case, maybe try setting
SPARK_CLASSPATH explicitly if 1.4.1 is still a no-go...might be a PITA if
you're doing reads as you'd have to do it on each slave -- I'd try to run
with a single slave until you get this fixed...
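
One more thing that may be worth trying (a sketch only -- it assumes the Spark 1.4
DataFrameReader JDBC path, and the URLs, tables, and credentials below are
placeholders): name the driver class explicitly in the JDBC options, so Spark loads
the class itself instead of relying on DriverManager's classloader lookup on the
executors.

// Assumes the MySQL and Postgres jars are already shipped via --jars or
// --driver-class-path as discussed above.
val mysqlDF = sqlContext.read.format("jdbc").options(Map(
  "url"     -> "jdbc:mysql://mysql-host:3306/somedb?user=u&password=p",
  "dbtable" -> "some_table",
  "driver"  -> "com.mysql.jdbc.Driver"        // name the driver class explicitly
)).load()

val pgDF = sqlContext.read.format("jdbc").options(Map(
  "url"     -> "jdbc:postgresql://pg-host:5432/otherdb?user=u&password=p",
  "dbtable" -> "other_table",
  "driver"  -> "org.postgresql.Driver"
)).load()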

On Fri, Sep 4, 2015 at 11:59 PM, Nicholas Connor <
nicholas.k.con...@gmail.com> wrote:

> So, I need to connect to multiple databases to do cool stuff with Spark.
> To do this, I need multiple database drivers: Postgres + MySQL.
>
> *Problem*: Spark fails to run both drivers
>
> This method works for one driver at a time:
>
> spark-submit  --driver-class-path="/driver.jar"
>
> These methods do not work for one driver, or many (though Spark does say
> Added "driver.jar" with timestamp *** in the log):
>
>- spark-submit --jars "driver1.jar, driver2.jar"
>- sparkContext.addJar("driver.jar")
>- echo 'spark.driver.extraClassPath="driver.jar"' >>
>spark-defaults.conf
>- echo 'spark.executor.extraClassPath="driver.jar"' >>
>spark-defaults.conf
>- sbt assembly (fat jar with drivers)
>
> *Example error:*
>
> Exception in thread "main" java.sql.SQLException: No suitable driver found
> for jdbc:mysql:// at
> com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1055) at
> com.mysql.jdbc.SQLError.createSQLException(SQLError.java:956) at
> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3491) at
> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3423) at
> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:910) at
> com.mysql.jdbc.MysqlIO.secureAuth411(MysqlIO.java:3923) at
> com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1273)
>
> *Versions Tested*: Spark 1.3.1 && 1.4.1
>
> What method can I use to load both drivers?
> Thanks,
>
> Nicholas Connor
>


Re: How to avoid shuffle errors for a large join ?

2015-09-05 Thread Reynold Xin
Try increasing the shuffle memory fraction (by default it is only 16%).
Again, if you run Spark 1.5, this will probably run a lot faster,
especially if you increase the shuffle memory fraction ...
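
For concreteness, a minimal sketch of the knobs mentioned here, assuming the
pre-1.6 static memory manager property names and Spark 1.4's sort-merge-join
switch; the values are illustrative, not tuned for this particular job:

import org.apache.spark.{SparkConf, SparkContext}

// Shift heap from caching to shuffles: spark.shuffle.memoryFraction defaults to
// 0.2, and after the safety fraction only ~16% of the heap is usable for shuffles.
val conf = new SparkConf()
  .setAppName("large-join")
  .set("spark.shuffle.memoryFraction", "0.4")
  .set("spark.storage.memoryFraction", "0.4")     // default 0.6; reduce to make room
  .set("spark.sql.planner.sortMergeJoin", "true") // sort-merge join is off by default in 1.4
val sc = new SparkContext(conf)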

On Tue, Sep 1, 2015 at 8:13 AM, Thomas Dudziak  wrote:

> While it works with sort-merge-join, it takes about 12h to finish (with
> 1 shuffle partitions). My hunch is that the reason for that is this:
>
> INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB to
> disk (62 times so far)
>
> (and lots more where this comes from).
>
> On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin  wrote:
>
>> Can you try 1.5? This should work much, much better in 1.5 out of the box.
>>
>> For 1.4, I think you'd want to turn on sort-merge-join, which is off by
>> default. However, the sort-merge join in 1.4 can still trigger a lot of
>> garbage, making it slower. SMJ performance is probably 5x - 1000x better in
>> 1.5 for your case.
>>
>>
>> On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak  wrote:
>>
>>> I'm getting errors like "Removing executor with no recent heartbeats" &
>>> "Missing an output location for shuffle" errors for a large SparkSql join
>>> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
>>> configure the job to avoid them.
>>>
>>> The initial stage completes fine with some 30k tasks on a cluster with
>>> 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>>> the shuffle stage first waits 30min in the scheduling phase according to
>>> the UI, and then dies with the mentioned errors.
>>>
>>> I can see in the GC logs that the executors reach their memory limits
>>> (32g per executor, 2 workers per machine) and can't allocate any more stuff
>>> in the heap. Fwiw, the top 10 in the memory use histogram are:
>>>
>>>  num     #instances         #bytes  class name
>>> ----------------------------------------------
>>>    1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
>>>    2:     251085327     8034730464  scala.Tuple2
>>>    3:     243694737     5848673688  java.lang.Float
>>>    4:     231198778     5548770672  java.lang.Integer
>>>    5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
>>>    6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
>>>    7:      74114058     1778737392  java.lang.Long
>>>    8:       6059103      779203840  [Ljava.lang.Object;
>>>    9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
>>>   10:         34749       70122104  [B
>>>
>>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>>
>>> spark.core.connection.ack.wait.timeout 600
>>> spark.executor.heartbeatInterval   60s
>>> spark.executor.memory  32g
>>> spark.mesos.coarse false
>>> spark.network.timeout  600s
>>> spark.shuffle.blockTransferService netty
>>> spark.shuffle.consolidateFiles true
>>> spark.shuffle.file.buffer  1m
>>> spark.shuffle.io.maxRetries6
>>> spark.shuffle.manager  sort
>>>
>>> The join is currently configured with spark.sql.shuffle.partitions=1000
>>> but that doesn't seem to help. Would increasing the partitions help? Is
>>> there a formula to determine an approximate number of partitions for a
>>> join?
>>> Any help with this job would be appreciated!
>>>
>>> cheers,
>>> Tom
>>>
>>
>>
>


Problem with repartition/OOM

2015-09-05 Thread Yana Kadiyska
Hi folks, I have a strange issue. I am trying to read a 7G file and do fairly
simple stuff with it:

I can read the file and do simple operations on it. However, I'd prefer to
increase the number of partitions in preparation for more memory-intensive
operations (I'm happy to wait, I just need the job to complete).
Repartition seems to cause an OOM for me.
Could someone shed light on, or speculate about, why this would happen -- I
thought we repartition higher to relieve memory pressure?

I'm using Spark 1.4.1 on CDH4, if that makes a difference.

This works

val res2 = sqlContext.parquetFile(lst:_*).where($"customer_id"===lit(254))
res2.count
res1: Long = 77885925

scala> res2.explain
== Physical Plan ==
Filter (customer_id#314 = 254)
 PhysicalRDD [4], MapPartitionsRDD[11] at

scala> res2.rdd.partitions.size
res3: Int = 59



This doesnt:

scala> res2.repartition(60).count
[Stage 2:>(1 +
45) / 59]15/09/05 10:17:21 WARN TaskSetManager: Lost task 2.0 in stage
2.0 (TID 62, fqdn): java.lang.OutOfMemoryError: Java heap space
at 
parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:729)
at 
parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:490)
at 
parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:116)
at 
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
at 
parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
at 
org.apache.spark.sql.sources.SqlNewHadoopRDD$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:207)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)



how to design the Spark application so that Shuffle data will be automatically cleaned up after some iterations

2015-09-05 Thread Jun Li
In the Spark core "example" directory (I am using Spark 1.2.0), there is an
example called "SparkPageRank.scala",

val sparkConf = new SparkConf().setAppName("PageRank")
val iters = if (args.length > 0) args(1).toInt else 10
val ctx = new SparkContext(sparkConf)
val lines = ctx.textFile(args(0), 1)
val links = lines.map{ s =>
  val parts = s.split("\\s+")
  (parts(0), parts(1))
}.distinct().groupByKey().cache()
var ranks = links.mapValues(v => 1.0)

for (i <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap{ case (urls, rank) =>
val size = urls.size
urls.map(url => (url, rank / size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}

val output = ranks.collect()

ctx.stop()

}

I realize that in this example, the lineage keeps extending after each
iteration. As a result, when I monitor the directory that holds the shuffle
data, the shuffle data storage keeps increasing after each iteration.

How should I structure the application code so that the ContextCleaner's
doCleanupShuffle will be activated after a certain interval (say, every few
iterations), so that I can prevent the shuffle data storage from growing
without bound for a computation that takes many iterations?

Jun
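
One approach that is often suggested for this kind of iterative job (an assumption
on my part, not something confirmed in this thread) is to checkpoint the ranks RDD
every few iterations: checkpointing truncates the lineage, so the shuffle files
behind the earlier iterations become unreferenced and eligible for cleanup once
those RDDs are garbage-collected. A sketch against the code above, with an
illustrative checkpoint directory and interval:

ctx.setCheckpointDir("hdfs:///tmp/pagerank-checkpoints")  // illustrative path
val checkpointInterval = 5                                // illustrative interval

for (i <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
    val size = urls.size
    urls.map(url => (url, rank / size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
  if (i % checkpointInterval == 0) {
    ranks.checkpoint()  // cut the lineage at this point
    ranks.count()       // force evaluation so the checkpoint is actually written
  }
}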


Re: How to avoid shuffle errors for a large join ?

2015-09-05 Thread Gurvinder Singh
On 09/05/2015 11:22 AM, Reynold Xin wrote:
> Try increasing the shuffle memory fraction (by default it is only 16%).
> Again, if you run Spark 1.5, this will probably run a lot faster,
> especially if you increase the shuffle memory fraction ...
Hi Reynold,

Does 1.5 have better join/cogroup performance for the RDD case too, or
only for SQL?

- Gurvinder
> 
> On Tue, Sep 1, 2015 at 8:13 AM, Thomas Dudziak  > wrote:
> 
> While it works with sort-merge-join, it takes about 12h to finish
> (with 1 shuffle partitions). My hunch is that the reason for
> that is this:
> 
> INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB
> to disk (62 times so far)
> 
> (and lots more where this comes from).
> 
> On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin  > wrote:
> 
> Can you try 1.5? This should work much, much better in 1.5 out
> of the box.
> 
> For 1.4, I think you'd want to turn on sort-merge-join, which is
> off by default. However, the sort-merge join in 1.4 can still
> trigger a lot of garbage, making it slower. SMJ performance is
> probably 5x - 1000x better in 1.5 for your case.
> 
> 
> On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak
> > wrote:
> 
> I'm getting errors like "Removing executor with no recent
> heartbeats" & "Missing an output location for shuffle"
> errors for a large SparkSql join (1bn rows/2.5TB joined with
> 1bn rows/30GB) and I'm not sure how to configure the job to
> avoid them.
> 
> The initial stage completes fine with some 30k tasks on a
> cluster with 70 machines/10TB memory, generating about 6.5TB
> of shuffle writes, but then the shuffle stage first waits
> 30min in the scheduling phase according to the UI, and then
> dies with the mentioned errors.
> 
> I can see in the GC logs that the executors reach their
> memory limits (32g per executor, 2 workers per machine) and
> can't allocate any more stuff in the heap. Fwiw, the top 10
> in the memory use histogram are:
> 
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
>    2:     251085327     8034730464  scala.Tuple2
>    3:     243694737     5848673688  java.lang.Float
>    4:     231198778     5548770672  java.lang.Integer
>    5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
>    6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
>    7:      74114058     1778737392  java.lang.Long
>    8:       6059103      779203840  [Ljava.lang.Object;
>    9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
>   10:         34749       70122104  [B
> 
> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
> 
> spark.core.connection.ack.wait.timeout 600
> spark.executor.heartbeatInterval   60s
> spark.executor.memory  32g
> spark.mesos.coarse false
> spark.network.timeout  600s
> spark.shuffle.blockTransferService netty
> spark.shuffle.consolidateFiles true
> spark.shuffle.file.buffer  1m
> spark.shuffle.io.maxRetries6
> spark.shuffle.manager  sort
> 
> The join is currently configured with
> spark.sql.shuffle.partitions=1000 but that doesn't seem to
> help. Would increasing the partitions help ? Is there a
> formula to determine an approximate partitions number value
> for a join ?
> Any help with this job would be appreciated !
> 
> cheers,
> Tom
> 
> 
> 
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is HDFS required for Spark streaming?

2015-09-05 Thread N B
Hi TD,

Thanks!

So our application does turn on checkpointing, but we do not recover upon
application restart (we just blow the checkpoint directory away first and
re-create the StreamingContext), as we don't have a real need for that type
of recovery. However, because the application does reduceByKeyAndWindow
operations, checkpointing has to be turned on. Do you think this scenario
will also only work with HDFS, or would local directories suffice?

Thanks
Nikunj
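
For reference, a minimal sketch of the pattern being described -- a windowed reduce
with checkpointing turned on. The source, durations, and checkpoint path are
illustrative; the point of the reply below is that the checkpoint directory needs
to be on shared storage such as HDFS once the job runs on more than one machine:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("windowed-counts")
val ssc = new StreamingContext(conf, Seconds(10))

// reduceByKeyAndWindow with an inverse function requires checkpointing;
// a local path only works while everything runs on a single node.
ssc.checkpoint("hdfs:///checkpoints/windowed-counts")

val lines = ssc.socketTextStream("stream-host", 9999)   // illustrative source
val counts = lines
  .map(word => (word, 1L))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(300), Seconds(10))

counts.print()
ssc.start()
ssc.awaitTermination()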



On Fri, Sep 4, 2015 at 3:09 PM, Tathagata Das  wrote:

> Shuffle spills will use local disk, HDFS not needed.
> Spark and Spark Streaming checkpoint info WILL NEED HDFS for
> fault-tolerance. So that stuff can be recovered even if the spark cluster
> nodes go down.
>
> TD
>
> On Fri, Sep 4, 2015 at 2:45 PM, N B  wrote:
>
>> Hello,
>>
>> We have a Spark Streaming program that is currently running on a single
>> node in "local[n]" master mode. We currently give it local directories for
>> Spark's own state management etc. The input is streaming from network/flume
>> and output is also to network/kafka etc, so the process as such does not
>> need any distributed file system.
>>
>> Now, we do want to start distributing this processing across a few
>> machines and make a real cluster out of it. However, I am not sure if HDFS
>> is a hard requirement for that to happen. I am thinking about the shuffle
>> spills, DStream/RDD persistence, and checkpoint info. Do any of these
>> require the state to be shared via HDFS? Are there other alternatives that
>> can be utilized if state sharing is accomplished via the file system only?
>>
>> Thanks
>> Nikunj
>>
>>
>


Re: Problem with repartition/OOM

2015-09-05 Thread Yanbo Liang
The Parquet output writer allocates one block for each table partition it
is processing and writes partitions in parallel. It will run out of memory
if (number of partitions) times (Parquet block size) is greater than the
available memory. You can try to decrease the number of partitions. And
could you share the value of "parquet.block.size" and your available memory?
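
In case it helps with that, a sketch of how the Parquet block size could be
inspected and lowered before re-running (this assumes the standard parquet-mr
property name and its 128 MB default, and is not a confirmed fix for this job):

// parquet.block.size is the row group size; parquet-mr defaults it to 128 MB.
// Each open Parquet writer buffers roughly one row group per partition, so
// lowering it (or the number of partitions) reduces peak memory.
val current = sc.hadoopConfiguration.get("parquet.block.size", (128 * 1024 * 1024).toString)
println(s"parquet.block.size = $current bytes")

// Illustrative: drop it to 64 MB for subsequently written files.
sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)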

2015-09-05 18:59 GMT+08:00 Yana Kadiyska :

> Hi folks, I have a strange issue. I am trying to read a 7G file and do fairly
> simple stuff with it:
>
> I can read the file/do simple operations on it. However, I'd prefer to
> increase the number of partitions in preparation for more memory-intensive
> operations (I'm happy to wait, I just need the job to complete).
> Repartition seems to cause an OOM for me?
> Could someone shed light on, or speculate about, why this would happen -- I
> thought we repartition higher to relieve memory pressure?
>
> I'm using Spark 1.4.1 on CDH4, if that makes a difference.
>
> This works
>
> val res2 = sqlContext.parquetFile(lst:_*).where($"customer_id"===lit(254))
> res2.count
> res1: Long = 77885925
>
> scala> res2.explain
> == Physical Plan ==
> Filter (customer_id#314 = 254)
>  PhysicalRDD [4], MapPartitionsRDD[11] at
>
> scala> res2.rdd.partitions.size
> res3: Int = 59
>
> ​
>
>
> This doesnt:
>
> scala> res2.repartition(60).count
> [Stage 2:>(1 + 45) / 
> 59]15/09/05 10:17:21 WARN TaskSetManager: Lost task 2.0 in stage 2.0 (TID 62, 
> fqdn): java.lang.OutOfMemoryError: Java heap space
> at 
> parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:729)
> at 
> parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:490)
> at 
> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:116)
> at 
> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
> at 
> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
> at 
> org.apache.spark.sql.sources.SqlNewHadoopRDD$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
> at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:207)
> at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> ​
>