parsing embedded json in spark

2016-12-21 Thread Tal Grynbaum
Hi,

I have a dataframe that contains an embedded JSON string in one of the fields.
I tried to write a UDF that parses it using lift-json, but it seems to take a
very long time to process, and it seems that only the master node is working.

Has anyone dealt with such a scenario before and can give me some hints?

Thanks
Tal
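
A minimal sketch of a column-level alternative, assuming Spark 1.6+ where get_json_object is available in org.apache.spark.sql.functions (the column name "payload" and the JSON paths are placeholders); extracting fields this way keeps the parsing on the executors rather than in any driver-side loop:

import org.apache.spark.sql.functions.{col, get_json_object}

// df is assumed to have a string column "payload" holding the embedded JSON document.
val parsed = df
  .withColumn("userId", get_json_object(col("payload"), "$.user.id"))
  .withColumn("amount", get_json_object(col("payload"), "$.amount"))

parsed.show(5)

On Spark 2.1+, from_json can map the whole document onto a struct column in one go. If a custom UDF is kept, it is also worth checking that the input DataFrame has more than one partition, since a single partition would explain only one node doing the work.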


Re: Has anyone managed to connect to Oracle via JDBC from Spark CDH 5.5.2

2016-12-21 Thread Divya Gehlot
Hi Mich,

Can you try placing these jars in the Spark classpath?
It should work.

Thanks,
Divya
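
For reference, a rough sketch of the two things that usually have to line up on Spark 1.x (the paths and driver class below are taken from the thread; the JDBC URL is a placeholder): the ojdbc6.jar has to be visible to both driver and executors (e.g. spark-submit --jars /home/hduser/jars/ojdbc6.jar --driver-class-path /home/hduser/jars/ojdbc6.jar, or the extraClassPath settings quoted below), and the driver class can be named explicitly in the JDBC options:

val df = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:oracle:thin:@//dbhost:1521/SERVICE",  // placeholder URL
  "dbtable"  -> "scratchpad.dummy",
  "driver"   -> "oracle.jdbc.OracleDriver",
  "user"     -> "scratchpad",
  "password" -> "***"
)).load()

This is a sketch of the usual combination, not a verified fix for the CDH 5.5.2 case below.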

On 22 December 2016 at 05:40, Mich Talebzadeh 
wrote:

> This works with Spark 2 with Oracle jar file added to
>
> $SPARK_HOME/conf/spark-defaults.conf
>
>
>
>
> spark.driver.extraClassPath  /home/hduser/jars/ojdbc6.jar
>
> spark.executor.extraClassPath  /home/hduser/jars/ojdbc6.jar
>
>
> and you get
>
scala> val s = HiveContext.read.format("jdbc").options(
>  | Map("url" -> _ORACLEserver,
>  | "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS
> CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS
> RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>  | "partitionColumn" -> "ID",
>  | "lowerBound" -> "1",
>  | "upperBound" -> "1",
>  | "numPartitions" -> "10",
>  | "user" -> _username,
>  | "password" -> _password)).load
> s: org.apache.spark.sql.DataFrame = [ID: string, CLUSTERED: string ... 5
> more fields]
> that works.
> However, with CDH 5.5.2 (Spark 1.5) it fails with error
>
> *java.sql.SQLException: No suitable driver*
>
>   at java.sql.DriverManager.getDriver(DriverManager.java:315)
>
>   at org.apache.spark.sql.execution.datasources.jdbc.
> JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
>
>   at org.apache.spark.sql.execution.datasources.jdbc.
> JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
>
>   at scala.Option.getOrElse(Option.scala:121)
>
>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.
> createConnectionFactory(JdbcUtils.scala:53)
>
>   at org.apache.spark.sql.execution.datasources.jdbc.
> JDBCRDD$.resolveTable(JDBCRDD.scala:123)
>
>   at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:117)
>
>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.
> createRelation(JdbcRelationProvider.scala:53)
>
>   at org.apache.spark.sql.execution.datasources.
> DataSource.resolveRelation(DataSource.scala:315)
>
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
>
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)
>
>
> Any ideas?
>
> Thanks
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


submit spark task on yarn asynchronously via java?

2016-12-21 Thread Linyuxin
Hi All,

Version:
Spark 1.5.1
Hadoop 2.7.2

Is there any way to submit and monitor a Spark task on YARN via Java
asynchronously?
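
One option available in Spark 1.5 is the SparkLauncher API from the spark-launcher artifact; a minimal sketch (shown in Scala, the same calls work from Java; the jar path, class name and configuration are placeholders):

import org.apache.spark.launcher.SparkLauncher

// launch() forks a spark-submit process and returns a java.lang.Process,
// so the calling thread is not blocked by the job itself.
val process = new SparkLauncher()
  .setAppResource("/path/to/my-app.jar")
  .setMainClass("com.example.MyJob")
  .setMaster("yarn-cluster")
  .setConf("spark.executor.memory", "2g")
  .launch()

// Wait for completion asynchronously, e.g. in a Future.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val exitCode: Future[Int] = Future { process.waitFor() }

Later releases add SparkLauncher.startApplication(...) with state listeners; on 1.5, finer-grained monitoring would have to go through YARN itself (e.g. its REST API or client library).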




Access HiveConf from SparkSession

2016-12-21 Thread Vishak Baby
In Spark 1.6.2, it was possible to access the HiveConf object via the below
method.

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/hive/HiveContext.html#hiveconf()

Can anyone let me know how to do the same in Spark 2.0.2, from the
SparkSession object?
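
There is no public hiveconf() accessor on SparkSession, so the following is only a sketch of what is reachable through public APIs in 2.0.x (whether it covers the use case depends on which settings are needed; "hive.metastore.uris" is just an example key):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Runtime SQL/Hive settings known to the session:
val sqlConf: Map[String, String] = spark.conf.getAll

// The Hadoop Configuration the session uses; values from hive-site.xml on the
// classpath typically end up here as well.
val hadoopConf = spark.sparkContext.hadoopConfiguration
val metastoreUris = hadoopConf.get("hive.metastore.uris")

Anything beyond that (the actual HiveConf object) currently means going through internal, non-guaranteed APIs.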


Access HiveConf from SparkSession

2016-12-21 Thread Vishak
In Spark 1.6.2, it was possible to access the HiveConf object via the below
method.

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/hive/HiveContext.html#hiveconf()

Can anyone let me know how to do the same in Spark 2.0.2, from the SparkSession
object?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Access-HiveConf-from-SparkSession-tp28246.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



In PySpark ML, how can I interpret the SparseVector returned by a pyspark.ml.classification.RandomForestClassificationModel.featureImportances ?

2016-12-21 Thread Russell Jurney
I am debugging problems with a PySpark RandomForestClassificationModel, and
I am trying to use the feature importances to do so. However, the
featureImportances property returns a SparseVector that isn't possible to
interpret. How can I transform the SparseVector to be a useful list of
features along with feature type and name?

Some of my features were nominal, so they had to be one-hot-encoded and then
combined with my numeric features. There is no PCA or anything that would
make interpretability hard, I just need to transform things back to where I
can get a feature type/name for each item in the SparseVector.

In other words... in practice,
RandomForestClassificationModel.featureImportances isn't useful without
some ability to make it interpretable. Does that ability exist? I've done
this in sklearn, but don't know how to do this with Spark ML.

My code is in a Jupyter Notebook on Github; skip to the end.

Stack Overflow post:
http://stackoverflow.com/questions/41273893/in-pyspark-ml-how-can-i-interpret-the-sparsevector-returned-by-a-pyspark-ml-cla

Thanks!
-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
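
The usual route is the ML attribute metadata that VectorAssembler (and the one-hot encoder) attach to the assembled features column; a sketch of the idea in Scala (the same metadata is visible from PySpark through the DataFrame schema), assuming the assembled DataFrame and fitted model are called assembled and model:

import org.apache.spark.ml.attribute.AttributeGroup

val attrGroup = AttributeGroup.fromStructField(assembled.schema("features"))
val names: Array[String] = attrGroup.attributes
  .map(_.map(a => a.name.getOrElse(s"f${a.index.getOrElse(-1)}")))
  .getOrElse(Array.empty)

// Pair every slot of the importance vector with its original (possibly one-hot) name.
val importances = model.featureImportances.toArray
names.zip(importances).sortBy(-_._2).take(20).foreach(println)

How informative the names are depends on the metadata the upstream encoders produced; one-hot columns typically appear as one slot per category.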


Re: Has anyone managed to connect to Oracle via JDBC from Spark CDH 5.5.2

2016-12-21 Thread Mich Talebzadeh
Thanks Ayan, do you mean

"driver" -> "oracle.jdbc.OracleDriver"

We added that one, but it did not work!

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 21 December 2016 at 23:00, ayan guha  wrote:

> Try providing correct driver name through property variable in the jdbc
> call.
> On Thu., 22 Dec. 2016 at 8:40 am, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> This works with Spark 2 with Oracle jar file added to
>>
>> $SPARK_HOME/conf/spark-defaults.conf
>>
>> spark.driver.extraClassPath  /home/hduser/jars/ojdbc6.jar
>> spark.executor.extraClassPath  /home/hduser/jars/ojdbc6.jar
>>
>> and you get
>>
>> scala> val s = HiveContext.read.format("jdbc").options(
>>  | Map("url" -> _ORACLEserver,
>>  | "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS
>> CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS
>> RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>>  | "partitionColumn" -> "ID",
>>  | "lowerBound" -> "1",
>>  | "upperBound" -> "1",
>>  | "numPartitions" -> "10",
>>  | "user" -> _username,
>>  | "password" -> _password)).load
>> s: org.apache.spark.sql.DataFrame = [ID: string, CLUSTERED: string ... 5
>> more fields]
>>
>> that works.
>> However, with CDH 5.5.2 (Spark 1.5) it fails with error
>>
>> *java.sql.SQLException: No suitable driver*
>>
>> at java.sql.DriverManager.getDriver(DriverManager.java:315)
>> at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
>> at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
>> at scala.Option.getOrElse(Option.scala:121)
>> at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:53)
>> at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:123)
>> at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:117)
>> at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:53)
>> at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:315)
>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)
>>
>> Any ideas?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
>> loss, damage or destruction of data or any other property which may arise
>> from relying on this email's technical content is explicitly disclaimed.
>> The author will in no case be liable for any monetary damages arising from
>> such loss, damage or destruction.


Re: Has anyone managed to connect to Oracle via JDBC from Spark CDH 5.5.2

2016-12-21 Thread ayan guha
Try providing correct driver name through property variable in the jdbc
call.
On Thu., 22 Dec. 2016 at 8:40 am, Mich Talebzadeh 
wrote:

> This works with Spark 2 with Oracle jar file added to
>
> $SPARK_HOME/conf/spark-defaults.conf
>
> spark.driver.extraClassPath  /home/hduser/jars/ojdbc6.jar
> spark.executor.extraClassPath  /home/hduser/jars/ojdbc6.jar
>
> and you get
>
> scala> val s = HiveContext.read.format("jdbc").options(
>  | Map("url" -> _ORACLEserver,
>  | "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS
> CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS
> RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>  | "partitionColumn" -> "ID",
>  | "lowerBound" -> "1",
>  | "upperBound" -> "1",
>  | "numPartitions" -> "10",
>  | "user" -> _username,
>  | "password" -> _password)).load
> s: org.apache.spark.sql.DataFrame = [ID: string, CLUSTERED: string ... 5
> more fields]
>
> that works.
> However, with CDH 5.5.2 (Spark 1.5) it fails with error
>
> *java.sql.SQLException: No suitable driver*
>
> at java.sql.DriverManager.getDriver(DriverManager.java:315)
> at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
> at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:53)
> at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:123)
> at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:117)
> at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:53)
> at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:315)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)
>
> Any ideas?
>
> Thanks
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.


Has anyone managed to connect to Oracle via JDBC from Spark CDH 5.5.2

2016-12-21 Thread Mich Talebzadeh
This works with Spark 2 with Oracle jar file added to

$SPARK_HOME/conf/spark-defaults.conf




spark.driver.extraClassPath  /home/hduser/jars/ojdbc6.jar

spark.executor.extraClassPath  /home/hduser/jars/ojdbc6.jar


and you get

scala> val s = HiveContext.read.format("jdbc").options(
 | Map("url" -> _ORACLEserver,
 | "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS
CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS
RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
 | "partitionColumn" -> "ID",
 | "lowerBound" -> "1",
 | "upperBound" -> "1",
 | "numPartitions" -> "10",
 | "user" -> _username,
 | "password" -> _password)).load
s: org.apache.spark.sql.DataFrame = [ID: string, CLUSTERED: string ... 5
more fields]
that works.
However, with CDH 5.5.2 (Spark 1.5) it fails with error

*java.sql.SQLException: No suitable driver*

  at java.sql.DriverManager.getDriver(DriverManager.java:315)

  at
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)

  at
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)

  at scala.Option.getOrElse(Option.scala:121)

  at
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:53)

  at
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:123)

  at
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:117)

  at
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:53)

  at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:315)

  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)

  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)


Any ideas?

Thanks




Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: SPARK -SQL Understanding BroadcastNestedLoopJoin and number of partitions

2016-12-21 Thread David Hodeffi
Do you know who I can talk to about this code? I am really curious to know why
there is a join and why the number of partitions for the join is the sum of both
of them; I expected the number of partitions to be the same as the streamed
table, or in the worst case multiplied.

Sent from my iPhone

On Dec 21, 2016, at 14:43, David Hodeffi  wrote:


I have two dataframes which I am joining: one small and one big. The
optimizer suggests using BroadcastNestedLoopJoin.
The number of partitions for the big DataFrame is 200, while the small DataFrame
has 5 partitions.
The joined dataframe ends up with 205 partitions (joined.rdd.partitions.size).
I have tried to understand where this number comes from and figured out that
BroadcastNestedLoopJoin is actually a union.
code:
case class BroadcastNestedLoopJoin {
  def doExecute() = {
    ...
    sparkContext.union(matchedStreamRows, sparkContext.makeRDD(notMatchedBroadcastRows))
  }
}
Can someone please explain what exactly the code of doExecute() does? Can you
elaborate on all the null checks and why we can have nulls? Why do we have
205 partitions? A link to a JIRA discussion that explains the code would help.


Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.  
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark-shell fails to redefine values

2016-12-21 Thread Yang
Summary: spark-shell fails to redefine values in some cases. This is seen at
least in a case where "implicit" is involved, but is not limited to such cases.

Run the following in spark-shell; you can see that the last redefinition does
not take effect. The same code runs in the plain Scala REPL without problems.

scala> class Useless{}
defined class Useless

scala> class Useless1 {}
defined class Useless1

scala> implicit val eee :Useless = new Useless()
eee: Useless = Useless@2c6beb3e

scala> implicit val eee:Useless1 = new Useless1()
eee: Useless1 = Useless1@66cb003

scala> eee
res24: Useless = Useless@1ec5bf62
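
A hedged workaround sketch rather than an explanation of the REPL behaviour: giving the second implicit a distinct name sidesteps the stale binding entirely, e.g.

implicit val uselessEv: Useless = new Useless()
implicit val useless1Ev: Useless1 = new Useless1()

The names here are arbitrary; the point is only that the second definition no longer has to shadow the first.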


spark linear regression error training dataset is empty

2016-12-21 Thread Xiaomeng Wan
Hi,

I am running linear regression on a dataframe and get the following error:

Exception in thread "main" java.lang.AssertionError: assertion failed:
Training dataset is empty.

at scala.Predef$.assert(Predef.scala:170)

at
org.apache.spark.ml.optim.WeightedLeastSquares$Aggregator.validate(WeightedLeastSquares.scala:247)

at
org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:82)

at
org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:180)

at
org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:70)

at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)

here is the data and code:

{"label":79.3,"features":{"type":1,"values":[6412.14350001,888.0,1407.0,1.5844594594594594,10.614,12.07,0.12062966031483012,0.9991237664152219,6.065,0.49751449875724935]}}

{"label":72.3,"features":{"type":1,"values":[6306.04450001,1084.0,1451.0,1.338560885608856,7.018,12.04,0.41710963455149497,0.9992054343916128,6.05,0.4975083056478405]}}

{"label":76.7,"features":{"type":1,"values":[6142.9203,1494.0,1437.0,0.9618473895582329,7.939,12.06,0.34170812603648426,0.9992216101762574,6.06,0.49751243781094534]}}

val lr = new LinearRegression().setMaxIter(300).setFeaturesCol("features")

val lrModel = lr.fit(assembleddata)

Any clue or inputs are appreciated.
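
The assertion fires when no instances reach WeightedLeastSquares, so a first check is whether the assembled DataFrame still has usable rows at that point; a small diagnostic sketch using the column names from the mail:

assembleddata.printSchema()          // "features" should be an ML vector type, not a plain struct
println(assembleddata.count())

// Rows with a non-null, non-NaN label and a non-null features vector:
val usable = assembleddata
  .filter(assembleddata("label").isNotNull && !assembleddata("label").isNaN)
  .filter(assembleddata("features").isNotNull)
println(usable.count())

If the counts differ, or the schema shows a struct instead of a vector, the data preparation step before lr.fit is the place to look.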


Regards,

Shawn


Re: Parquet with group by queries

2016-12-21 Thread Anil Langote
I tried caching the parent data set, but it slows down the execution time. The
last column in the input data set is a double array, and the requirement is to
add up these double arrays after doing the group by. I have implemented an
aggregation function which adds the last column. Hence the query is

Select count(*), col1, col2, col3, aggregationFunction(doublecol) from table 
group by col1,col2,col3 having count(*) >1

The above query's group-by columns will change; similarly, I have to run 100
queries on the same data set.

Best Regards,
Anil Langote
+1-425-633-9747

> On Dec 21, 2016, at 11:41 AM, Anil Langote  wrote:
> 
> Hi All,
> 
> I have an requirement where I have to run 100 group by queries with different 
> columns I have generated the parquet file which has 30 columns I see every 
> parquet files has different size and 200 files are generated, my question is 
> what is the best approach to run group by queries on parquet files more files 
> are recommend or I should create less files to get better performance.  
> 
> Right now with 2 cores and 65 executors on 4 node cluster with 320 cores 
> available spark is taking average 1.4 mins to finish one query we want to 
> tune the time around 30 or 40 seconds for one query the hdfs block size 128MB 
> and spark is launching 2400 tasks the partitions for the input dataset is 
> 2252.
> 
> I have implemented the threading in spark driver to launch all these queries 
> at the same time with fair scheduled enabled however I see most of times jobs 
> are running sequentially.
> 
> Any input in this regard is appreciated.
> 
> Best Regards,
> Anil Langote
> +1-425-633-9747


Parquet with group by queries

2016-12-21 Thread Anil Langote
Hi All,

I have a requirement where I have to run 100 group-by queries with different
columns. I have generated a Parquet data set which has 30 columns; every Parquet
file has a different size and 200 files are generated. My question is: what is
the best approach to run group-by queries on Parquet files; are more files
recommended, or should I create fewer files to get better performance?

Right now, with 2 cores and 65 executors on a 4-node cluster with 320 cores
available, Spark is taking on average 1.4 minutes to finish one query. We want to
tune the time to around 30 or 40 seconds per query. The HDFS block size is 128 MB,
Spark is launching 2400 tasks, and the number of partitions for the input dataset
is 2252.

I have implemented threading in the Spark driver to launch all these queries at
the same time with the fair scheduler enabled; however, I see that most of the
time the jobs are running sequentially.

Any input in this regard is appreciated.

Best Regards,
Anil Langote
+1-425-633-9747
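
On the "jobs still run sequentially" part: with spark.scheduler.mode=FAIR, jobs submitted from different threads also need to be assigned to scheduler pools, otherwise everything competes in the default pool. A hedged sketch, with spark as the session and the pool names as assumptions:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val queries: Seq[String] = Seq(
  "SELECT count(*), col1, col2, col3, aggregationFunction(doublecol) FROM table GROUP BY col1, col2, col3 HAVING count(*) > 1"
  // ... the other group-by variants
)

val futures = queries.zipWithIndex.map { case (q, i) =>
  Future {
    // The property is thread-local, so it must be set in the thread that runs the job.
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", s"pool${i % 4}")
    spark.sql(q).count()   // any action; counting is just a placeholder here
  }
}

Even with pools, concurrency is bounded by free executor cores, so 2400-task queries will still largely serialize on a 320-core cluster.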

Re: Approach: Incremental data load from HBASE

2016-12-21 Thread Ted Yu
Incremental load traditionally means generating hfiles and
using org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load the
data into hbase.

For your use case, the producer needs to find rows where the flag is 0 or 1.
After such rows are obtained, it is up to you how the result of processing
is delivered to hbase.

Cheers
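
A rough sketch of the Spark side of approach 2, reading the table through TableInputFormat and filtering on the flag column in Spark (table, column family and qualifier names are placeholders; pushing the filter into the HBase scan itself would need a serialized Scan and is omitted here):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "events")   // placeholder table name

val rows = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

// Keep only rows whose flag column is 0 or 1.
val pending = rows.filter { case (_, result) =>
  val flag = Bytes.toString(result.getValue(Bytes.toBytes("meta"), Bytes.toBytes("flag")))
  flag == "0" || flag == "1"
}

After processing, updating the flag back in HBase (approach 2) or emitting to Kafka (approach 1) is a separate write path.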

On Wed, Dec 21, 2016 at 8:00 AM, Chetan Khatri 
wrote:

> Ok, Sure will ask.
>
> But what would be generic best practice solution for Incremental load from
> HBASE.
>
> On Wed, Dec 21, 2016 at 8:42 PM, Ted Yu  wrote:
>
>> I haven't used Gobblin.
>> You can consider asking Gobblin mailing list of the first option.
>>
>> The second option would work.
>>
>>
>> On Wed, Dec 21, 2016 at 2:28 AM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Guys,
>>>
>>> I would like to understand different approaches for distributed
>>> incremental load from HBase. Is there any *tool / incubator tool* which
>>> satisfies the requirement?
>>>
>>> *Approach 1:*
>>>
>>> Write Kafka Producer and maintain manually column flag for events and
>>> ingest it with Linkedin Gobblin to HDFS / S3.
>>>
>>> *Approach 2:*
>>>
>>> Run Scheduled Spark Job - Read from HBase and do transformations and
>>> maintain flag column at HBase Level.
>>>
>>> In above both approach, I need to maintain column level flags. such as 0
>>> - by default, 1-sent,2-sent and acknowledged. So next time Producer will
>>> take another 1000 rows of batch where flag is 0 or 1.
>>>
>>> I am looking for best practice approach with any distributed tool.
>>>
>>> Thanks.
>>>
>>> - Chetan Khatri
>>>
>>
>>
>


Re: Launching multiple spark jobs within a main spark job.

2016-12-21 Thread Vadim Semenov
Check the source code for SparkLauncher:
https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java#L541

a separate process will be started using `spark-submit` and if it uses
`yarn-cluster` mode, a driver may be launched on another NodeManager or may
be launched on the same NodeManager, so you would need to work around it if
you want to avoid hot spots.

On Wed, Dec 21, 2016 at 8:19 AM, Naveen  wrote:

> Thanks Liang!
> I get your point. It would mean that when launching spark jobs, mode needs
> to be specified as client for all spark jobs.
> However, my concern is to know if driver's memory(which is launching spark
> jobs) will be used completely by the Future's(sparkcontext's) or these
> spawned sparkcontexts will get different nodes / executors from resource
> manager?
>
> On Wed, Dec 21, 2016 at 6:43 PM, Naveen  wrote:
>
>> Hi Sebastian,
>>
>> Yes, for fetching the details from Hive and HBase, I would want to use
>> Spark's HiveContext etc.
>> However, based on your point, I might have to check if JDBC based driver
>> connection could be used to do the same.
>>
>> Main reason for this is to avoid a client-server architecture design.
>>
>> If we go by a normal scala app without creating a sparkcontext as per
>> your suggestion, then
>> 1. it turns out to be a client program on cluster on a single node, and
>> for any multiple invocation through xyz scheduler , it will be invoked
>> always from that same node
>> 2. Having client program on a single data node might create a hotspot for
>> that data node which might create a bottleneck as all invocations might
>> create JVMs on that node itself.
>> 3. With above, we will loose the Spark on YARN's feature of dynamically
>> allocating a driver on any available data node through RM and NM
>> co-ordination. With YARN and Cluster mode of invoking a spark-job, it will
>> help distribute multiple application(main one) in cluster uniformly.
>>
>> Thanks and please let me know your views.
>>
>>
>> On Wed, Dec 21, 2016 at 5:43 PM, Sebastian Piu 
>> wrote:
>>
>>> Is there any reason you need a context on the application launching the
>>> jobs?
>>> You can use SparkLauncher in a normal app and just listen for state
>>> transitions
>>>
>>> On Wed, 21 Dec 2016, 11:44 Naveen,  wrote:
>>>
 Hi Team,

 Thanks for your responses.
 Let me give more details in a picture of how I am trying to launch jobs.

 Main spark job will launch other spark-job similar to calling multiple
 spark-submit within a Spark driver program.
 These spawned threads for new jobs will be totally different
 components, so these cannot be implemented using spark actions.

 sample code:

 -

 Object Mainsparkjob {

 main(...){

 val sc=new SparkContext(..)

 Fetch from hive..using hivecontext
 Fetch from hbase

 //spawning multiple Futures..
 Val future1=Future{
 Val sparkjob= SparkLauncher(...).launch; spark.waitFor
 }

 Similarly, future2 to futureN.

 future1.onComplete{...}
 }

 }// end of mainsparkjob
 --


 [image: Inline image 1]

 On Wed, Dec 21, 2016 at 3:13 PM, David Hodeffi <
 david.hode...@niceactimize.com> wrote:

 I am not familiar of any problem with that.

 Anyway, If you run spark applicaction you would have multiple jobs,
 which makes sense that it is not a problem.



 Thanks David.



 *From:* Naveen [mailto:hadoopst...@gmail.com]
 *Sent:* Wednesday, December 21, 2016 9:18 AM
 *To:* d...@spark.apache.org; user@spark.apache.org
 *Subject:* Launching multiple spark jobs within a main spark job.



 Hi Team,



 Is it ok to spawn multiple spark jobs within a main spark job, my main
 spark job's driver which was launched on yarn cluster, will do some
 preprocessing and based on it, it needs to launch multilple spark jobs on
 yarn cluster. Not sure if this right pattern.



 Please share your thoughts.

 Sample code i ve is as below for better understanding..

 -



 Object Mainsparkjob {



 main(...){



 val sc=new SparkContext(..)



 Fetch from hive..using hivecontext

 Fetch from hbase



 //spawning multiple Futures..

 Val future1=Future{

 Val sparkjob= SparkLauncher(...).launch; spark.waitFor

 }



 Similarly, future2 to futureN.



 future1.onComplete{...}

 }



 }// end of mainsparkjob

 --


 Confidentiality: This communication and any attachments are intended
 for the above-named persons only and may be confidential and/or legally
 privileged. Any opi

Re: Spark kryo serialization register Datatype[]

2016-12-21 Thread Georg Heiler
I already set

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

to enable kryo and

.set("spark.kryo.registrationRequired", "true")

to force kryo. Strangely, I still see the issue of the unregistered DataType[].

Trying to register regular classes like Date

.registerKryoClasses(Array(classOf[Date]))

works just fine, but registering the Spark-internal DataType[] is not
working; as far as I read the docs, it should be handled by Spark.
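
A sketch of registering the array class itself, which is what the error message names (whether this is the only class missing from registration depends on the job):

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.DataType

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  // classOf[Array[DataType]] is the JVM array class DataType[]; registering
  // DataType alone does not cover its array type.
  .registerKryoClasses(Array(classOf[Array[DataType]]))

With registrationRequired=true, each newly reported class (including further array and internal types) has to be added the same way.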


Vadim Semenov  wrote on Wed., 21 Dec 2016
at 17:12:

> to enable kryo serializer you just need to pass
> `spark.serializer=org.apache.spark.serializer.KryoSerializer`
>
> the `spark.kryo.registrationRequired` controls the following behavior:
>
> Whether to require registration with Kryo. If set to 'true', Kryo will
> throw an exception if an unregistered class is serialized. If set to false
> (the default), Kryo will write unregistered class names along with each
> object. Writing class names can cause significant performance overhead, so
> enabling this option can enforce strictly that a user has not omitted
> classes from registration.
>
>
> as described here http://spark.apache.org/docs/latest/configuration.html
>
> if it's set to `true` you need to manually register classes as described
> here: http://spark.apache.org/docs/latest/tuning.html#data-serialization
>
>
> On Wed, Dec 21, 2016 at 8:49 AM, geoHeil 
> wrote:
>
> To force spark to use kryo serialization I set
> spark.kryo.registrationRequired to true.
>
> Now spark complains that: Class is not registered:
> org.apache.spark.sql.types.DataType[] is not registered.
> How can I fix this? So far I could not successfully register this class.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-kryo-serialization-register-Datatype-tp28243.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>


Re: NoClassDefFoundError

2016-12-21 Thread Vadim Semenov
You better ask folks in the spark-jobserver gitter channel:
https://github.com/spark-jobserver/spark-jobserver

On Wed, Dec 21, 2016 at 8:02 AM, Reza zade  wrote:

> Hello
>
> I've extended the JavaSparkJob (job-server-0.6.2) and created an object
> of SQLContext class. my maven project doesn't have any problem during
> compile and packaging phase. but when I send .jar of project to sjs and run
> it "NoClassDefFoundError" will be issued. the trace of exception is :
>
>
> job-server[ERROR] Exception in thread "pool-20-thread-1"
> java.lang.NoClassDefFoundError: org/apache/spark/sql/SQLContext
> job-server[ERROR]  at sparkdesk.SparkSQLJob2.runJob(SparkSQLJob2.java:61)
> job-server[ERROR]  at sparkdesk.SparkSQLJob2.runJob(SparkSQLJob2.java:45)
> job-server[ERROR]  at spark.jobserver.JavaSparkJob.runJob(JavaSparkJob.scala:17)
> job-server[ERROR]  at spark.jobserver.JavaSparkJob.runJob(JavaSparkJob.scala:14)
> job-server[ERROR]  at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:301)
> job-server[ERROR]  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> job-server[ERROR]  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> job-server[ERROR]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> job-server[ERROR]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> job-server[ERROR]  at java.lang.Thread.run(Thread.java:745)
> job-server[ERROR] Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.sql.SQLContext
> job-server[ERROR]  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> job-server[ERROR]  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> job-server[ERROR]  at java.security.AccessController.doPrivileged(Native
> Method)
> job-server[ERROR]  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> job-server[ERROR]  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> job-server[ERROR]  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> job-server[ERROR]  ... 10 more
>
>
> what is the problem?
> do you have any solution about this?
>


Re: Spark kryo serialization register Datatype[]

2016-12-21 Thread Vadim Semenov
to enable kryo serializer you just need to pass
`spark.serializer=org.apache.spark.serializer.KryoSerializer`

the `spark.kryo.registrationRequired` controls the following behavior:

Whether to require registration with Kryo. If set to 'true', Kryo will
> throw an exception if an unregistered class is serialized. If set to false
> (the default), Kryo will write unregistered class names along with each
> object. Writing class names can cause significant performance overhead, so
> enabling this option can enforce strictly that a user has not omitted
> classes from registration.


as described here http://spark.apache.org/docs/latest/configuration.html

if it's set to `true` you need to manually register classes as described
here: http://spark.apache.org/docs/latest/tuning.html#data-serialization


On Wed, Dec 21, 2016 at 8:49 AM, geoHeil  wrote:

> To force spark to use kryo serialization I set
> spark.kryo.registrationRequired to true.
>
> Now spark complains that: Class is not registered:
> org.apache.spark.sql.types.DataType[] is not registered.
> How can I fix this? So far I could not successfully register this class.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-kryo-serialization-register-
> Datatype-tp28243.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: ML PIC

2016-12-21 Thread Robert Hamilton
Thank you Nick, that is good to know.

Would this have some opportunity for newbs (like me) to volunteer some time?

Sent from my iPhone

> On Dec 21, 2016, at 9:08 AM, Nick Pentreath  wrote:
> 
> It is part of the general feature parity roadmap. I can't recall offhand any 
> blocker reasons it's just resources 
>> On Wed, 21 Dec 2016 at 17:05, Robert Hamilton  
>> wrote:
>> Hi all.  Is it on the roadmap to have an 
>> Spark.ml.clustering.PowerIterationClustering? Are there technical reasons 
>> that there is currently only an .mllib version?
>> 
>> 
>> Sent from my iPhone
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 


Re: Approach: Incremental data load from HBASE

2016-12-21 Thread Chetan Khatri
OK, sure, will ask.

But what would be a generic best-practice solution for incremental load from
HBase?

On Wed, Dec 21, 2016 at 8:42 PM, Ted Yu  wrote:

> I haven't used Gobblin.
> You can consider asking Gobblin mailing list of the first option.
>
> The second option would work.
>
>
> On Wed, Dec 21, 2016 at 2:28 AM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Hello Guys,
>>
>> I would like to understand different approaches for distributed incremental
>> load from HBase. Is there any *tool / incubator tool* which satisfies the
>> requirement?
>>
>> *Approach 1:*
>>
>> Write Kafka Producer and maintain manually column flag for events and
>> ingest it with Linkedin Gobblin to HDFS / S3.
>>
>> *Approach 2:*
>>
>> Run Scheduled Spark Job - Read from HBase and do transformations and
>> maintain flag column at HBase Level.
>>
>> In above both approach, I need to maintain column level flags. such as 0
>> - by default, 1-sent,2-sent and acknowledged. So next time Producer will
>> take another 1000 rows of batch where flag is 0 or 1.
>>
>> I am looking for best practice approach with any distributed tool.
>>
>> Thanks.
>>
>> - Chetan Khatri
>>
>
>


Re: ML PIC

2016-12-21 Thread Yanbo Liang
You can track https://issues.apache.org/jira/browse/SPARK-15784 for the
progress.

On Wed, Dec 21, 2016 at 7:08 AM, Nick Pentreath 
wrote:

> It is part of the general feature parity roadmap. I can't recall offhand
> any blocker reasons it's just resources
> On Wed, 21 Dec 2016 at 17:05, Robert Hamilton <
> robert_b_hamil...@icloud.com> wrote:
>
>> Hi all.  Is it on the roadmap to have an 
>> Spark.ml.clustering.PowerIterationClustering?
>> Are there technical reasons that there is currently only an .mllib version?
>>
>>
>> Sent from my iPhone
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


streaming performance

2016-12-21 Thread Mendelson, Assaf
I am having trouble with streaming performance. My main problem is how to do a
sliding-window calculation where the ratio between the window size and the step
size is relatively large (hundreds) without recalculating everything all the
time.

I created a simple example of what I am aiming at with what I have so far which 
is detailed in 
http://stackoverflow.com/questions/41266956/apache-spark-streaming-performance

I was hoping someone can point me to what I am doing wrong.

Thanks,

Assaf.
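
For the DStream API, the standard way to avoid recomputing the overlapping part of a large window is the windowed reduce with an inverse function; a minimal sketch, assuming a StreamingContext ssc with a 10-second batch interval, a 30-minute window and a DStream of (key, count) pairs called events (all placeholders), and it requires checkpointing:

import org.apache.spark.streaming.{Minutes, Seconds}

ssc.checkpoint("/tmp/streaming-checkpoint")   // required for the inverse-reduce path

val windowedCounts = events.reduceByKeyAndWindow(
  (a: Long, b: Long) => a + b,   // fold in batches entering the window
  (a: Long, b: Long) => a - b,   // subtract batches leaving the window
  Minutes(30),                   // window length
  Seconds(10)                    // slide interval
)

This only helps for aggregations that have an inverse; whether it applies to the calculation in the Stack Overflow post depends on what is being computed per window.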


Re: [ On the use of Spark as 'storage system']

2016-12-21 Thread Sean Owen
Spark isn't a storage system -- it's a batch processing system at heart. To
"serve" something means to run a distributed computation scanning
partitions for an element and collect it to a driver and return it.
Although that could be fast-enough for some definition of fast, it's going
to be orders of magnitude slower than using a technology for point lookups
of data -- NoSQL stores. Consider that if you want to sustain, say,
1000qps, that's 1000 Spark jobs launching N tasks per second. It isn't
designed for that.

On Wed, Dec 21, 2016 at 3:29 PM Enrico DUrso 
wrote:

> Hello,
>
> I had a discussion today with a colleague who was saying the following:
> “We can use Spark as fast serving layer in our architecture, that is we
> can compute an RDD or even a dataset using Spark SQL,
> then we can cache it  and offering to the front end layer an access to our
> application in order to show them the content of the RDD/Content.”
>
> This way of using Spark is for me something new, has anyone of you
> experience in this use case?
>
> Cheers,
>
> Enrico
>
> --
>
> CONFIDENTIALITY WARNING.
> This message and the information contained in or attached to it are
> private and confidential and intended exclusively for the addressee. everis
> informs to whom it may receive it in error that it contains privileged
> information and its use, copy, reproduction or distribution is prohibited.
> If you are not an intended recipient of this E-mail, please notify the
> sender, delete it and do not read, act upon, print, disclose, copy, retain
> or redistribute any portion of this E-mail.
>


[ On the use of Spark as 'storage system']

2016-12-21 Thread Enrico DUrso
Hello,

I had a discussion today with a colleague who was saying the following:
"We can use Spark as fast serving layer in our architecture, that is we can 
compute an RDD or even a dataset using Spark SQL,
then we can cache it  and offering to the front end layer an access to our 
application in order to show them the content of the RDD/Content."

This way of using Spark is something new to me; does anyone of you have
experience with this use case?

Cheers,

Enrico



CONFIDENTIALITY WARNING.
This message and the information contained in or attached to it are private and 
confidential and intended exclusively for the addressee. everis informs to whom 
it may receive it in error that it contains privileged information and its use, 
copy, reproduction or distribution is prohibited. If you are not an intended 
recipient of this E-mail, please notify the sender, delete it and do not read, 
act upon, print, disclose, copy, retain or redistribute any portion of this 
E-mail.


Re: Approach: Incremental data load from HBASE

2016-12-21 Thread Ted Yu
I haven't used Gobblin.
You can consider asking Gobblin mailing list of the first option.

The second option would work.


On Wed, Dec 21, 2016 at 2:28 AM, Chetan Khatri 
wrote:

> Hello Guys,
>
> I would like to understand different approaches for distributed incremental
> load from HBase. Is there any *tool / incubator tool* which satisfies the
> requirement?
>
> *Approach 1:*
>
> Write Kafka Producer and maintain manually column flag for events and
> ingest it with Linkedin Gobblin to HDFS / S3.
>
> *Approach 2:*
>
> Run Scheduled Spark Job - Read from HBase and do transformations and
> maintain flag column at HBase Level.
>
> In above both approach, I need to maintain column level flags. such as 0 -
> by default, 1-sent,2-sent and acknowledged. So next time Producer will take
> another 1000 rows of batch where flag is 0 or 1.
>
> I am looking for best practice approach with any distributed tool.
>
> Thanks.
>
> - Chetan Khatri
>


Re: ML PIC

2016-12-21 Thread Nick Pentreath
It is part of the general feature-parity roadmap. I can't recall offhand
any blocker reasons; it's just resources.
On Wed, 21 Dec 2016 at 17:05, Robert Hamilton 
wrote:

> Hi all.  Is it on the roadmap to have an
> Spark.ml.clustering.PowerIterationClustering? Are there technical reasons
> that there is currently only an .mllib version?
>
>
> Sent from my iPhone
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


ML PIC

2016-12-21 Thread Robert Hamilton
Hi all. Is it on the roadmap to have a
spark.ml.clustering.PowerIterationClustering? Are there technical reasons that
there is currently only an .mllib version?


Sent from my iPhone
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark kryo serialization register Datatype[]

2016-12-21 Thread geoHeil
To force spark to use kryo serialization I set
spark.kryo.registrationRequired to true.

Now spark complains that: Class is not registered:
org.apache.spark.sql.types.DataType[] is not registered.
How can I fix this? So far I could not successfully register this class.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-kryo-serialization-register-Datatype-tp28243.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Writing into parquet throws Array out of bounds exception

2016-12-21 Thread Selvam Raman
Hi,

When I am trying to write a dataset to Parquet or to call show(1, false), my job
throws an ArrayIndexOutOfBoundsException.

16/12/21 12:38:50 WARN TaskSetManager: Lost task 7.0 in stage 36.0 (TID 81,
ip-10-95-36-69.dev): java.lang.ArrayIndexOutOfBoundsException: 63

at
org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte(UTF8String.java:156)

at org.apache.spark.unsafe.types.UTF8String.indexOf(UTF8String.java:565)

at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)

at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)

at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)

at org.apache.spark.scheduler.Task.run(Task.scala:85)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)


16/12/21 12:38:50 INFO TaskSetManager: Starting task 7.1 in stage 36.0 (TID
106, ip-10-95-36-70.dev, partition 7, RACK_LOCAL, 6020 bytes)

16/12/21 12:38:50 INFO YarnSchedulerBackend$YarnDriverEndpoint: Launching
task 106 on executor id: 4 hostname: ip-10-95-36-70.dev.

16/12/21 12:38:50 WARN TaskSetManager: Lost task 4.0 in stage 36.0 (TID 78,
ip-10-95-36-70.dev): java.lang.ArrayIndexOutOfBoundsException: 62

at
org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte(UTF8String.java:156)

at org.apache.spark.unsafe.types.UTF8String.indexOf(UTF8String.java:565)

at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)

at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)

at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)

at org.apache.spark.scheduler.Task.run(Task.scala:85)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)


In my dataset there is one column which is a longblob; if I convert it with
unbase64, I face this problem. I am able to write to Parquet without the
conversion.


So is there some limit on bytes per line? Please give me your suggestions.

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Launching multiple spark jobs within a main spark job.

2016-12-21 Thread Naveen
Thanks Liang!
I get your point. It would mean that when launching Spark jobs, the mode needs
to be specified as client for all of them.
However, my concern is whether the memory of the driver (which is launching the
Spark jobs) will be used up entirely by the Futures (SparkContexts), or whether
these spawned SparkContexts will get different nodes / executors from the
resource manager.

On Wed, Dec 21, 2016 at 6:43 PM, Naveen  wrote:

> Hi Sebastian,
>
> Yes, for fetching the details from Hive and HBase, I would want to use
> Spark's HiveContext etc.
> However, based on your point, I might have to check if JDBC based driver
> connection could be used to do the same.
>
> Main reason for this is to avoid a client-server architecture design.
>
> If we go by a normal scala app without creating a sparkcontext as per your
> suggestion, then
> 1. it turns out to be a client program on cluster on a single node, and
> for any multiple invocation through xyz scheduler , it will be invoked
> always from that same node
> 2. Having client program on a single data node might create a hotspot for
> that data node which might create a bottleneck as all invocations might
> create JVMs on that node itself.
> 3. With above, we will loose the Spark on YARN's feature of dynamically
> allocating a driver on any available data node through RM and NM
> co-ordination. With YARN and Cluster mode of invoking a spark-job, it will
> help distribute multiple application(main one) in cluster uniformly.
>
> Thanks and please let me know your views.
>
>
> On Wed, Dec 21, 2016 at 5:43 PM, Sebastian Piu 
> wrote:
>
>> Is there any reason you need a context on the application launching the
>> jobs?
>> You can use SparkLauncher in a normal app and just listen for state
>> transitions
>>
>> On Wed, 21 Dec 2016, 11:44 Naveen,  wrote:
>>
>>> Hi Team,
>>>
>>> Thanks for your responses.
>>> Let me give more details in a picture of how I am trying to launch jobs.
>>>
>>> Main spark job will launch other spark-job similar to calling multiple
>>> spark-submit within a Spark driver program.
>>> These spawned threads for new jobs will be totally different components,
>>> so these cannot be implemented using spark actions.
>>>
>>> sample code:
>>>
>>> -
>>>
>>> Object Mainsparkjob {
>>>
>>> main(...){
>>>
>>> val sc=new SparkContext(..)
>>>
>>> Fetch from hive..using hivecontext
>>> Fetch from hbase
>>>
>>> //spawning multiple Futures..
>>> Val future1=Future{
>>> Val sparkjob= SparkLauncher(...).launch; spark.waitFor
>>> }
>>>
>>> Similarly, future2 to futureN.
>>>
>>> future1.onComplete{...}
>>> }
>>>
>>> }// end of mainsparkjob
>>> --
>>>
>>>
>>> [image: Inline image 1]
>>>
>>> On Wed, Dec 21, 2016 at 3:13 PM, David Hodeffi <
>>> david.hode...@niceactimize.com> wrote:
>>>
>>> I am not familiar of any problem with that.
>>>
>>> Anyway, If you run spark applicaction you would have multiple jobs,
>>> which makes sense that it is not a problem.
>>>
>>>
>>>
>>> Thanks David.
>>>
>>>
>>>
>>> *From:* Naveen [mailto:hadoopst...@gmail.com]
>>> *Sent:* Wednesday, December 21, 2016 9:18 AM
>>> *To:* d...@spark.apache.org; user@spark.apache.org
>>> *Subject:* Launching multiple spark jobs within a main spark job.
>>>
>>>
>>>
>>> Hi Team,
>>>
>>>
>>>
>>> Is it ok to spawn multiple spark jobs within a main spark job, my main
>>> spark job's driver which was launched on yarn cluster, will do some
>>> preprocessing and based on it, it needs to launch multilple spark jobs on
>>> yarn cluster. Not sure if this right pattern.
>>>
>>>
>>>
>>> Please share your thoughts.
>>>
>>> Sample code i ve is as below for better understanding..
>>>
>>> -
>>>
>>>
>>>
>>> Object Mainsparkjob {
>>>
>>>
>>>
>>> main(...){
>>>
>>>
>>>
>>> val sc=new SparkContext(..)
>>>
>>>
>>>
>>> Fetch from hive..using hivecontext
>>>
>>> Fetch from hbase
>>>
>>>
>>>
>>> //spawning multiple Futures..
>>>
>>> Val future1=Future{
>>>
>>> Val sparkjob= SparkLauncher(...).launch; spark.waitFor
>>>
>>> }
>>>
>>>
>>>
>>> Similarly, future2 to futureN.
>>>
>>>
>>>
>>> future1.onComplete{...}
>>>
>>> }
>>>
>>>
>>>
>>> }// end of mainsparkjob
>>>
>>> --
>>>
>>>
>>> Confidentiality: This communication and any attachments are intended for
>>> the above-named persons only and may be confidential and/or legally
>>> privileged. Any opinions expressed in this communication are not
>>> necessarily those of NICE Actimize. If this communication has come to you
>>> in error you must take no action based on it, nor must you copy or show it
>>> to anyone; please delete/destroy and inform the sender by e-mail
>>> immediately.
>>> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
>>> Viruses: Although we have taken steps toward ensuring that this e-mail
>>> and attachments are free from any virus, we advise that in keeping with
>>> good computing practice the recipient should ensure they are actually virus
>>> free.
>>>
>>>
>>>
>


Re: Launching multiple spark jobs within a main spark job.

2016-12-21 Thread Naveen
Hi Sebastian,

Yes, for fetching the details from Hive and HBase, I would want to use
Spark's HiveContext etc.
However, based on your point, I might have to check if JDBC based driver
connection could be used to do the same.

Main reason for this is to avoid a client-server architecture design.

If we go by a normal scala app without creating a sparkcontext as per your
suggestion, then
1. it turns out to be a client program on cluster on a single node, and for
any multiple invocation through xyz scheduler , it will be invoked always
from that same node
2. Having client program on a single data node might create a hotspot for
that data node which might create a bottleneck as all invocations might
create JVMs on that node itself.
3. With the above, we will lose the Spark-on-YARN feature of dynamically
allocating a driver on any available data node through RM and NM
co-ordination. With YARN and cluster mode of invoking a spark-job, it will
help distribute multiple applications (the main ones) across the cluster uniformly.

Thanks and please let me know your views.


On Wed, Dec 21, 2016 at 5:43 PM, Sebastian Piu 
wrote:

> Is there any reason you need a context on the application launching the
> jobs?
> You can use SparkLauncher in a normal app and just listen for state
> transitions
>
> On Wed, 21 Dec 2016, 11:44 Naveen,  wrote:
>
>> Hi Team,
>>
>> Thanks for your responses.
>> Let me give more details in a picture of how I am trying to launch jobs.
>>
>> Main spark job will launch other spark-job similar to calling multiple
>> spark-submit within a Spark driver program.
>> These spawned threads for new jobs will be totally different components,
>> so these cannot be implemented using spark actions.
>>
>> sample code:
>>
>> -
>>
>> Object Mainsparkjob {
>>
>> main(...){
>>
>> val sc=new SparkContext(..)
>>
>> Fetch from hive..using hivecontext
>> Fetch from hbase
>>
>> //spawning multiple Futures..
>> Val future1=Future{
>> Val sparkjob= SparkLauncher(...).launch; spark.waitFor
>> }
>>
>> Similarly, future2 to futureN.
>>
>> future1.onComplete{...}
>> }
>>
>> }// end of mainsparkjob
>> --
>>
>>
>> [image: Inline image 1]
>>
>> On Wed, Dec 21, 2016 at 3:13 PM, David Hodeffi <
>> david.hode...@niceactimize.com> wrote:
>>
>> I am not familiar of any problem with that.
>>
>> Anyway, If you run spark applicaction you would have multiple jobs, which
>> makes sense that it is not a problem.
>>
>>
>>
>> Thanks David.
>>
>>
>>
>> *From:* Naveen [mailto:hadoopst...@gmail.com]
>> *Sent:* Wednesday, December 21, 2016 9:18 AM
>> *To:* d...@spark.apache.org; user@spark.apache.org
>> *Subject:* Launching multiple spark jobs within a main spark job.
>>
>>
>>
>> Hi Team,
>>
>>
>>
>> Is it ok to spawn multiple spark jobs within a main spark job, my main
>> spark job's driver which was launched on yarn cluster, will do some
>> preprocessing and based on it, it needs to launch multilple spark jobs on
>> yarn cluster. Not sure if this right pattern.
>>
>>
>>
>> Please share your thoughts.
>>
>> Sample code i ve is as below for better understanding..
>>
>> -
>>
>>
>>
>> Object Mainsparkjob {
>>
>>
>>
>> main(...){
>>
>>
>>
>> val sc=new SparkContext(..)
>>
>>
>>
>> Fetch from hive..using hivecontext
>>
>> Fetch from hbase
>>
>>
>>
>> //spawning multiple Futures..
>>
>> Val future1=Future{
>>
>> Val sparkjob= SparkLauncher(...).launch; spark.waitFor
>>
>> }
>>
>>
>>
>> Similarly, future2 to futureN.
>>
>>
>>
>> future1.onComplete{...}
>>
>> }
>>
>>
>>
>> }// end of mainsparkjob
>>
>> --
>>
>>
>> Confidentiality: This communication and any attachments are intended for
>> the above-named persons only and may be confidential and/or legally
>> privileged. Any opinions expressed in this communication are not
>> necessarily those of NICE Actimize. If this communication has come to you
>> in error you must take no action based on it, nor must you copy or show it
>> to anyone; please delete/destroy and inform the sender by e-mail
>> immediately.
>> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
>> Viruses: Although we have taken steps toward ensuring that this e-mail
>> and attachments are free from any virus, we advise that in keeping with
>> good computing practice the recipient should ensure they are actually virus
>> free.
>>
>>
>>


NoClassDefFoundError

2016-12-21 Thread Reza zade
Hello

I've extended JavaSparkJob (job-server-0.6.2) and created an object
of the SQLContext class. My Maven project doesn't have any problem during the
compile and packaging phases, but when I send the project's .jar to SJS and run
it, a "NoClassDefFoundError" is raised. The exception trace is:


job-server[ERROR] Exception in thread "pool-20-thread-1"
java.lang.NoClassDefFoundError: org/apache/spark/sql/SQLContext
job-server[ERROR]  at sparkdesk.SparkSQLJob2.runJob(SparkSQLJob2.java:61)
job-server[ERROR]  at sparkdesk.SparkSQLJob2.runJob(SparkSQLJob2.java:45)
job-server[ERROR]  at spark.jobserver.JavaSparkJob.runJob(JavaSparkJob.scala:17)
job-server[ERROR]  at spark.jobserver.JavaSparkJob.runJob(JavaSparkJob.scala:14)
job-server[ERROR]  at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:301)
job-server[ERROR]  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
job-server[ERROR]  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
job-server[ERROR]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
job-server[ERROR]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
job-server[ERROR]  at java.lang.Thread.run(Thread.java:745)
job-server[ERROR] Caused by: java.lang.ClassNotFoundException:
org.apache.spark.sql.SQLContext
job-server[ERROR]  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
job-server[ERROR]  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
job-server[ERROR]  at java.security.AccessController.doPrivileged(Native
Method)
job-server[ERROR]  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
job-server[ERROR]  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
job-server[ERROR]  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
job-server[ERROR]  ... 10 more


What is the problem? Do you have any suggestions for solving it?
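
The ClassNotFoundException means org.apache.spark.sql.SQLContext is not on the
job's runtime classpath inside the job server. A minimal sketch of one common
Maven setup, assuming the job server runs against a Spark 1.6.x / Scala 2.10
build (the artifact suffix and version below are assumptions and must match the
server's Spark distribution):

<!-- pom.xml sketch: spark-sql marked provided, supplied at runtime by the
     job server's Spark installation -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.6.2</version>
  <scope>provided</scope>
</dependency>

If the server's Spark distribution does not ship spark-sql, the alternative is
to drop the provided scope so that the job jar bundles it.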


SPARK -SQL Understanding BroadcastNestedLoopJoin and number of partitions

2016-12-21 Thread David Hodeffi

I have two DataFrames which I am joining: one small and one big. The optimizer
suggests using BroadcastNestedLoopJoin.
The number of partitions for the big DataFrame is 200, while the small
DataFrame has 5 partitions.
The joined DataFrame ends up with 205 partitions (joined.rdd.partitions.size).
I tried to understand where this number comes from and figured out that
BroadcastNestedLoopJoin is actually a union.
The code (abridged from the Spark source):

case class BroadcastNestedLoopJoin(...) {
  protected override def doExecute(): RDD[InternalRow] = {
    // ...
    sparkContext.union(
      matchedStreamRows,
      sparkContext.makeRDD(notMatchedBroadcastRows))
  }
}
Can someone please explain what exactly the code in doExecute() does? Can you
elaborate on all the null checks and why we can have nulls? Why do we have 205
partitions? A link to a JIRA discussion that explains the code would help.
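
For what it's worth, the 205 follows directly from that union: a union's
partition count is the sum of its inputs, i.e. the streamed (big) side's 200
partitions plus the slices used by sparkContext.makeRDD for the not-matched
broadcast rows, which defaults to sc.defaultParallelism (presumably 5 in this
setup). A minimal spark-shell sketch under that assumption (DataFrame names and
sizes are illustrative, not taken from the thread):

// Illustrative only; a non-equi join against a broadcastable side is planned
// as BroadcastNestedLoopJoin in Spark 1.x.
val bigDF   = sqlContext.range(0L, 1000000L).repartition(200)
val smallDF = sqlContext.range(0L, 100L)

val joined = bigDF.join(smallDF, bigDF("id") > smallDF("id"))

println(bigDF.rdd.partitions.size)   // 200
println(sc.defaultParallelism)       // e.g. 5 in the scenario above
println(joined.rdd.partitions.size)  // 200 + 5 = 205 under that assumption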




Re: Launching multiple spark jobs within a main spark job.

2016-12-21 Thread Sebastian Piu
Is there any reason you need a SparkContext in the application launching the
jobs?
You can use SparkLauncher from a normal app and just listen for state
transitions.
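
A minimal sketch of that pattern, assuming Spark 1.6+ where
SparkLauncher.startApplication is available (the app resource, main class and
master below are placeholders, not taken from the thread):

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

// Non-blocking launch: the handle reports state transitions instead of the
// caller having to waitFor() on a child process.
val handle = new SparkLauncher()
  .setAppResource("/path/to/child-job.jar")   // placeholder
  .setMainClass("com.example.ChildJob")       // placeholder
  .setMaster("yarn-cluster")                  // placeholder
  .startApplication(new SparkAppHandle.Listener {
    override def stateChanged(h: SparkAppHandle): Unit =
      println(s"child app ${h.getAppId} is now ${h.getState}")
    override def infoChanged(h: SparkAppHandle): Unit = ()
  })

// handle.getState.isFinal can be polled later if you still need to block.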

On Wed, 21 Dec 2016, 11:44 Naveen,  wrote:

> Hi Team,
>
> Thanks for your responses.
> Let me give more details in a picture of how I am trying to launch jobs.
>
> Main spark job will launch other spark-job similar to calling multiple
> spark-submit within a Spark driver program.
> These spawned threads for new jobs will be totally different components,
> so these cannot be implemented using spark actions.
>
> sample code:
>
> -
>
> Object Mainsparkjob {
>
> main(...){
>
> val sc=new SparkContext(..)
>
> Fetch from hive..using hivecontext
> Fetch from hbase
>
> //spawning multiple Futures..
> Val future1=Future{
> Val sparkjob= SparkLauncher(...).launch; spark.waitFor
> }
>
> Similarly, future2 to futureN.
>
> future1.onComplete{...}
> }
>
> }// end of mainsparkjob
> --
>
>
> [image: Inline image 1]
>
> On Wed, Dec 21, 2016 at 3:13 PM, David Hodeffi <
> david.hode...@niceactimize.com> wrote:
>
> I am not familiar of any problem with that.
>
> Anyway, If you run spark applicaction you would have multiple jobs, which
> makes sense that it is not a problem.
>
>
>
> Thanks David.
>
>
>
> *From:* Naveen [mailto:hadoopst...@gmail.com]
> *Sent:* Wednesday, December 21, 2016 9:18 AM
> *To:* d...@spark.apache.org; user@spark.apache.org
> *Subject:* Launching multiple spark jobs within a main spark job.
>
>
>
> Hi Team,
>
>
>
> Is it ok to spawn multiple spark jobs within a main spark job, my main
> spark job's driver which was launched on yarn cluster, will do some
> preprocessing and based on it, it needs to launch multilple spark jobs on
> yarn cluster. Not sure if this right pattern.
>
>
>
> Please share your thoughts.
>
> Sample code i ve is as below for better understanding..
>
> -
>
>
>
> Object Mainsparkjob {
>
>
>
> main(...){
>
>
>
> val sc=new SparkContext(..)
>
>
>
> Fetch from hive..using hivecontext
>
> Fetch from hbase
>
>
>
> //spawning multiple Futures..
>
> Val future1=Future{
>
> Val sparkjob= SparkLauncher(...).launch; spark.waitFor
>
> }
>
>
>
> Similarly, future2 to futureN.
>
>
>
> future1.onComplete{...}
>
> }
>
>
>
> }// end of mainsparkjob
>
> --
>
>
>
>
>


Re: Launching multiple spark jobs within a main spark job.

2016-12-21 Thread Naveen
Hi Team,

Thanks for your responses.
Let me give more details, with a picture, of how I am trying to launch jobs.

The main Spark job will launch other Spark jobs, similar to calling
spark-submit multiple times from within a Spark driver program.
The spawned jobs will be totally different components, so they cannot be
implemented using Spark actions.

sample code:
-

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.launcher.SparkLauncher
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object MainSparkJob {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf())

    // Fetch from Hive (via HiveContext) and from HBase for preprocessing
    // ...

    // Spawn the child jobs as Futures, each wrapping a separate spark-submit
    val future1 = Future {
      val sparkJob = new SparkLauncher()
        // .setAppResource(...), .setMainClass(...), .setMaster(...) as needed
        .launch()
      sparkJob.waitFor()
    }

    // Similarly, future2 to futureN.

    future1.onComplete { result =>
      println(s"child job finished: $result")
    }
  }

} // end of MainSparkJob
--


[image: Inline image 1]

On Wed, Dec 21, 2016 at 3:13 PM, David Hodeffi <
david.hode...@niceactimize.com> wrote:

> I am not familiar of any problem with that.
>
> Anyway, If you run spark applicaction you would have multiple jobs, which
> makes sense that it is not a problem.
>
>
>
> Thanks David.
>
>
>
> *From:* Naveen [mailto:hadoopst...@gmail.com]
> *Sent:* Wednesday, December 21, 2016 9:18 AM
> *To:* d...@spark.apache.org; user@spark.apache.org
> *Subject:* Launching multiple spark jobs within a main spark job.
>
>
>
> Hi Team,
>
>
>
> Is it ok to spawn multiple spark jobs within a main spark job, my main
> spark job's driver which was launched on yarn cluster, will do some
> preprocessing and based on it, it needs to launch multilple spark jobs on
> yarn cluster. Not sure if this right pattern.
>
>
>
> Please share your thoughts.
>
> Sample code i ve is as below for better understanding..
>
> -
>
>
>
> Object Mainsparkjob {
>
>
>
> main(...){
>
>
>
> val sc=new SparkContext(..)
>
>
>
> Fetch from hive..using hivecontext
>
> Fetch from hbase
>
>
>
> //spawning multiple Futures..
>
> Val future1=Future{
>
> Val sparkjob= SparkLauncher(...).launch; spark.waitFor
>
> }
>
>
>
> Similarly, future2 to futureN.
>
>
>
> future1.onComplete{...}
>
> }
>
>
>
> }// end of mainsparkjob
>
> --
>
>
>


Re: Gradle dependency problem with spark

2016-12-21 Thread kant kodali
@Sean perhaps I could leverage this once
http://openjdk.java.net/jeps/261 becomes available.

On Fri, Dec 16, 2016 at 4:05 AM, Steve Loughran 
wrote:

> FWIW, although the underlying Hadoop declared guava dependency is pretty
> low, everything in org.apache.hadoop is set up to run against later
> versions. It just sticks with the old one to avoid breaking anything
> donwstream which does expect a low version number. See HADOOP-10101 for the
> ongoing pain there —and complain on there if you do find something in the
> Hadoop layer which can't handle later guava versions.
>
>
>
>
> On 16 Dec 2016, at 11:07, Sean Owen  wrote:
>
> Yes, that's the problem. Guava isn't generally mutually compatible across
> more than a couple major releases. You may have to hunt for a version that
> happens to have the functionality that both dependencies want, and hope
> that exists. Spark should shade Guava at this point but doesn't mean that
> you won't hit this problem from transitive dependencies.
>
> On Fri, Dec 16, 2016 at 11:05 AM kant kodali  wrote:
>
>> I replaced *guava-14.0.1.jar*  with *guava-19.0.jar in *SPARK_HOME/jars
>> and seem to work ok but I am not sure if it is the right thing to do. My
>> fear is that if Spark uses features from Guava that are only present in
>> 14.0.1 but not in 19.0 I guess my app will break.
>>
>>
>>
>> On Fri, Dec 16, 2016 at 2:22 AM, kant kodali  wrote:
>>
>> Hi Guys,
>>
>> Here is the simplified version of my problem. I have the following
>> problem and I new to gradle
>>
>>
>> dependencies {
>> compile group: 'org.apache.spark', name: 'spark-core_2.11', version: 
>> '2.0.2'
>> compile group: 'com.github.brainlag', name: 'nsq-client', version: 
>> '1.0.0.RC2'
>> }
>>
>>
>> I took out the other dependencies for simplicity. The problem here
>> is spark-core_2.11 uses com.google.guava:14.0.1 and nsq-client uses
>> com.google.guava:19.0 so when I submit my fat uber jar using spark-submit I
>> get the following error
>>
>> Exception in thread "main" java.lang.NoSuchMethodError: 
>> com.google.common.collect.Sets.newConcurrentHashSet()Ljava/util/Set;
>> at com.github.brainlag.nsq.NSQProducer.(NSQProducer.java:22)
>> at com.hello.streamprocessing.app.SparkDriver2.main(SparkDriver2.java:37)
>>
>>
>> any help would be great. Also if you need more description you can find
>> it here
>> 
>>
>> Thanks!
>>
>>
>>
>
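
Shading (relocating) Guava inside the application's fat jar is the usual way
out of this kind of conflict; a minimal sketch with the Gradle Shadow plugin
(the plugin version and relocation prefix are illustrative, not taken from the
thread):

plugins {
    id 'com.github.johnrengelman.shadow' version '1.2.4'  // illustrative version
}

shadowJar {
    // Relocate the Guava packages bundled into the fat jar so nsq-client keeps
    // the Guava it was built against, regardless of the Guava Spark provides.
    relocate 'com.google.common', 'com.hello.shaded.guava'
}

The relocation rewrites both the bundled Guava classes and their call sites
inside the fat jar, which should avoid the NoSuchMethodError caused by Spark's
older Guava winning on the classpath.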


Approach: Incremental data load from HBASE

2016-12-21 Thread Chetan Khatri
Hello Guys,

I would like to understand the different approaches for distributed incremental
load from HBase. Is there any *tool / incubator tool* which satisfies this
requirement?

*Approach 1:*

Write a Kafka producer, manually maintain a column flag for events, and
ingest them with LinkedIn Gobblin to HDFS / S3.

*Approach 2:*

Run a scheduled Spark job - read from HBase, do the transformations, and
maintain the flag column at the HBase level.

In both approaches above, I need to maintain column-level flags, such as 0 -
default, 1 - sent, 2 - sent and acknowledged. So the next time the producer
will take another batch of 1000 rows where the flag is 0 or 1.
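
A minimal sketch of the Spark side of Approach 2, assuming the classic
TableInputFormat API; the table name "events", column family "d" and qualifier
"flag" are illustrative assumptions:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "events")  // illustrative table name

val raw = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

// Keep only rows whose flag is still 0 or 1 (not yet acknowledged); pushing
// this down as a SingleColumnValueFilter on the scan would avoid shipping
// already-processed rows.
val pending = raw.filter { case (_, result) =>
  val flag = Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("flag")))
  flag == "0" || flag == "1"
}

// ... transform, write to HDFS / S3, then flip the flag back in HBase.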

I am looking for a best-practice approach with any distributed tool.

Thanks.

- Chetan Khatri


RE: Launching multiple spark jobs within a main spark job.

2016-12-21 Thread David Hodeffi
I am not aware of any problem with that.
Anyway, if you run a Spark application you already have multiple jobs, so it
stands to reason that this is not a problem.
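
For illustration, even a single driver routinely runs several jobs (one per
action), so the scheduler is built for this. A trivial spark-shell sketch with
made-up data:

val rdd = sc.parallelize(1 to 1000000).map(_ * 2).cache()

rdd.count()                      // job 1
rdd.take(5)                      // job 2
rdd.saveAsTextFile("/tmp/out")   // job 3 (output path is illustrative)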

Thanks David.

From: Naveen [mailto:hadoopst...@gmail.com]
Sent: Wednesday, December 21, 2016 9:18 AM
To: d...@spark.apache.org; user@spark.apache.org
Subject: Launching multiple spark jobs within a main spark job.

Hi Team,

Is it OK to spawn multiple Spark jobs within a main Spark job? My main Spark
job's driver, which was launched on the YARN cluster, will do some
preprocessing and, based on that, needs to launch multiple Spark jobs on the
YARN cluster. I am not sure if this is the right pattern.

Please share your thoughts.
The sample code I have is below, for better understanding:
-

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.launcher.SparkLauncher
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object MainSparkJob {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf())

    // Fetch from Hive (via HiveContext) and from HBase for preprocessing
    // ...

    // Spawn the child jobs as Futures, each wrapping a separate spark-submit
    val future1 = Future {
      val sparkJob = new SparkLauncher()
        // .setAppResource(...), .setMainClass(...), .setMaster(...) as needed
        .launch()
      sparkJob.waitFor()
    }

    // Similarly, future2 to futureN.

    future1.onComplete { result =>
      println(s"child job finished: $result")
    }
  }

} // end of MainSparkJob
--
