SparkPlan/Shuffle stage reuse with Dataset/DataFrame

2016-10-18 Thread Zhan Zhang
Hi Folks,

We have some Dataset/DataFrame use cases that would benefit from reusing the
SparkPlan and shuffle stage.

For example, consider the case below. Because the query optimization and the
SparkPlan are generated by Catalyst at execution time, the underlying RDD
lineage is regenerated for dataset1 each time, so the shuffle stage is
executed multiple times.

val dataset1 = dataset.groupBy(...).agg(...)
dataset1.registerTempTable("tmpTable")
spark.sql("select * from tmpTable where condition").collect
spark.sql("select * from tmpTable where condition1").collect

On the one hand, we get an optimized query plan; on the other hand, we cannot
reuse the data generated by the shuffle stage.

Currently, to reuse dataset1, we have to call persist to cache the data.
That is helpful but sometimes not what we want, as it has side effects. For
example, an executor holding an active cache cannot be released even when it
is idle and dynamic allocation is enabled.

In other words, in a long pipeline with multiple shuffle stages, we only want
to reuse the shuffle data as much as possible, without caching.

I am wondering whether it makes sense to add a new feature to Dataset/DataFrame
that acts as a barrier and prevents query optimization from happening across
the barrier.

For example, in the above case, we want Catalyst to treat tmpTable as a barrier
and stop optimization across it, so that we can reuse the underlying RDD
lineage of dataset1.

The prototype code to make this work is quite small; we tried it in house
with a new API, Dataset.cacheShuffle, to make this happen.
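
For illustration, a rough usage sketch of that in-house API (the column names
and the exact signature here are made up; only the name Dataset.cacheShuffle
comes from our prototype):

import org.apache.spark.sql.functions.sum

// Hypothetical API: materialize the shuffle output of dataset1 once, so the
// two queries below reuse it without persist()'s executor-side caching.
val dataset1 = dataset.groupBy("key").agg(sum("value")).cacheShuffle()
dataset1.registerTempTable("tmpTable")

spark.sql("select * from tmpTable where condition").collect()
spark.sql("select * from tmpTable where condition1").collect()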

But I want some feedback from the community before opening a JIRA, since in
some sense it does stop optimization earlier. Any comments?





--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/SparkPlan-Shuffle-stage-reuse-with-Dataset-DataFrame-tp19502.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Anyone knows the hive repo for spark-2.0?

2016-07-07 Thread Zhan Zhang
I saw that the pom file has the Hive version as
1.2.1.spark2, but I cannot find the branch in 
https://github.com/pwendell/

Does anyone know where the repo is?

Thanks.

Zhan Zhang




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Anyone-knows-the-hive-repo-for-spark-2-0-tp18234.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: right outer joins on Datasets

2016-05-24 Thread Zhan Zhang
The reason for "-1" is that the default value used for an Integer is -1 when
the value is null:

def defaultValue(jt: String): String = jt match {
  ...
  case JAVA_INT => "-1"
  ...
}
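
A minimal sketch of the symptom (hypothetical data, assuming a Spark 2.0-era
session with spark.implicits._ imported); per this thread, the unmatched side
comes back with -1 in its int fields instead of null:

import spark.implicits._

case class Rec(id: Int, value: Int)

val left  = Seq(Rec(1, 10)).toDS()
val right = Seq(Rec(1, 100), Rec(2, 200)).toDS()

// Rec(2, 200) has no match on the left; the missing left side is filled with
// the Integer default from defaultValue above, i.e. -1, rather than null.
left.joinWith(right, left("id") === right("id"), "right_outer").show()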



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/right-outer-joins-on-Datasets-tp17542p17651.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: [GRAPHX] Graph Algorithms and Spark

2016-04-21 Thread Zhan Zhang

You can take a look at this blog from Databricks about GraphFrames:

https://databricks.com/blog/2016/03/03/introducing-graphframes.html

Thanks.

Zhan Zhang

On Apr 21, 2016, at 12:53 PM, Robin East <robin.e...@xense.co.uk> wrote:

Hi

Aside from LDA, which is implemented in MLlib, GraphX has the following 
built-in algorithms:


  *   PageRank/Personalised PageRank
  *   Connected Components
  *   Strongly Connected Components
  *   Triangle Count
  *   Shortest Paths
  *   Label Propagation

It also implements a version of the Pregel framework, a form of bulk-synchronous 
parallel processing that is the foundation of most of the above algorithms. We 
cover other algorithms in our book, and if you search on Google you will find a 
number of other examples.
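
For a quick feel of the built-ins, a minimal sketch (spark-shell, so sc is
assumed to be in scope):

import org.apache.spark.graphx._

// Tiny example graph: three vertices, two edges.
val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
val edges    = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
val graph    = Graph(vertices, edges)

// Two of the built-in algorithms listed above.
val components = graph.connectedComponents().vertices.collect()
val distances  = lib.ShortestPaths.run(graph, Seq(1L)).vertices.collect()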

---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action





On 21 Apr 2016, at 19:47, tgensol <thibaut.gensol...@gmail.com> wrote:

Hi there,

I am working in a group at the University of Michigan, and we are trying to
find (and then implement) some distributed graph algorithms.

I know Spark, and I found GraphX. I read the docs, but I only found the Latent
Dirichlet Allocation algorithm working with GraphX, so I was wondering why.

Basically, the group wants to implement minimum spanning tree, kNN, and
shortest paths first.

So my questions are: Is GraphX stable enough to develop this kind of algorithm
on? Do you know of algorithms like these working on top of GraphX? And if not,
why do you think nobody has tried to do it? Is it too hard, or is it just
because nobody needs it?

Maybe it is only my knowledge of GraphX that is weak, and it is not possible
to implement these algorithms with GraphX.

Thanking you in advance,
Best regards,

Thibaut



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/GRAPHX-Graph-Algorithms-and-Spark-tp17301.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org





Re: RFC: Remove "HBaseTest" from examples?

2016-04-21 Thread Zhan Zhang
FYI: There are several pending patches for DataFrame support on top of HBase.

Thanks.

Zhan Zhang

On Apr 20, 2016, at 2:43 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

+1. HBaseTest in the Spark examples is quite old and obsolete, and the HBase
connector in the HBase repo has evolved a lot; it would be better to point users
there rather than to the example here in Spark. So it is good to remove it.

Thanks
Saisai

On Wed, Apr 20, 2016 at 1:41 AM, Josh Rosen <joshro...@databricks.com> wrote:
+1; I think that it's preferable for code examples, especially third-party 
integration examples, to live outside of Spark.

On Tue, Apr 19, 2016 at 10:29 AM Reynold Xin <r...@databricks.com> wrote:
Yea in general I feel examples that bring in a large amount of dependencies 
should be outside Spark.


On Tue, Apr 19, 2016 at 10:15 AM, Marcelo Vanzin <van...@cloudera.com> wrote:
Hey all,

Two reasons why I think we should remove that from the examples:

- HBase now has Spark integration in its own repo, so that really
should be the template for how to use HBase from Spark, making that
example less useful, even misleading.

- It brings up a lot of extra dependencies that make the size of the
Spark distribution grow.

Any reason why we shouldn't drop that example?

--
Marcelo

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org






Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-18 Thread Zhan Zhang
Thanks Reynold.

Not sure why doExecute is not invoked, since CollectLimit does not support
whole-stage codegen:

case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {

I will dig further into this.

Zhan Zhang

On Apr 18, 2016, at 10:36 PM, Reynold Xin <r...@databricks.com> wrote:

Anyway, we can verify this easily. I just added a println for each row and
verified that only limit + 1 rows were printed after the join and before the
limit.

It'd be great if you do some debugging yourself and see if it is going through 
some other code path.


On Mon, Apr 18, 2016 at 10:35 PM, Reynold Xin <r...@databricks.com> wrote:
But doExecute is not called?

On Mon, Apr 18, 2016 at 10:32 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:
Hi Reynold,

I just checked the code for CollectLimit; there is a shuffle happening to
collect the rows into one partition.


protected override def doExecute(): RDD[InternalRow] = {
  val shuffled = new ShuffledRowRDD(
    ShuffleExchange.prepareShuffleDependency(
      child.execute(), child.output, SinglePartition, serializer))
  shuffled.mapPartitionsInternal(_.take(limit))
}

Thus, there is no way to avoid processing all data before the shuffle. I think 
that is the reason. Do I understand correctly?

Thanks.

Zhan Zhang
On Apr 18, 2016, at 10:08 PM, Reynold Xin <r...@databricks.com> wrote:

Unless I'm really missing something, I don't think so. As I said, it goes
through an iterator, and after processing each stream-side row we do a shouldStop
check. The generated code looks like:

/* 094 */   protected void processNext() throws java.io.IOException {
/* 095 */ /*** PRODUCE: Project [id#79L] */
/* 096 */
/* 097 */ /*** PRODUCE: BroadcastHashJoin [id#79L], [id#82L], Inner, 
BuildRight, None */
/* 098 */
/* 099 */ /*** PRODUCE: Range 0, 1, 8, 100, [id#79L] */
/* 100 */
/* 101 */ // initialize Range
/* 102 */ if (!range_initRange) {
/* 103 */   range_initRange = true;
/* 104 */   initRange(partitionIndex);
/* 105 */ }
/* 106 */
/* 107 */ while (!range_overflow && range_number < range_partitionEnd) {
/* 108 */   long range_value = range_number;
/* 109 */   range_number += 1L;
/* 110 */   if (range_number < range_value ^ 1L < 0) {
/* 111 */ range_overflow = true;
/* 112 */   }
/* 113 */
/* 114 */   /*** CONSUME: BroadcastHashJoin [id#79L], [id#82L], Inner, 
BuildRight, None */
/* 115 */
/* 116 */   // generate join key for stream side
/* 117 */
/* 118 */   // find matches from HashedRelation
/* 119 */   UnsafeRow bhj_matched = false ? null: 
(UnsafeRow)bhj_relation.getValue(range_value);
/* 120 */   if (bhj_matched == null) continue;
/* 121 */
/* 122 */   bhj_metricValue.add(1);
/* 123 */
/* 124 */   /*** CONSUME: Project [id#79L] */
/* 125 */
/* 126 */   System.out.println("i got one row");
/* 127 */
/* 128 */   /*** CONSUME: WholeStageCodegen */
/* 129 */
/* 130 */   project_rowWriter.write(0, range_value);
/* 131 */   append(project_result);
/* 132 */
/* 133 */   if (shouldStop()) return;
/* 134 */ }
/* 135 */   }
/* 136 */ }


shouldStop is false once we go past the limit.



On Mon, Apr 18, 2016 at 9:44 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:
From the physical plan, the limit is one level up from the WholeStageCodegen.
Thus, I don’t think shouldStop would work here. To make it work, the limit has
to be part of the WholeStageCodegen.

Correct me if I am wrong.

Thanks.

Zhan Zhang

On Apr 18, 2016, at 11:09 AM, Reynold Xin <r...@databricks.com> wrote:

I could be wrong but I think we currently do that through whole stage codegen. 
After processing every row on the stream side, the generated code for broadcast 
join checks whether it has hit the limit or not (through this thing called 
shouldStop).

It is not the most optimal solution, because a single stream side row might 
output multiple hits, but it is usually not a problem.


On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ray.and...@gmail.com> wrote:
While you can't automatically push the limit *through* the join, we could push 
it *into* the join (stop processing after generating 10 records). I believe 
that is what Rajesh is suggesting.

On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <hvanhov...@questtec.nl> wrote:
I am not sure if you can push a limit through a join. This becomes problematic 
if not all keys are present on both sides; in such a case a limit can produce 
fewer rows than the set limit.

This might be a rare case in which whole stage codegen is slower, due to the 
fact that we need to buffer the result of such a stage. You could try to 
disable it by setting "spark.sql.codegen.wholeStage" to false.
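
For reference, a quick way to try that from the shell (assuming a 1.6/2.0-era
sqlContext):

// Turn off whole-stage codegen for this session to compare plans and timings.
sqlContext.setConf("spark.sql.codegen.wholeStage", "false")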

2016-04-12 14:32 GMT+02:00 Rajesh Balamohan <rajesh.balamo...@gmail.com>:
Hi,

I ran the

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-18 Thread Zhan Zhang
Hi Reynold,

I just checked the code for CollectLimit; there is a shuffle happening to
collect the rows into one partition.


protected override def doExecute(): RDD[InternalRow] = {
  val shuffled = new ShuffledRowRDD(
    ShuffleExchange.prepareShuffleDependency(
      child.execute(), child.output, SinglePartition, serializer))
  shuffled.mapPartitionsInternal(_.take(limit))
}

Thus, there is no way to avoid processing all data before the shuffle. I think 
that is the reason. Do I understand correctly?

Thanks.

Zhan Zhang
On Apr 18, 2016, at 10:08 PM, Reynold Xin <r...@databricks.com> wrote:

Unless I'm really missing something, I don't think so. As I said, it goes
through an iterator, and after processing each stream-side row we do a shouldStop
check. The generated code looks like:

/* 094 */   protected void processNext() throws java.io.IOException {
/* 095 */ /*** PRODUCE: Project [id#79L] */
/* 096 */
/* 097 */ /*** PRODUCE: BroadcastHashJoin [id#79L], [id#82L], Inner, 
BuildRight, None */
/* 098 */
/* 099 */ /*** PRODUCE: Range 0, 1, 8, 100, [id#79L] */
/* 100 */
/* 101 */ // initialize Range
/* 102 */ if (!range_initRange) {
/* 103 */   range_initRange = true;
/* 104 */   initRange(partitionIndex);
/* 105 */ }
/* 106 */
/* 107 */ while (!range_overflow && range_number < range_partitionEnd) {
/* 108 */   long range_value = range_number;
/* 109 */   range_number += 1L;
/* 110 */   if (range_number < range_value ^ 1L < 0) {
/* 111 */ range_overflow = true;
/* 112 */   }
/* 113 */
/* 114 */   /*** CONSUME: BroadcastHashJoin [id#79L], [id#82L], Inner, 
BuildRight, None */
/* 115 */
/* 116 */   // generate join key for stream side
/* 117 */
/* 118 */   // find matches from HashedRelation
/* 119 */   UnsafeRow bhj_matched = false ? null: 
(UnsafeRow)bhj_relation.getValue(range_value);
/* 120 */   if (bhj_matched == null) continue;
/* 121 */
/* 122 */   bhj_metricValue.add(1);
/* 123 */
/* 124 */   /*** CONSUME: Project [id#79L] */
/* 125 */
/* 126 */   System.out.println("i got one row");
/* 127 */
/* 128 */   /*** CONSUME: WholeStageCodegen */
/* 129 */
/* 130 */   project_rowWriter.write(0, range_value);
/* 131 */   append(project_result);
/* 132 */
/* 133 */   if (shouldStop()) return;
/* 134 */ }
/* 135 */   }
/* 136 */ }


shouldStop is false once we go past the limit.



On Mon, Apr 18, 2016 at 9:44 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:
From the physical plan, the limit is one level up from the WholeStageCodegen.
Thus, I don’t think shouldStop would work here. To make it work, the limit has
to be part of the WholeStageCodegen.

Correct me if I am wrong.

Thanks.

Zhan Zhang

On Apr 18, 2016, at 11:09 AM, Reynold Xin <r...@databricks.com> wrote:

I could be wrong but I think we currently do that through whole stage codegen. 
After processing every row on the stream side, the generated code for broadcast 
join checks whether it has hit the limit or not (through this thing called 
shouldStop).

It is not the most optimal solution, because a single stream side row might 
output multiple hits, but it is usually not a problem.


On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ray.and...@gmail.com> wrote:
While you can't automatically push the limit *through* the join, we could push 
it *into* the join (stop processing after generating 10 records). I believe 
that is what Rajesh is suggesting.

On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <hvanhov...@questtec.nl> wrote:
I am not sure if you can push a limit through a join. This becomes problematic 
if not all keys are present on both sides; in such a case a limit can produce 
fewer rows than the set limit.

This might be a rare case in which whole stage codegen is slower, due to the 
fact that we need to buffer the result of such a stage. You could try to 
disable it by setting "spark.sql.codegen.wholeStage" to false.

2016-04-12 14:32 GMT+02:00 Rajesh Balamohan <rajesh.balamo...@gmail.com>:
Hi,

I ran the following query in spark (latest master codebase) and it took a lot 
of time to complete even though it was a broadcast hash join.

It appears that the limit computation is done only after computing the complete
join. Shouldn't the limit condition be pushed into BroadcastHashJoin (wherein it
would have to stop processing after generating 10 rows)? Please let me know if
my understanding of this is wrong.


select l_partkey from lineitem, partsupp where ps_partkey=l_partkey limit 10;

>>>>
| == Physical Plan ==
CollectLimit 10
+- WholeStageCodegen
   :  +- Project [l_partkey#893]
   : +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner, 
BuildRight, None
   ::- Project [l_partkey#893]
   ::  +- Filter isnotnull(l_p

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-18 Thread Zhan Zhang
From the physical plan, the limit is one level up from the WholeStageCodegen.
Thus, I don’t think shouldStop would work here. To make it work, the limit has
to be part of the WholeStageCodegen.

Correct me if I am wrong.

Thanks.

Zhan Zhang

On Apr 18, 2016, at 11:09 AM, Reynold Xin <r...@databricks.com> wrote:

I could be wrong but I think we currently do that through whole stage codegen. 
After processing every row on the stream side, the generated code for broadcast 
join checks whether it has hit the limit or not (through this thing called 
shouldStop).

It is not the most optimal solution, because a single stream side row might 
output multiple hits, but it is usually not a problem.


On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ray.and...@gmail.com> wrote:
While you can't automatically push the limit *through* the join, we could push 
it *into* the join (stop processing after generating 10 records). I believe 
that is what Rajesh is suggesting.

On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <hvanhov...@questtec.nl> wrote:
I am not sure if you can push a limit through a join. This becomes problematic 
if not all keys are present on both sides; in such a case a limit can produce 
fewer rows than the set limit.

This might be a rare case in which whole stage codegen is slower, due to the 
fact that we need to buffer the result of such a stage. You could try to 
disable it by setting "spark.sql.codegen.wholeStage" to false.

2016-04-12 14:32 GMT+02:00 Rajesh Balamohan <rajesh.balamo...@gmail.com>:
Hi,

I ran the following query in spark (latest master codebase) and it took a lot 
of time to complete even though it was a broadcast hash join.

It appears that the limit computation is done only after computing the complete
join. Shouldn't the limit condition be pushed into BroadcastHashJoin (wherein it
would have to stop processing after generating 10 rows)? Please let me know if
my understanding of this is wrong.


select l_partkey from lineitem, partsupp where ps_partkey=l_partkey limit 10;

>>>>
| == Physical Plan ==
CollectLimit 10
+- WholeStageCodegen
   :  +- Project [l_partkey#893]
   : +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner, 
BuildRight, None
   ::- Project [l_partkey#893]
   ::  +- Filter isnotnull(l_partkey#893)
   :: +- Scan HadoopFiles[l_partkey#893] Format: ORC, 
PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct
   :+- INPUT
   +- BroadcastExchange 
HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as 
bigint)),List(ps_partkey#908))
  +- WholeStageCodegen
 :  +- Project [ps_partkey#908]
 : +- Filter isnotnull(ps_partkey#908)
 :+- Scan HadoopFiles[ps_partkey#908] Format: ORC, 
PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct  |
>>>>




--
~Rajesh.B






Re: more uniform exception handling?

2016-04-18 Thread Zhan Zhang
+1. Both of these would be very helpful in debugging.

Thanks.

Zhan Zhang

On Apr 18, 2016, at 1:18 PM, Evan Chan  wrote:

> +1000.
> 
> Especially if the UI can help correlate exceptions, and we can reduce
> some exceptions.
> 
> There are some exceptions which are in practice very common, such as
> the nasty ClassNotFoundException, that most folks end up spending tons
> of time debugging.
> 
> 
> On Mon, Apr 18, 2016 at 12:16 PM, Reynold Xin  wrote:
>> Josh's pull request on rpc exception handling got me to think ...
>> 
>> In my experience, there have been a few exception-related things that
>> created a lot of trouble for us in production debugging:
>> 
>> 1. Some exception is thrown, but is caught by some try/catch that does not
>> do any logging nor rethrow.
>> 2. Some exception is thrown, but is caught by some try/catch that does not
>> do any logging but does rethrow. However, the original exception is now masked.
>> 3. Multiple exceptions are logged at different places close to each other,
>> but we don't know whether they are caused by the same problem or not.
>> 
>> 
>> To mitigate some of the above, here's an idea ...
>> 
>> (1) Create a common root class for all the exceptions (e.g. call it
>> SparkException) used in Spark. We should make sure every time we catch an
>> exception from a 3rd party library, we rethrow them as SparkException (a lot
>> of places already do that). In SparkException's constructor, log the
>> exception and the stacktrace.
>> 
>> (2) SparkException has a monotonically increasing ID, and this ID appears in
>> the exception error message (say at the end).
>> 
>> 
>> I think (1) will eliminate most of the cases that an exception gets
>> swallowed. The main downside I can think of is we might log an exception
>> multiple times. However, I'd argue exceptions should be rare, and it is not
>> that big of a deal to log them twice or three times. The unique ID (2) can
>> help us correlate exceptions if they appear multiple times.
>> 
>> Thoughts?
>> 
>> 
>> 
>> 
>> 
> 
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
> 
> 
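
Purely as an illustration of (1) and (2) above, a rough sketch (a hypothetical
class, not the actual org.apache.spark.SparkException):

import java.util.concurrent.atomic.AtomicLong
import org.slf4j.LoggerFactory

class SparkExceptionSketch(message: String, cause: Throwable = null)
  extends Exception(message, cause) {

  // (2) Monotonically increasing ID, surfaced in the error message.
  val id: Long = SparkExceptionSketch.counter.incrementAndGet()

  // (1) Log the exception and its stack trace at construction time, so it is
  // recorded even if a caller later swallows it.
  SparkExceptionSketch.log.error(s"SparkException [id=$id]: $message", this)

  override def getMessage: String = s"${super.getMessage} (exception id: $id)"
}

object SparkExceptionSketch {
  private val log = LoggerFactory.getLogger(classOf[SparkExceptionSketch])
  private val counter = new AtomicLong(0L)
}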


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: ORC file writing hangs in pyspark

2016-02-23 Thread Zhan Zhang
Hi James,

You can try writing in another format, e.g., Parquet, to see whether it is an
ORC-specific issue or a more generic one.

Thanks.

Zhan Zhang

On Feb 23, 2016, at 6:05 AM, James Barney <jamesbarne...@gmail.com> wrote:

I'm trying to write an ORC file after running the FPGrowth algorithm on a 
dataset of just around 2 GB in size. The algorithm performs well and can display 
results if I take(n) the freqItemSets() of the result after converting that to 
a DF.

I'm using Spark 1.5.2 on HDP 2.3.4 and Python 3.4.2 on Yarn.

I get the results from querying a Hive table, also ORC format, running a number 
of maps, joins, and filters on the data.

When the program attempts to write the files:
result.write.orc('/data/staged/raw_result')
size_1_buckets.write.orc('/data/staged/size_1_results')
filter_size_2_buckets.write.orc('/data/staged/size_2_results')

The first path, /data/staged/raw_result, is created with a _temporary folder, 
but the data is never written. The job hangs at this point, apparently 
indefinitely.

Additionally, no logs are recorded or available for the jobs on the history 
server.

What could be the problem?



Dr.appointment this afternoon and WFH tomorrow for another Dr. appointment (EOM)

2016-01-07 Thread Zhan Zhang


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: [Spark SQL] SQLContext getOrCreate incorrect behaviour

2015-12-21 Thread Zhan Zhang
This looks to me like a very unusual use case: you stop the SparkContext and
start another one. I don’t think that is well supported. Once the SparkContext is
stopped, all the resources are supposed to be released.

Is there any mandatory reason you have to stop and restart another SparkContext?

Thanks.

Zhan Zhang

Note that when sc is stopped, all resources are released (for example, in yarn mode).
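
If the restart really is needed, a minimal workaround sketch (Spark 1.x APIs)
is to build the new SQLContext explicitly instead of going through getOrCreate:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Tie the new SQLContext to the new SparkContext explicitly, rather than
// relying on the instance cached by SQLContext.getOrCreate.
val sc2 = new SparkContext(new SparkConf().setAppName("restarted"))
val sqlContext2 = new SQLContext(sc2)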
On Dec 20, 2015, at 2:59 PM, Jerry Lam  wrote:

> Hi Spark developers,
> 
> I found that SQLContext.getOrCreate(sc: SparkContext) does not behave 
> correctly when a different spark context is provided.
> 
> ```
> val sc = new SparkContext
> val sqlContext =SQLContext.getOrCreate(sc)
> sc.stop
> ...
> 
> val sc2 = new SparkContext
> val sqlContext2 = SQLContext.getOrCreate(sc2)
> sc2.stop
> ```
> 
> The sqlContext2 will reference sc instead of sc2 and therefore, the program 
> will not work because sc has been stopped. 
> 
> Best Regards,
> 
> Jerry 


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Multi-core support per task in Spark

2015-12-11 Thread Zhan Zhang
I noticed that it is configurable at the job level via spark.task.cpus. Is there
any way to support it at the task level?
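
For reference, a minimal sketch of the job-level knob mentioned above:

import org.apache.spark.{SparkConf, SparkContext}

// Job-level setting: every task in this application reserves 2 CPU slots.
val conf = new SparkConf()
  .setAppName("heavy-tasks")
  .set("spark.task.cpus", "2")
val sc = new SparkContext(conf)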

Thanks.

Zhan Zhang


On Dec 11, 2015, at 10:46 AM, Zhan Zhang  wrote:

> Hi Folks,
> 
> Is it possible to assign multiple cores per task, and how? Suppose we have a
> scenario in which some tasks do really heavy processing of each record and
> require multi-threading, and we want to avoid similar tasks being assigned to
> the same executors/hosts.
> 
> If it is not supported, does it make sense to add this feature? It may seem to
> make users worry about more configuration, but by default we can still do 1
> core per task, and only advanced users need to be aware of this feature.
> 
> Thanks.
> 
> Zhan Zhang
> 
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
> 
> 


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Multi-core support per task in Spark

2015-12-11 Thread Zhan Zhang
Hi Folks,

Is it possible to assign multiple cores per task, and how? Suppose we have a
scenario in which some tasks do really heavy processing of each record and
require multi-threading, and we want to avoid similar tasks being assigned to
the same executors/hosts.

If it is not supported, does it make sense to add this feature? It may seem to
make users worry about more configuration, but by default we can still do 1 core
per task, and only advanced users need to be aware of this feature.

Thanks.

Zhan Zhang

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Proposal for SQL join optimization

2015-11-12 Thread Zhan Zhang
Hi Xiao,

Performance-wise, without the manual tuning the query cannot finish, while with
the tuning it finishes in minutes on TPC-H 100 GB data.

I have created https://issues.apache.org/jira/browse/SPARK-11704 and 
https://issues.apache.org/jira/browse/SPARK-11705 for these two issues, and we 
can move the discussion there.

Thanks.

Zhan Zhang

On Nov 11, 2015, at 6:16 PM, Xiao Li <gatorsm...@gmail.com> wrote:

Hi, Zhan,

That sounds really interesting! Please @ me when you submit the PR. If
possible, please also post the performance difference.

Thanks,

Xiao Li


2015-11-11 14:45 GMT-08:00 Zhan Zhang <zzh...@hortonworks.com>:
Hi Folks,

I did some performance measurements based on TPC-H recently, and want to bring
up two performance issues I observed. Both are related to Cartesian joins.

1. CartesianProduct implementation.

Currently CartesianProduct relies on RDD.cartesian, in which the computation is 
realized as follows

override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
  val currSplit = split.asInstanceOf[CartesianPartition]
  for (x <- rdd1.iterator(currSplit.s1, context);
       y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}

From the above loop, if rdd1.count is n, rdd2 needs to be recomputed n times,
which is really heavy and may never finish if n is large, especially when
rdd2 comes from a ShuffledRDD.

We should have some optimization for CartesianProduct by caching rightResults.
The problem is that we don’t have a cleanup hook to unpersist rightResults, AFAIK.
I think we should have some cleanup hook after query execution.
With such a hook available, we can easily optimize such Cartesian joins. I believe
such a cleanup hook may also benefit other query optimizations.


2. Unnecessary CartesianProduct join.

When we have a query similar to the following (I don’t remember the exact form):
select * from a, b, c, d where a.key1 = c.key1 and b.key2 = c.key2 and c.key3 = 
d.key3

There will be a Cartesian join between a and b. But if we simply change the
table order, for example to a, c, b, d, such a Cartesian join is eliminated.
Without such manual tuning, the query will never finish if a and c are big, but
we should not rely on such manual optimization.


Please provide your input. If both issues are valid, I will open JIRAs for each.

Thanks.

Zhan Zhang

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org





Proposal for SQL join optimization

2015-11-11 Thread Zhan Zhang
Hi Folks,

I did some performance measurements based on TPC-H recently, and want to bring
up two performance issues I observed. Both are related to Cartesian joins.

1. CartesianProduct implementation.

Currently CartesianProduct relies on RDD.cartesian, in which the computation is 
realized as follows

override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
  val currSplit = split.asInstanceOf[CartesianPartition]
  for (x <- rdd1.iterator(currSplit.s1, context);
       y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}

From the above loop, if rdd1.count is n, rdd2 needs to be recomputed n times,
which is really heavy and may never finish if n is large, especially when
rdd2 comes from a ShuffledRDD.

We should have some optimization for CartesianProduct by caching rightResults.
The problem is that we don’t have a cleanup hook to unpersist rightResults, AFAIK.
I think we should have some cleanup hook after query execution.
With such a hook available, we can easily optimize such Cartesian joins. I believe
such a cleanup hook may also benefit other query optimizations.
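
As a manual illustration of the caching idea (rdd1 and rdd2 are placeholders),
persisting and unpersisting by hand since there is no cleanup hook today:

import org.apache.spark.storage.StorageLevel

// Persist the right-hand side so iterating it once per left-side element does
// not recompute its whole lineage each time.
val right = rdd2.persist(StorageLevel.MEMORY_AND_DISK)
val product = rdd1.cartesian(right)
product.count()
right.unpersist()  // manual cleanup; the proposed hook would automate this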


2. Unnecessary CartesianProduct join.

When we have a query similar to the following (I don’t remember the exact form):
select * from a, b, c, d where a.key1 = c.key1 and b.key2 = c.key2 and c.key3 = 
d.key3

There will be a Cartesian join between a and b. But if we simply change the
table order, for example to a, c, b, d, such a Cartesian join is eliminated.
Without such manual tuning, the query will never finish if a and c are big, but
we should not rely on such manual optimization.


Please provide your input. If both issues are valid, I will open JIRAs for each.

Thanks.

Zhan Zhang

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Support for views/ virtual tables in SparkSQL

2015-11-09 Thread Zhan Zhang
I think you can rewrite those TPC-H queries without using views, for example by
using registerTempTable.
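
For example, a sketch of the rewrite (the view definition and column names are
just illustrative):

// Instead of "create view filtered_lineitem as select ...", register the
// defining query as a temp table and reference that in the TPC-H query.
val filtered = sqlContext.sql(
  "select l_orderkey, l_partkey, l_quantity from lineitem where l_quantity > 10")
filtered.registerTempTable("filtered_lineitem")

sqlContext.sql("select count(*) from filtered_lineitem").show()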

Thanks.

Zhan Zhang

On Nov 9, 2015, at 9:34 PM, Sudhir Menon  wrote:

> Team:
> 
> Do we plan to add support for views/virtual tables in SparkSQL anytime soon?
> Trying to run the TPC-H workload and failing on queries that assume support
> for views in the underlying database.
> 
> Thanks in advance
> 
> Suds


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: spark-shell 1.5 doesn't seem to work in local mode

2015-09-19 Thread Zhan Zhang
It does not matter whether you start Spark in local or another mode. If you
have an hdfs-site.xml somewhere and the Spark configuration points to that
config, you will read/write to HDFS.

Thanks.

Zhan Zhang


From: Madhu 
Sent: Saturday, September 19, 2015 12:14 PM
To: dev@spark.apache.org
Subject: Re: spark-shell 1.5 doesn't seem to work in local mode

Thanks guys.

I do have HADOOP_INSTALL set, but Spark 1.4.1 did not seem to mind.
Seems like there's a difference in behavior between 1.5.0 and 1.4.1 for some
reason.

To the best of my knowledge, I just downloaded each tgz and untarred them in
/opt. I adjusted my PATH to point to one or the other, but that should be
about it.

Does 1.5.0 pick up HADOOP_INSTALL?
Wouldn't spark-shell --master local override that?
1.5 seemed to completely ignore --master local



-
--
Madhu
https://www.linkedin.com/in/msiddalingaiah
--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/spark-shell-1-5-doesn-t-seem-to-work-in-local-mode-tp14212p14217.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Make off-heap store pluggable

2015-07-21 Thread Zhan Zhang
Hi Alexey,

SPARK-6479 (https://issues.apache.org/jira/browse/SPARK-6479) is for the plugin
API, and SPARK-6112 (https://issues.apache.org/jira/browse/SPARK-6112) is for
the HDFS plugin.


Thanks.

Zhan Zhang

On Jul 21, 2015, at 10:56 AM, Alexey Goncharuk <alexey.goncha...@gmail.com> wrote:


2015-07-20 23:29 GMT-07:00 Matei Zaharia <matei.zaha...@gmail.com>:
I agree with this -- basically, to build on Reynold's point, you should be able 
to get almost the same performance by implementing either the Hadoop FileSystem 
API or the Spark Data Source API over Ignite in the right way. This would let 
people save data persistently in Ignite in addition to using it for caching, 
and it would provide a global namespace, optionally a schema, etc. You can 
still provide data locality, short-circuit reads, etc with these APIs.

Absolutely agree.

In fact, Ignite already provides a shared RDD implementation which is 
essentially a view of Ignite cache data. This implementation adheres to the 
Spark DataFrame API. More information can be found here: 
http://ignite.incubator.apache.org/features/igniterdd.html

Also, Ignite in-memory filesystem is compliant with Hadoop filesystem API and 
can transparently replace HDFS if needed. Plugging it into Spark should be 
fairly easy. More information can be found here: 
http://ignite.incubator.apache.org/features/igfs.html

--Alexey




Re: Support for Hive 0.14 in secure mode on hadoop 2.6.0

2015-03-27 Thread Zhan Zhang
Hi Doug,

SPARK-5111 is to make Spark work with a secure Hadoop 2.6 cluster. There is a
compatibility issue which needs the SPARK-5111 patch.
In an insecure cluster, the current Spark can connect to Hive 0.14 without problems.

By the way, I am really glad to hear about "an adaption layer in Spark SQL in
1.4" that "allows Spark SQL to connect to arbitrary Hive version".

Thanks.

Zhan Zhang

On Mar 27, 2015, at 12:57 PM, Doug Balog  wrote:

> Is there a JIRA for this adaption layer ? It sounds like a better long term 
> solution.
> 
> If anybody knows what is require to get the current Shim layer working with 
> Hive 0.14, please post what you know.
> I’m willing to spend some time on it, but I’m still learning how things fit 
> together and it might take me a while.
> I’ve been looking at the pr associated with SPARK-5111 for hints.
> 
> Thanks,
> 
> Doug
> 
> 
>> On Mar 27, 2015, at 6:13 AM, Cheng Lian  wrote:
>> 
>> We're planning to replace the current Hive version profiles and shim layer 
>> with an adaption layer in Spark SQL in 1.4. This adaption layer allows Spark 
>> SQL to connect to arbitrary Hive version greater than or equal to 0.12.0 (or 
>> maybe 0.13.1, not decided yet).
>> 
>> However, it's not a promise yet, since this requires major refactoring of 
>> the current Spark SQL Hive support.
>> 
>> Cheng
>> 
>> On 3/27/15 4:48 PM, Doug Balog wrote:
>>> Hi,
>>>  I'm just wondering if anybody is working on supporting Hive 0.14 in secure 
>>> mode on hadoop 2.6.0 ?
>>> I see once Jira referring to it  
>>> https://issues.apache.org/jira/browse/SPARK-5111
>>> but it mentions no effort to move to 0.14.
>>> 
>>> Thanks,
>>> 
>>> Doug
>>> 
>>> 
>>> 
>>> -
>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: dev-h...@spark.apache.org
>>> 
>>> 
>> 
> 
> 
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Thanks all for the quick response.

Thanks.

Zhan Zhang

On Mar 26, 2015, at 3:14 PM, Patrick Wendell  wrote:

> I think we have a version of mapPartitions that allows you to tell
> Spark the partitioning is preserved:
> 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L639
> 
>> We could also add a map function that does the same. Or you can just write
> your map using an iterator.
> 
> - Patrick
> 
> On Thu, Mar 26, 2015 at 3:07 PM, Jonathan Coveney  wrote:
>> This is just a deficiency of the api, imo. I agree: mapValues could
>> definitely be a function (K, V)=>V1. The option isn't set by the function,
>> it's on the RDD. So you could look at the code and do this.
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>> 
>> def mapValues[U](f: V => U): RDD[(K, U)] = {
>>val cleanF = self.context.clean(f)
>>new MapPartitionsRDD[(K, U), (K, V)](self,
>>  (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
>>  preservesPartitioning = true)
>>  }
>> 
>> What you want:
>> 
>> def mapValues[U](f: (K, V) => U): RDD[(K, U)] = {
>>val cleanF = self.context.clean(f)
>>new MapPartitionsRDD[(K, U), (K, V)](self,
>>  (context, pid, iter) => iter.map { case t@(k, _) => (k, cleanF(t)) },
>>      preservesPartitioning = true)
>>  }
>> 
>> One of the nice things about spark is that making such new operators is very
>> easy :)
>> 
>> 2015-03-26 17:54 GMT-04:00 Zhan Zhang :
>> 
>>> Thanks, Jonathan. You are right about rewriting the example.
>>> 
>>> I mean providing such an option to developers so that it is controllable. The
>>> example may seem silly, and I don't know the use cases.
>>> 
>>> But, for example, if I also want to operate on both the key and the value to
>>> generate some new value while keeping the key untouched, then mapValues may
>>> not be able to do this.
>>> 
>>> Changing the code to allow this is trivial, but I don't know whether there
>>> is some special reason behind this.
>>> 
>>> Thanks.
>>> 
>>> Zhan Zhang
>>> 
>>> 
>>> 
>>> 
>>> On Mar 26, 2015, at 2:49 PM, Jonathan Coveney  wrote:
>>> 
>>> I believe if you do the following:
>>> 
>>> 
>>> sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString
>>> 
>>> (8) MapPartitionsRDD[34] at reduceByKey at :23 []
>>> |  MapPartitionsRDD[33] at mapValues at :23 []
>>> |  ShuffledRDD[32] at reduceByKey at :23 []
>>> +-(8) MapPartitionsRDD[31] at map at :23 []
>>>|  ParallelCollectionRDD[30] at parallelize at :23 []
>>> 
>>> The difference is that spark has no way to know that your map closure
>>> doesn't change the key. if you only use mapValues, it does. Pretty cool that
>>> they optimized that :)
>>> 
>>> 2015-03-26 17:44 GMT-04:00 Zhan Zhang :
>>>> 
>>>> Hi Folks,
>>>> 
>>>> Does anybody know what is the reason not allowing preserverPartitioning
>>>> in RDD.map? Do I miss something here?
>>>> 
>>>> Following example involves two shuffles. I think if preservePartitioning
>>>> is allowed, we can avoid the second one, right?
>>>> 
>>>> val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
>>>> val r2 = r1.map((_, 1))
>>>> val r3 = r2.reduceByKey(_+_)
>>>> val r4 = r3.map(x=>(x._1, x._2 + 1))
>>>> val r5 = r4.reduceByKey(_+_)
>>>> r5.collect.foreach(println)
>>>> 
>>>> scala> r5.toDebugString
>>>> res2: String =
>>>> (8) ShuffledRDD[4] at reduceByKey at :29 []
>>>> +-(8) MapPartitionsRDD[3] at map at :27 []
>>>>|  ShuffledRDD[2] at reduceByKey at :25 []
>>>>+-(8) MapPartitionsRDD[1] at map at :23 []
>>>>   |  ParallelCollectionRDD[0] at parallelize at :21 []
>>>> 
>>>> Thanks.
>>>> 
>>>> Zhan Zhang
>>>> 
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>> 
>>> 
>>> 
>> 


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Thanks, Jonathan. You are right about rewriting the example.

I mean providing such an option to developers so that it is controllable. The
example may seem silly, and I don’t know the use cases.

But, for example, if I also want to operate on both the key and the value to
generate some new value while keeping the key untouched, then mapValues may not
be able to do this.

Changing the code to allow this is trivial, but I don’t know whether there is 
some special reason behind this.

Thanks.

Zhan Zhang



On Mar 26, 2015, at 2:49 PM, Jonathan Coveney <jcove...@gmail.com> wrote:

I believe if you do the following:

sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString

(8) MapPartitionsRDD[34] at reduceByKey at :23 []
 |  MapPartitionsRDD[33] at mapValues at :23 []
 |  ShuffledRDD[32] at reduceByKey at :23 []
 +-(8) MapPartitionsRDD[31] at map at :23 []
|  ParallelCollectionRDD[30] at parallelize at :23 []

The difference is that Spark has no way to know that your map closure doesn't
change the key; if you only use mapValues, it does. Pretty cool that they
optimized that :)

2015-03-26 17:44 GMT-04:00 Zhan Zhang <zzh...@hortonworks.com>:
Hi Folks,

Does anybody know the reason for not allowing preservesPartitioning in
RDD.map? Am I missing something here?

The following example involves two shuffles. I think if preservesPartitioning
were allowed, we could avoid the second one, right?

 val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
 val r2 = r1.map((_, 1))
 val r3 = r2.reduceByKey(_+_)
 val r4 = r3.map(x=>(x._1, x._2 + 1))
 val r5 = r4.reduceByKey(_+_)
 r5.collect.foreach(println)

scala> r5.toDebugString
res2: String =
(8) ShuffledRDD[4] at reduceByKey at :29 []
 +-(8) MapPartitionsRDD[3] at map at :27 []
|  ShuffledRDD[2] at reduceByKey at :25 []
+-(8) MapPartitionsRDD[1] at map at :23 []
   |  ParallelCollectionRDD[0] at parallelize at :21 []

Thanks.

Zhan Zhang

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Hi Folks,

Does anybody know the reason for not allowing preservesPartitioning in
RDD.map? Am I missing something here?

The following example involves two shuffles. I think if preservesPartitioning
were allowed, we could avoid the second one, right?

 val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
 val r2 = r1.map((_, 1))
 val r3 = r2.reduceByKey(_+_)
 val r4 = r3.map(x=>(x._1, x._2 + 1))
 val r5 = r4.reduceByKey(_+_)
 r5.collect.foreach(println)

scala> r5.toDebugString
res2: String =
(8) ShuffledRDD[4] at reduceByKey at :29 []
 +-(8) MapPartitionsRDD[3] at map at :27 []
|  ShuffledRDD[2] at reduceByKey at :25 []
+-(8) MapPartitionsRDD[1] at map at :23 []
   |  ParallelCollectionRDD[0] at parallelize at :21 []
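
For reference, the replies point to a workaround; a minimal sketch using
mapPartitions and declaring that partitioning is preserved (the key is left
untouched):

// Replaces the plain map for r4 above; the following reduceByKey can then
// reuse r3's partitioning instead of shuffling again.
val r4 = r3.mapPartitions(
  iter => iter.map { case (k, v) => (k, v + 1) },
  preservesPartitioning = true)
val r5 = r4.reduceByKey(_ + _)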

Thanks.

Zhan Zhang

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Spark-thriftserver Issue

2015-03-24 Thread Zhan Zhang
You can try setting it in spark-env.sh:

# - SPARK_LOG_DIR   Where log files are stored.  (Default: 
${SPARK_HOME}/logs)
# - SPARK_PID_DIR   Where the pid file is stored. (Default: /tmp)

Thanks.

Zhan Zhang

On Mar 24, 2015, at 12:10 PM, Anubhav Agarwal <anubha...@gmail.com> wrote:

Zhan, specifying the port fixed the port issue.

Is it possible to specify the log directory while starting the Spark
thriftserver?
I am still getting this error even though the folder exists and everyone has
permission to use that directory.
drwxr-xr-x  2 root root  4096 Mar 24 19:04 spark-events


Exception in thread "main" java.lang.IllegalArgumentException: Log directory 
/tmp/spark-events does not exist.
at 
org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:99)
at org.apache.spark.SparkContext.(SparkContext.scala:399)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:49)
at 
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:58)
at 
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



On Mon, Mar 23, 2015 at 6:51 PM, Zhan Zhang 
mailto:zzh...@hortonworks.com>> wrote:
The port is probably already used by another process, e.g., Hive. You can change
the port as shown below:


 ./sbin/start-thriftserver.sh --master yarn --executor-memory 512m --hiveconf 
hive.server2.thrift.port=10001

Thanks.

Zhan Zhang

On Mar 23, 2015, at 12:01 PM, Neil Dev <neilk...@gmail.com> wrote:

Hi,

I am having an issue starting spark-thriftserver. I'm running Spark 1.3 with
Hadoop 2.4.0. I would like to be able to change its port too, so I can have the
hive-thriftserver as well as the spark-thriftserver running at the same time.

Starting sparkthrift server:-
sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077
--executor-memory 2G

Error:-
I created the folder manually but still getting the following error
Exception in thread "main" java.lang.IllegalArgumentException: Log
directory /tmp/spark-events does not exist.


I am getting the following error
15/03/23 15:07:02 ERROR thrift.ThriftCLIService: Error:
org.apache.thrift.transport.TTransportException: Could not create
ServerSocket on address0.0.0.0/0.0.0.0:1.
   at
org.apache.thrift.transport.TServerSocket.(TServerSocket.java:93)
   at
org.apache.thrift.transport.TServerSocket.(TServerSocket.java:79)
   at
org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236)
   at
org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69)
   at java.lang.Thread.run(Thread.java:745)

Thanks
Neil





Re: Review request for SPARK-6112:Provide OffHeap support through HDFS RAM_DISK

2015-03-23 Thread Zhan Zhang
Thanks Reynold,

I agree with you on opening another JIRA to unify the block storage API. I have
uploaded the design doc to SPARK-6479 as well.

Thanks.

Zhan Zhang

On Mar 23, 2015, at 4:03 PM, Reynold Xin <r...@databricks.com> wrote:

I created a ticket to separate the API refactoring from the implementation. 
Would be great to have these as two separate patches to make it easier to 
review (similar to the way we are doing RPC refactoring -- first introducing an 
internal RPC api, port akka to it, and then add an alternative implementation).

https://issues.apache.org/jira/browse/SPARK-6479

Can you upload your design doc there so we can discuss the block store api? 
Thanks.


On Mon, Mar 23, 2015 at 3:47 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:
Hi Folks,

I am planning to implement HDFS off-heap support for Spark, and have uploaded
the design doc for off-heap support through an HDFS RAM disk to JIRA
SPARK-6112. Please review it and provide your feedback if you are interested.

https://issues.apache.org/jira/browse/SPARK-6112

Thanks.

Zhan Zhang




Re: Spark-thriftserver Issue

2015-03-23 Thread Zhan Zhang
The port is probably already used by another process, e.g., Hive. You can change
the port as shown below:


 ./sbin/start-thriftserver.sh --master yarn --executor-memory 512m --hiveconf 
hive.server2.thrift.port=10001

Thanks.

Zhan Zhang

On Mar 23, 2015, at 12:01 PM, Neil Dev <neilk...@gmail.com> wrote:

Hi,

I am having an issue starting spark-thriftserver. I'm running Spark 1.3 with
Hadoop 2.4.0. I would like to be able to change its port too, so I can have the
hive-thriftserver as well as the spark-thriftserver running at the same time.

Starting sparkthrift server:-
sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077
--executor-memory 2G

Error:-
I created the folder manually but still getting the following error
Exception in thread "main" java.lang.IllegalArgumentException: Log
directory /tmp/spark-events does not exist.


I am getting the following error
15/03/23 15:07:02 ERROR thrift.ThriftCLIService: Error:
org.apache.thrift.transport.TTransportException: Could not create
ServerSocket on address0.0.0.0/0.0.0.0:1.
   at
org.apache.thrift.transport.TServerSocket.(TServerSocket.java:93)
   at
org.apache.thrift.transport.TServerSocket.(TServerSocket.java:79)
   at
org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236)
   at
org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69)
   at java.lang.Thread.run(Thread.java:745)

Thanks
Neil



Review request for SPARK-6112:Provide OffHeap support through HDFS RAM_DISK

2015-03-23 Thread Zhan Zhang
Hi Folks,

I am planning to implement HDFS off-heap support for Spark, and have uploaded
the design doc for off-heap support through an HDFS RAM disk to JIRA
SPARK-6112. Please review it and provide your feedback if you are interested.

https://issues.apache.org/jira/browse/SPARK-6112

Thanks.

Zhan Zhang


Re: Welcoming three new committers

2015-02-03 Thread Zhan Zhang
Congratulations!

On Feb 3, 2015, at 2:34 PM, Matei Zaharia  wrote:

> Hi all,
> 
> The PMC recently voted to add three new committers: Cheng Lian, Joseph 
> Bradley and Sean Owen. All three have been major contributors to Spark in the 
> past year: Cheng on Spark SQL, Joseph on MLlib, and Sean on ML and many 
> pieces throughout Spark Core. Join me in welcoming them as committers!
> 
> Matei
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Setting JVM options to Spark executors in Standalone mode

2015-01-16 Thread Zhan Zhang
You can try adding it in conf/spark-defaults.conf:

# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

Thanks.

Zhan Zhang

On Jan 16, 2015, at 9:56 AM, Michel Dufresne  
wrote:

> Hi All,
> 
> I'm trying to set some JVM options to the executor processes in a
> standalone cluster. Here's what I have in *spark-env.sh*:
> 
> jmx_opt="-Dcom.sun.management.jmxremote"
>> jmx_opt="${jmx_opt} -Djava.net.preferIPv4Stack=true"
>> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.port="
>> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.rmi.port=9998"
>> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.ssl=false"
>> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.authenticate=false"
>> jmx_opt="${jmx_opt} -Djava.rmi.server.hostname=${SPARK_PUBLIC_DNS}"
>> export SPARK_WORKER_OPTS="${jmx_opt}"
> 
> 
> However, the options are showing up on the *daemon* JVM, not the *workers*. It
> has the same effect as if I were using SPARK_DAEMON_JAVA_OPTS (which should
> set it on the daemon process).
> 
> Thanks in advance for your help,
> 
> Michel



-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: How spark and hive integrate in long term?

2014-11-22 Thread Zhan Zhang
Thanks Cheng for the insights. 

Regarding HCatalog, I did some initial investigation too and agree with you;
as of now, it does not seem to be a good solution. I will try to talk to the Hive
people to see whether there is such a guarantee of downward compatibility for the
Thrift protocol. By the way, I tried some basic functions using Hive 0.13
connecting to a Hive 0.14 metastore, and they look compatible.

Thanks.

Zhan Zhang


On Nov 22, 2014, at 7:14 AM, Cheng Lian  wrote:

> Should emphasize that this is still a quick and rough conclusion, will 
> investigate this in more detail after 1.2.0 release. Anyway we really like to 
> provide Hive support in Spark SQL as smooth and clean as possible for both 
> developers and end users.
> 
> On 11/22/14 11:05 PM, Cheng Lian wrote:
>> 
>> Hey Zhan,
>> 
>> This is a great question. We are also seeking for a stable API/protocol that 
>> works with multiple Hive versions (esp. 0.12+). SPARK-4114 
>> <https://issues.apache.org/jira/browse/SPARK-4114> was opened for this. Did 
>> some research into HCatalog recently, but I must confess that I’m not an 
>> expert on HCatalog, actually spent only 1 day on exploring it. So please 
>> don’t hesitate to correct me if I was wrong about the conclusions I made 
>> below.
>> 
>> First, although HCatalog API is more pleasant to work with, it’s 
>> unfortunately feature incomplete. It only provides a subset of most commonly 
>> used operations. For example, |HCatCreateTableDesc| maps only a subset of 
>> |CreateTableDesc|, properties like |storeAsSubDirectories|, |skewedColNames| 
>> and |skewedColValues| are missing. It’s also impossible to alter table 
>> properties via HCatalog API (Spark SQL uses this to implement the |ANALYZE| 
>> command). The |hcat| CLI tool provides all those features missing in 
>> HCatalog API via raw Metastore API, and is structurally similar to the old 
>> Hive CLI.
>> 
>> Second, HCatalog API itself doesn’t ensure compatibility, it’s the Thrift 
>> protocol that matters. HCatalog is directly built upon raw Metastore API, 
>> and talks the same Metastore Thrift protocol. The problem we encountered in 
>> Spark SQL is that, usually we deploy Spark SQL Hive support with embedded 
>> mode (for testing) or local mode Metastore, and this makes us suffer from 
>> things like Metastore database schema changes. If Hive Metastore Thrift 
>> protocol is guaranteed to be downward compatible, then hopefully we can 
>> resort to remote mode Metastore and always depend on most recent Hive APIs. 
>> I had a glance of Thrift protocol version handling code in Hive, it seems 
>> that downward compatibility is not an issue. However I didn’t find any 
>> official documents about Thrift protocol compatibility.
>> 
>> That said, in the future, hopefully we can only depend on most recent Hive 
>> dependencies and remove the Hive shim layer introduced in branch 1.2. For 
>> users who use exactly the same version of Hive as Spark SQL, they can use 
>> either remote or local/embedded Metastore; while for users who want to 
>> interact with existing legacy Hive clusters, they have to setup a remote 
>> Metastore and let the Thrift protocol to handle compatibility.
>> 
>> — Cheng
>> 
>> On 11/22/14 6:51 AM, Zhan Zhang wrote:
>> 
>>> Now Spark and hive integration is a very nice feature. But I am wondering
>>> what the long term roadmap is for spark integration with hive. Both of these
>>> two projects are undergoing fast improvement and changes. Currently, my
>>> understanding is that spark hive sql part relies on hive meta store and
>>> basic parser to operate, and the thrift-server intercept hive query and
>>> replace it with its own engine.
>>> 
>>> With every release of hive, there need a significant effort on spark part to
>>> support it.
>>> 
>>> For the metastore part, we may possibly replace it with hcatalog. But given
>>> the dependency of other parts on hive, e.g., metastore, thriftserver,
>>> hcatlog may not be able to help much.
>>> 
>>> Does anyone have any insight or idea in mind?
>>> 
>>> Thanks.
>>> 
>>> Zhan Zhang
>>> 
>>> 
>>> 
>>> --
>>> View this message in 
>>> context:http://apache-spark-developers-list.1001551.n3.nabble.com/How-spark-and-hive-integrate-in-long-term-tp9482.html
>>> Sent from the Apache Spark Developers List mailing list archive at 
>>> Nabble.com.
>>> 
>>> -
>>> 

Re: How spark and hive integrate in long term?

2014-11-21 Thread Zhan Zhang
Thanks Dean, for the information.

Hive-on-Spark is nice. Spark SQL has the advantage that it can exploit Spark fully 
and lets the user manipulate a Hive table as an RDD through native Spark support 
(see the sketch below).
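
For instance, something like this (only a sketch; the table and column names are
illustrative, and sc is the usual SparkContext):

    import org.apache.spark.sql.hive.HiveContext

    // The result of a Hive query is a SchemaRDD, i.e. a normal RDD of Rows,
    // so ordinary RDD operations can be applied to Hive tables directly.
    val hc = new HiveContext(sc)
    val rows = hc.sql("SELECT key, value FROM src")
    rows.map(row => row.getString(1)).distinct().count()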

When I tried to upgrade the current hive-0.13.1 support to hive-0.14.0, I found 
that the Hive parser is no longer compatible. Meanwhile, the new features 
introduced in hive-0.14, e.g., ACID, are not there yet. At the same time, Spark 1.2 
also has some nice features added that are supported by the thrift-server too, 
e.g., hive-0.13 support and table caching.

Given that both have more and more features added, it would be great if users 
could take advantage of both. Currently, Spark SQL gives us such benefits 
partially, but I am wondering how to maintain such integration in the long term.

Thanks.

Zhan Zhang

On Nov 21, 2014, at 3:12 PM, Dean Wampler  wrote:

> I can't comment on plans for Spark SQL's support for Hive, but several
> companies are porting Hive itself onto Spark:
> 
> http://blog.cloudera.com/blog/2014/11/apache-hive-on-apache-spark-the-first-demo/
> 
> I'm not sure if they are leveraging the old Shark code base or not, but it
> appears to be a fresh effort.
> 
> dean
> 
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
> 
> On Fri, Nov 21, 2014 at 2:51 PM, Zhan Zhang  wrote:
> 
>> Now Spark and hive integration is a very nice feature. But I am wondering
>> what the long term roadmap is for spark integration with hive. Both of
>> these
>> two projects are undergoing fast improvement and changes. Currently, my
>> understanding is that spark hive sql part relies on hive meta store and
>> basic parser to operate, and the thrift-server intercept hive query and
>> replace it with its own engine.
>> 
>> With every release of hive, there need a significant effort on spark part
>> to
>> support it.
>> 
>> For the metastore part, we may possibly replace it with hcatalog. But given
>> the dependency of other parts on hive, e.g., metastore, thriftserver,
>> hcatlog may not be able to help much.
>> 
>> Does anyone have any insight or idea in mind?
>> 
>> Thanks.
>> 
>> Zhan Zhang
>> 
>> 
>> 
>> --
>> View this message in context:
>> http://apache-spark-developers-list.1001551.n3.nabble.com/How-spark-and-hive-integrate-in-long-term-tp9482.html
>> Sent from the Apache Spark Developers List mailing list archive at
>> Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> For additional commands, e-mail: dev-h...@spark.apache.org
>> 
>> 



-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



How spark and hive integrate in long term?

2014-11-21 Thread Zhan Zhang
The Spark and Hive integration is now a very nice feature. But I am wondering
what the long-term roadmap is for Spark's integration with Hive. Both of these
projects are undergoing fast improvement and change. Currently, my understanding
is that the Spark SQL Hive support relies on the Hive metastore and the basic
parser to operate, and the thrift-server intercepts Hive queries and executes
them with Spark's own engine.

With every release of Hive, a significant effort is needed on the Spark side to
support it.

For the metastore part, we may possibly replace it with HCatalog. But given the
dependency of other parts on Hive, e.g., metastore and thriftserver, HCatalog
may not be able to help much.

Does anyone have any insight or idea in mind?

Thanks.

Zhan Zhang 



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/How-spark-and-hive-integrate-in-long-term-tp9482.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: HiveShim not found when building in Intellij

2014-10-28 Thread Zhan Zhang
-Phive enables hive-0.13.1, and "-Phive -Phive-0.12.0" enables hive-0.12.0; see the 
example invocations below. Note that the thrift-server is not yet supported with 
hive-0.13, but it is expected to land upstream soon (SPARK-3720).
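
For example (just a sketch -- add whatever other profiles your environment needs,
e.g. -Pyarn or a hadoop profile):

    mvn -Phive -DskipTests clean package                  # builds against hive-0.13.1
    mvn -Phive -Phive-0.12.0 -DskipTests clean package    # builds against hive-0.12.0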

Thanks.

Zhan Zhang


 
On Oct 28, 2014, at 9:09 PM, Stephen Boesch  wrote:

> Thanks Patrick for the heads up.
> 
> I have not been successful to discover a combination of profiles (i.e.
> enabling hive or hive-0.12.0 or hive-13.0) that works in Intellij with
> maven. Anyone who knows how to handle this - a quick note here would be
> appreciated.
> 
> 
> 
> 2014-10-28 20:20 GMT-07:00 Patrick Wendell :
> 
>> Hey Stephen,
>> 
>> In some cases in the maven build we now have pluggable source
>> directories based on profiles using the maven build helper plug-in.
>> This is necessary to support cross building against different Hive
>> versions, and there will be additional instances of this due to
>> supporting scala 2.11 and 2.10.
>> 
>> In these cases, you may need to add source locations explicitly to
>> intellij if you want the entire project to compile there.
>> 
>> Unfortunately as long as we support cross-building like this, it will
>> be an issue. Intellij's maven support does not correctly detect our
>> use of the maven-build-plugin to add source directories.
>> 
>> We should come up with a good set of instructions on how to import the
>> pom files + add the few extra source directories. Off hand I am not
>> sure exactly what the correct sequence is.
>> 
>> - Patrick
>> 
>> On Tue, Oct 28, 2014 at 7:57 PM, Stephen Boesch  wrote:
>>> Hi Matei,
>>>  Until my latest pull from upstream/master it had not been necessary to
>>> add the hive profile: is it now??
>>> 
>>> I am not using sbt gen-idea. The way to open in intellij has been to Open
>>> the parent directory. IJ recognizes it as a maven project.
>>> 
>>> There are several steps to do surgery on the yarn-parent / yarn projects
>> ,
>>> then do a full rebuild.  That was working until one week ago.
>>> Intellij/maven is presently broken in  two ways:  this hive shim (which
>> may
>>> yet hopefully be a small/simple fix - let us see) and  (2) the
>>> "NoClassDefFoundError
>>> on ThreadFactoryBuilder" from my prior emails -and which is quite a
>> serious
>>> problem .
>>> 
>>> 2014-10-28 19:46 GMT-07:00 Matei Zaharia :
>>> 
>>>> Hi Stephen,
>>>> 
>>>> How did you generate your Maven workspace? You need to make sure the
>> Hive
>>>> profile is enabled for it. For example sbt/sbt -Phive gen-idea.
>>>> 
>>>> Matei
>>>> 
>>>>> On Oct 28, 2014, at 7:42 PM, Stephen Boesch 
>> wrote:
>>>>> 
>>>>> I have run on the command line via maven and it is fine:
>>>>> 
>>>>> mvn   -Dscalastyle.failOnViolation=false -DskipTests -Pyarn
>> -Phadoop-2.3
>>>>> compile package install
>>>>> 
>>>>> 
>>>>> But with the latest code Intellij builds do not work. Following is
>> one of
>>>>> 26 similar errors:
>>>>> 
>>>>> 
>>>>> Error:(173, 38) not found: value HiveShim
>>>>> 
>>>> Option(tableParameters.get(HiveShim.getStatsSetupConstTotalSize))
>>>>>^
>>>> 
>>>> 
>> 



-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



RE: Working Formula for Hive 0.13?

2014-08-28 Thread Zhan Zhang
I have a preliminary patch against Spark 1.0.2, which is attached to SPARK-2706.
Now I am working on supporting both hive-0.12 and hive-0.13.1 in a non-intrusive
way (not breaking any existing hive-0.12 support when introducing support for the
new version); the rough shape is sketched below. I will attach a proposal for
solving the multi-version support issue to SPARK-2706 soon.
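
The non-intrusive approach is, roughly, a per-version shim selected at build time,
something along these lines (an illustrative sketch only, not the actual patch):

    package org.apache.spark.sql.hive

    import org.apache.hadoop.hive.common.StatsSetupConst

    // One such object per supported Hive version; the build profile puts the
    // matching source directory on the compile path, so callers like
    //   tableParameters.get(HiveShim.getStatsSetupConstTotalSize)
    // never need to know which Hive version they were compiled against.
    private[hive] object HiveShim {
      val version = "0.13.1"
      val getStatsSetupConstTotalSize: String = StatsSetupConst.TOTAL_SIZE
    }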

Thanks.

Zhan Zhang



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Working-Formula-for-Hive-0-13-tp7551p8118.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: spark.akka.frameSize stalls job in 1.1.0

2014-08-18 Thread Zhan Zhang
Not sure exactly how you use it. My understanding is that in Spark it is better to 
keep the overhead on the driver as low as possible. Is it possible to broadcast the 
trie to the executors, do the computation there, and then aggregate the counters in 
the reduce phase? Something like the sketch below.
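
A rough sketch of that shape (the Trie class here is a naive stand-in for whatever
structure you actually use; it just needs to be serializable so it can be
broadcast, and the paths are placeholders):

    // Hypothetical trie: matches() returns the known locations found in a line.
    class Trie(patterns: Seq[String]) extends Serializable {
      def matches(line: String): Seq[String] = patterns.filter(p => line.contains(p))
    }

    // Build it once on the driver and ship it to every executor via a broadcast
    // variable; only small (location, count) pairs flow back to the driver.
    val trie = new Trie(Seq("San Francisco", "New York"))
    val trieBroadcast = sc.broadcast(trie)

    val counts = sc.textFile("documents")
      .flatMap(line => trieBroadcast.value.matches(line))
      .map(location => (location, 1L))
      .reduceByKey(_ + _)

    counts.saveAsTextFile("location-counts")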

Thanks.

Zhan Zhang

On Aug 18, 2014, at 8:54 AM, Jerry Ye  wrote:

> Hi Zhan,
> Thanks for looking into this. I'm actually using the hash map as an example 
> of the simplest snippet of code that is failing for me. I know that this is 
> just the word count. In my actual problem I'm using a Trie data structure to 
> find substring matches.
> 
> 
> On Sun, Aug 17, 2014 at 11:35 PM, Zhan Zhang  wrote:
> Is it because countByValue or toArray put too much stress on the driver, if 
> there are many unique words
> To me it is a typical word count problem, then you can solve it as follows 
> (correct me if I am wrong)
> 
> val textFile = sc.textFile(“file")
> val counts = textFile.flatMap(line => line.split(" ")).map(word => (word, 
> 1)).reduceByKey((a, b) => a + b)
> counts.saveAsTextFile(“file”)//any way you don’t want to collect results to 
> master, and instead putting them in file.
> 
> Thanks.
> 
> Zhan Zhang
> 
> On Aug 16, 2014, at 9:18 AM, Jerry Ye  wrote:
> 
> > The job ended up running overnight with no progress. :-(
> >
> >
> > On Sat, Aug 16, 2014 at 12:16 AM, Jerry Ye  wrote:
> >
> >> Hi Xiangrui,
> >> I actually tried branch-1.1 and master and it resulted in the job being
> >> stuck at the TaskSetManager:
> >> 14/08/16 06:55:48 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0
> >> with 2 tasks
> >> 14/08/16 06:55:48 INFO scheduler.TaskSetManager: Starting task 1.0:0 as
> >> TID 2 on executor 8: ip-10-226-199-225.us-west-2.compute.internal
> >> (PROCESS_LOCAL)
> >> 14/08/16 06:55:48 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as
> >> 28055875 bytes in 162 ms
> >> 14/08/16 06:55:48 INFO scheduler.TaskSetManager: Starting task 1.0:1 as
> >> TID 3 on executor 0: ip-10-249-53-62.us-west-2.compute.internal
> >> (PROCESS_LOCAL)
> >> 14/08/16 06:55:48 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as
> >> 28055875 bytes in 178 ms
> >>
> >> It's been 10 minutes with no progress on relatively small data. I'll let
> >> it run overnight and update in the morning. Is there some place that I
> >> should look to see what is happening? I tried to ssh into the executor and
> >> look at /root/spark/logs but there wasn't anything informative there.
> >>
> >> I'm sure using CountByValue works fine but my use of a HashMap is only an
> >> example. In my actual task, I'm loading a Trie data structure to perform
> >> efficient string matching between a dataset of locations and strings
> >> possibly containing mentions of locations.
> >>
> >> This seems like a common thing, to process input with a relatively memory
> >> intensive object like a Trie. I hope I'm not missing something obvious. Do
> >> you know of any example code like my use case?
> >>
> >> Thanks!
> >>
> >> - jerry
> >>
> 
> 




Re: spark.akka.frameSize stalls job in 1.1.0

2014-08-17 Thread Zhan Zhang
Is it because countByValue or toArray puts too much stress on the driver when 
there are many unique words? 
To me it is a typical word count problem, so you can solve it as follows 
(correct me if I am wrong):

val textFile = sc.textFile("file")
val counts = textFile
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)
// Either way, you don't want to collect the results to the driver;
// write them out to files instead.
counts.saveAsTextFile("counts")

Thanks.

Zhan Zhang

On Aug 16, 2014, at 9:18 AM, Jerry Ye  wrote:

> The job ended up running overnight with no progress. :-(
> 
> 
> On Sat, Aug 16, 2014 at 12:16 AM, Jerry Ye  wrote:
> 
>> Hi Xiangrui,
>> I actually tried branch-1.1 and master and it resulted in the job being
>> stuck at the TaskSetManager:
>> 14/08/16 06:55:48 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0
>> with 2 tasks
>> 14/08/16 06:55:48 INFO scheduler.TaskSetManager: Starting task 1.0:0 as
>> TID 2 on executor 8: ip-10-226-199-225.us-west-2.compute.internal
>> (PROCESS_LOCAL)
>> 14/08/16 06:55:48 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as
>> 28055875 bytes in 162 ms
>> 14/08/16 06:55:48 INFO scheduler.TaskSetManager: Starting task 1.0:1 as
>> TID 3 on executor 0: ip-10-249-53-62.us-west-2.compute.internal
>> (PROCESS_LOCAL)
>> 14/08/16 06:55:48 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as
>> 28055875 bytes in 178 ms
>> 
>> It's been 10 minutes with no progress on relatively small data. I'll let
>> it run overnight and update in the morning. Is there some place that I
>> should look to see what is happening? I tried to ssh into the executor and
>> look at /root/spark/logs but there wasn't anything informative there.
>> 
>> I'm sure using CountByValue works fine but my use of a HashMap is only an
>> example. In my actual task, I'm loading a Trie data structure to perform
>> efficient string matching between a dataset of locations and strings
>> possibly containing mentions of locations.
>> 
>> This seems like a common thing, to process input with a relatively memory
>> intensive object like a Trie. I hope I'm not missing something obvious. Do
>> you know of any example code like my use case?
>> 
>> Thanks!
>> 
>> - jerry
>> 



-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Spark testsuite error for hive 0.13.

2014-08-12 Thread Zhan Zhang
Problem solved with a workaround: issuing CREATE DATABASE and then USE before 
running the suite, roughly as below.
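
A guess at the exact shape only (runSqlHive appears in the stack trace in the
original message below, and the database name here is just illustrative):

    import org.apache.spark.sql.hive.test.TestHive

    TestHive.runSqlHive("CREATE DATABASE IF NOT EXISTS testdb")
    TestHive.runSqlHive("USE testdb")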



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-testsuite-error-for-hive-0-13-tp7807p7819.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Spark testsuite error for hive 0.13.

2014-08-11 Thread Zhan Zhang
Thanks Sean,

I changed both the API and the version because there are some incompatibilities
with hive-0.13, and I can actually do some basic operations against a real Hive
environment. But the test suite always complains with a "no default database"
message. No clue yet.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-testsuite-error-for-hive-0-13-tp7807p7810.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Spark testsuite error for hive 0.13.

2014-08-11 Thread Zhan Zhang
I am trying to change Spark to support hive-0.13, but I always hit the following
problem when running the tests. My feeling is that the test setup may need to
change, but I don't know exactly how. Has anyone seen a similar issue, or can
anyone shed some light on it?

13:50:53.331 ERROR org.apache.hadoop.hive.ql.Driver: FAILED:
SemanticException [Error 10072]: Database does not exist: default
org.apache.hadoop.hive.ql.parse.SemanticException: Database does not exist:
default
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getDatabase(BaseSemanticAnalyzer.java:1302)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getDatabase(BaseSemanticAnalyzer.java:1291)
at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9944)
at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:391)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:291)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:944)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1009)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:880)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:870)
at
org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:292)
at
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:266)
at
org.apache.spark.sql.hive.test.TestHiveContext.runSqlHive(TestHive.scala:83)
at
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
at
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.stringResult(HiveContext.scala:405)
at
org.apache.spark.sql.hive.test.TestHiveContext$SqlCmd$$anonfun$cmd$1.apply$mcV$sp(TestHive.scala:164)
at
org.apache.spark.sql.hive.test.TestHiveContext$$anonfun$loadTestTable$2.apply(TestHive.scala:282)
at
org.apache.spark.sql.hive.test.TestHiveContext$$anonfun$loadTestTable$2.apply(TestHive.scala:282)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at
org.apache.spark.sql.hive.test.TestHiveContext.loadTestTable(TestHive.scala:282)
at
org.apache.spark.sql.hive.CachedTableSuite.(CachedTableSuite.scala:28)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:621)
at sbt.ForkMain$Run$2.call(ForkMain.java:294)
at sbt.ForkMain$Run$2.call(ForkMain.java:284)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Database does
not exist: default
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getDatabase(BaseSemanticAnalyzer.java:1298)
... 35 more



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-testsuite-error-for-hive-0-13-tp7807.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Working Formula for Hive 0.13?

2014-08-08 Thread Zhan Zhang
Attached is the diff for PR SPARK-2706. I am currently working on this problem.
If somebody else is also working on this, we can share the load.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Working-Formula-for-Hive-0-13-tp7551p7782.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Working Formula for Hive 0.13?

2014-08-08 Thread Zhan Zhang
Sorry, forgot to upload the file. I have never posted before :)

hive.diff



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Working-Formula-for-Hive-0-13-tp7551p.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Working Formula for Hive 0.13?

2014-08-08 Thread Zhan Zhang
Here is the patch. Please ignore the pom.xml-related changes, which are just for
compilation purposes. I need to do further work on this based on Wandou's
previous work.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Working-Formula-for-Hive-0-13-tp7551p7776.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Working Formula for Hive 0.13?

2014-08-08 Thread Zhan Zhang
I can compile with no error, but my patch also includes other stuff. 



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Working-Formula-for-Hive-0-13-tp7551p7775.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Working Formula for Hive 0.13?

2014-08-08 Thread Zhan Zhang
The API change does not seem major. I have changed it locally and compiled, but
have not tested yet. The major problem is still how to handle the hive-exec jar
dependency. I am willing to help on this issue. Is it better to stick to the same
approach as hive-0.12 until hive-exec is cleaned up enough to switch back?



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Working-Formula-for-Hive-0-13-tp7551p7774.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Spark REPL question

2014-04-17 Thread Zhan Zhang
Clear to me now.

Thanks.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-REPL-question-tp6331p6335.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


Re: Spark REPL question

2014-04-17 Thread Zhan Zhang
Thanks a lot.

By "spins up", do you mean using the same directory, specified by following?

  /** Local directory to save .class files too */
  val outputDir = {
val tmp = System.getProperty("java.io.tmpdir")
val rootDir = new SparkConf().get("spark.repl.classdir",  tmp)
Utils.createTempDir(rootDir)
  }
val virtualDirectory  = new
PlainFile(outputDir) // "directory" for classfiles
val classServer   = new
HttpServer(outputDir) /** Jetty server that will serve our classes to
worker nodes */



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-REPL-question-tp6331p6333.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


Spark REPL question

2014-04-17 Thread Zhan Zhang
Please help, I am new to both Spark and Scala.

I am trying to figure out how Spark distributes tasks to workers in the REPL.
I only found the place where the task is serialized and sent, and where workers
deserialize it and load the task class by name via ExecutorClassLoader.
But I didn't find where the driver uploads the REPL-generated .class/jar files
to a file server or HDFS. My understanding is that the worker has to know the
class as well in order to instantiate the task.

Does anybody know where that code is (file or function name), or is my
understanding wrong?

Thanks.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-REPL-question-tp6331.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.