Re: Spark Thrift Server Concurrency

2016-06-26 Thread Prabhu Joseph
Spark Thrift Server is started with

./sbin/start-thriftserver.sh --master yarn-client --hiveconf
hive.server2.thrift.port=10001 --num-executors 4 --executor-cores 2
--executor-memory 4G --conf spark.scheduler.mode=FAIR

The below query is executed 20 times in parallel:

select distinct val2 from philips1 where key>=1000 and key<=1500
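
A minimal sketch (not part of the original mail) of how such a load can be
generated over JDBC: host and port follow the start command above, while the
user name and the assumption that the Hive JDBC driver
(org.apache.hive.jdbc.HiveDriver) is on the classpath are illustrative only.

import java.sql.DriverManager
import java.util.concurrent.Executors

object ParallelQueryTest {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val sql = "select distinct val2 from philips1 where key>=1000 and key<=1500"
    val pool = Executors.newFixedThreadPool(20)

    (1 to 20).foreach { i =>
      pool.submit(new Runnable {
        override def run(): Unit = {
          // one connection per query, i.e. one Thrift Server session each
          val conn = DriverManager.getConnection("jdbc:hive2://localhost:10001", "hive", "")
          try {
            val start = System.currentTimeMillis()
            val rs = conn.createStatement().executeQuery(sql)
            while (rs.next()) {}   // drain the result set
            println(s"query $i finished in ${System.currentTimeMillis() - start} ms")
          } finally {
            conn.close()
          }
        }
      })
    }
    pool.shutdown()
  }
}

All 20 statements reach the single driver embedded in the Thrift Server at
roughly the same time, which is the load the observations below refer to.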

There is no issue at the backend Spark executors: the Spark jobs UI shows all
20 queries are launched and completed with the same duration, and all 20
queries are received by Spark Thrift Server at the same time. But the Spark
driver inside the Spark Thrift Server looks overloaded, so the queries are not
parsed and submitted to the executors at the same time, which is why the
client sees a delay in query execution time.
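
Since the server above already runs with spark.scheduler.mode=FAIR, one
related knob is assigning each JDBC session to its own scheduler pool. A
hedged sketch follows; the pool name "interactive" is an assumption and has
to exist in the fairscheduler.xml referenced via
--conf spark.scheduler.allocation.file=... when the server is started.

import java.sql.DriverManager

object FairPoolSession {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10001", "hive", "")
    val stmt = conn.createStatement()

    // Pin this session's jobs to a FAIR pool before running queries.
    stmt.execute("SET spark.sql.thriftserver.scheduler.pool=interactive")

    val rs = stmt.executeQuery("select distinct val2 from philips1 where key>=1000 and key<=1500")
    while (rs.next()) {}
    conn.close()
  }
}

Note this only changes how the driver schedules the resulting jobs; it does
not remove the single-driver parsing bottleneck described above.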





On Thu, Jun 23, 2016 at 11:12 PM, Michael Segel wrote:

> Hi,
> There are a lot of moving parts and a lot of unknowns in your description,
> besides the version stuff.
>
> How many executors, how many cores? How much memory?
> Are you persisting (memory and disk) or just caching (memory)?
>
> During the execution… same tables… are you seeing a lot of shuffling of
> data for some queries and not others?
>
> It sounds like an interesting problem…
>
> On Jun 23, 2016, at 5:21 AM, Prabhu Joseph wrote:
>
> Hi All,
>
> On submitting the same SQL query 20 times in parallel to Spark Thrift Server,
> the query execution time is less than a second for some queries and more than
> 2 seconds for others. The Spark Thrift Server log shows all 20 queries were
> submitted at the same time, 16/06/23 12:12:01, but the result schemas are
> logged at different times.
>
> 16/06/23 12:12:01 INFO SparkExecuteStatementOperation: Running query
> 'select distinct val2 from philips1 where key>=1000 and key<=1500
>
> 16/06/23 12:12:*02* INFO SparkExecuteStatementOperation: Result Schema:
> ArrayBuffer(val2#2110)
> 16/06/23 12:12:*03* INFO SparkExecuteStatementOperation: Result Schema:
> ArrayBuffer(val2#2182)
> 16/06/23 12:12:*04* INFO SparkExecuteStatementOperation: Result Schema:
> ArrayBuffer(val2#2344)
> 16/06/23 12:12:*05* INFO SparkExecuteStatementOperation: Result Schema:
> ArrayBuffer(val2#2362)
>
> There are sufficient executors running on YARN. The concurrency appears to be
> limited by the single driver. How can the concurrency be improved, and what
> are the best practices?
>
> Thanks,
> Prabhu Joseph
>
>
>


Re: Spark 1.6.1: Unexpected partition behavior?

2016-06-26 Thread Randy Gelhausen
Sorry, please ignore the above.

I now see that I called coalesce on a different reference than the one I used
to register the table.
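
For reference, a minimal sketch of the corrected version of the snippet quoted
below: coalesce(1) is applied once, and that same reference is reused for the
write, the temp table, and the cache (sqlContext and bucket come from the
original code).

val enriched_web_logs = sqlContext.sql("""
  select web_logs.datetime, web_logs.node as app_host, source_ip,
         b.node as source_host, log
  from web_logs
  left outer join (select distinct node, address from nodes) b
    on source_ip = address
""").coalesce(1)   // coalesce the reference that is also registered and cached

enriched_web_logs.write.format("parquet").mode("overwrite").save(bucket + "derived/enriched_web_logs")
enriched_web_logs.registerTempTable("enriched_web_logs")
sqlContext.cacheTable("enriched_web_logs")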

On Sun, Jun 26, 2016 at 6:34 PM, Randy Gelhausen  wrote:

> 
> val enriched_web_logs = sqlContext.sql("""
> select web_logs.datetime, web_logs.node as app_host, source_ip,
>        b.node as source_host, log
> from web_logs
> left outer join (select distinct node, address from nodes) b
>   on source_ip = address
> """)
>
> enriched_web_logs.coalesce(1).write.format("parquet").mode("overwrite").save(bucket+"derived/enriched_web_logs")
> enriched_web_logs.registerTempTable("enriched_web_logs")
> sqlContext.cacheTable("enriched_web_logs")
> 
>
> There are only 524 records in the resulting table, and I have explicitly
> attempted to coalesce into 1 partition.
>
> Yet my Spark UI shows 200 (mostly empty) partitions:
> RDD Name: In-memory table enriched_web_logs
> Storage Level: Memory Deserialized 1x Replicated
> Cached Partitions: 200
> Fraction Cached: 100%
> Size in Memory: 22.0 KB
> Size in ExternalBlockStore: 0.0 B
> Size on Disk: 0.0 B
>
> Why would there be 200 partitions despite the coalesce call?
>


Spark 1.6.1: Unexpected partition behavior?

2016-06-26 Thread Randy Gelhausen

val enriched_web_logs = sqlContext.sql("""
select web_logs.datetime, web_logs.node as app_host, source_ip,
       b.node as source_host, log
from web_logs
left outer join (select distinct node, address from nodes) b
  on source_ip = address
""")
enriched_web_logs.coalesce(1).write.format("parquet").mode("overwrite").save(bucket+"derived/enriched_web_logs")
enriched_web_logs.registerTempTable("enriched_web_logs")
sqlContext.cacheTable("enriched_web_logs")


There are only 524 records in the resulting table, and I have explicitly
attempted to coalesce into 1 partition.

Yet my Spark UI shows 200 (mostly empty) partitions:
RDD Name: In-memory table enriched_web_logs
Storage Level: Memory Deserialized 1x Replicated
Cached Partitions: 200
Fraction Cached: 100%
Size in Memory: 22.0 KB
Size in ExternalBlockStore: 0.0 B
Size on Disk: 0.0 B

Why would there be 200 partitions despite the coalesce call?
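
A rough sketch of how to confirm the partition count from code rather than the
UI (it only inspects the DataFrame backing the registered table):

// The registered/cached reference still carries the join's shuffle output
// (spark.sql.shuffle.partitions, 200 by default); the coalesce(1) above was
// applied to a separate reference used only for the parquet write.
val cachedParts = sqlContext.table("enriched_web_logs").rdd.partitions.length
println(s"enriched_web_logs is backed by $cachedParts partitions")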


Using SHUFFLE_SERVICE_ENABLED for MesosCoarseGrainedSchedulerBackend, BlockManager, and Utils?

2016-06-26 Thread Jacek Laskowski
Hi,

I've just noticed that there is the private[spark] val
SHUFFLE_SERVICE_ENABLED in package object config [1]

[1] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/package.scala#L74-L75

However, MesosCoarseGrainedSchedulerBackend [2], BlockManager [3] and
Utils [4] are all using their own copies.

Would it be acceptable* to send a pull request to get rid of this redundancy?

[*] I'm staring at @srowen for his nodding in agreement :-)

[2] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L71
[3] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L73-L74
[4] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala#L748
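
For illustration only, a rough sketch (not actual Spark source) of the
duplication and of what reusing the shared entry could look like; both the
entry and the SparkConf.get(ConfigEntry) overload are private[spark], so this
only applies inside Spark's own code base.

import org.apache.spark.SparkConf
import org.apache.spark.internal.config

val conf = new SparkConf()

// Today BlockManager, Utils and MesosCoarseGrainedSchedulerBackend each
// re-read the raw key themselves, roughly like this:
val enabledLocalCopy = conf.getBoolean("spark.shuffle.service.enabled", false)

// After the proposed cleanup they could all go through the entry defined
// once in the config package object [1]:
val enabledShared = conf.get(config.SHUFFLE_SERVICE_ENABLED)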

Regards,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski
