Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-13 Thread Bijay Pathak
Hi Swetha,

One option is to use a Hive version with the above issue fixed, i.e. Hive 2.0, or
Cloudera CDH Hive 1.2, which has the fix backported. One thing to remember
is that it's not the Hive you have installed that matters but the Hive Spark is using,
which in Spark 1.6 is Hive 1.2 as of now.

The workaround I did for this issue was to write the dataframe directly using
the DataFrame writer and to create the Hive table on top of that, which brought
my processing time down from 4+ hrs to just under 1 hr.


data_frame.write.partitionBy('idPartitioner', 'dtPartitioner').orc("path/to/final/location")

And ORC format is supported with HiveContext only.
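For completeness, a minimal Scala sketch of that workaround as I understand it (hc is assumed to be a HiveContext, and the path, column, and table names simply mirror the snippets below): write the partitioned ORC data with the DataFrame writer, declare an external Hive table over the same location, then register the partitions.

// Sketch only: hc is an existing HiveContext, userDF the DataFrame shown below.
userDF.write
  .partitionBy("idPartitioner", "dtPartitioner")
  .orc("/user/userId/userRecords")

hc.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS users (userId STRING, userRecord STRING)
    |PARTITIONED BY (idPartitioner STRING, dtPartitioner STRING)
    |STORED AS ORC
    |LOCATION '/user/userId/userRecords'""".stripMargin)

// Make Hive aware of the directories written above
// (or ALTER TABLE ... ADD PARTITION per partition).
hc.sql("MSCK REPAIR TABLE users")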

Thanks,
Bijay


On Mon, Jun 13, 2016 at 11:41 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> Hi Mich,
>
> Following is  a sample code snippet:
>
>
> *val *userDF =
> userRecsDF.toDF("idPartitioner", "dtPartitioner", "userId", 
> "userRecord").persist()
> System.*out*.println(" userRecsDF.partitions.size"+
> userRecsDF.partitions.size)
>
> userDF.registerTempTable("userRecordsTemp")
>
> sqlContext.sql("SET hive.default.fileformat=Orc  ")
> sqlContext.sql("set hive.enforce.bucketing = true; ")
> sqlContext.sql("set hive.enforce.sorting = true; ")
> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId
> STRING, userRecord STRING) PARTITIONED BY (idPartitioner STRING,
> dtPartitioner STRING)   stored as ORC LOCATION '/user/userId/userRecords' ")
> sqlContext.sql(
>   """ from userRecordsTemp ps   insert overwrite table users
> partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
> ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
> """.stripMargin)
>
>
> On Mon, Jun 13, 2016 at 10:57 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi Bijay,
>>
>> If I am hitting this issue,
>> https://issues.apache.org/jira/browse/HIVE-11940. What needs to be done?
> Is upgrading to a higher version of Hive the only solution?
>>
>> Thanks!
>>
>> On Mon, Jun 13, 2016 at 10:47 AM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Following is  a sample code snippet:
>>>
>>>
>>> *val *userDF = userRecsDF.toDF("idPartitioner", "dtPartitioner",
>>> "userId", "userRecord").persist()
>>> System.*out*.println(" userRecsDF.partitions.size"+
>>> userRecsDF.partitions.size)
>>>
>>> userDF.registerTempTable("userRecordsTemp")
>>>
>>> sqlContext.sql("SET hive.default.fileformat=Orc  ")
>>> sqlContext.sql("set hive.enforce.bucketing = true; ")
>>> sqlContext.sql("set hive.enforce.sorting = true; ")
>>> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId
>>> STRING, userRecord STRING) PARTITIONED BY (idPartitioner STRING,
>>> dtPartitioner STRING)   stored as ORC LOCATION '/user/userId/userRecords' "
>>> )
>>> sqlContext.sql(
>>>   """ from userRecordsTemp ps   insert overwrite table users
>>> partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
>>> ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
>>> """.stripMargin)
>>>
>>>
>>>
>>>
>>> On Fri, Jun 10, 2016 at 12:10 AM, Bijay Pathak <
>>> bijay.pat...@cloudwick.com> wrote:
>>>
 Hello,

 Looks like you are hitting this:
 https://issues.apache.org/jira/browse/HIVE-11940.

 Thanks,
 Bijay



 On Thu, Jun 9, 2016 at 9:25 PM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Can you provide a code snippet of how you are populating the target
> table from the temp table?
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 9 June 2016 at 23:43, swetha kasireddy 
> wrote:
>
>> No, I am reading the data from hdfs, transforming it , registering
>> the data in a temp table using registerTempTable and then doing insert
>> overwrite using Spark SQL's hiveContext.
>>
>> On Thu, Jun 9, 2016 at 3:40 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> how are you doing the insert? from an existing table?
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 9 June 2016 at 21:16, Stephen Boesch  wrote:
>>>
 How many workers (/cpu cores) are assigned to this job?

 2016-06-09 13:01 GMT-07:00 SRK :

> Hi,
>
> How to insert data into 2000 partitions(directories) 

Re: OutOfMemoryError - When saving Word2Vec

2016-06-13 Thread Yuhao Yang
Hi Sharad,

what's your vocabulary size and vector length for Word2Vec?

Regards,
Yuhao
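For context, the saved model grows roughly with vocabulary size times vector length, and both knobs are set on the MLlib estimator; a minimal sketch with illustrative values:

import org.apache.spark.mllib.feature.Word2Vec
import org.apache.spark.rdd.RDD

// Illustrative values: a smaller vectorSize and a higher minCount shrink the
// vocabulary and therefore the model that has to be serialized on save.
def trainWord2Vec(corpus: RDD[Seq[String]]) = {
  new Word2Vec()
    .setVectorSize(100) // vector length per word
    .setMinCount(5)     // drop words seen fewer than 5 times
    .fit(corpus)
}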

2016-06-13 20:04 GMT+08:00 sharad82 :

> Is this the right forum to post Spark-related issues? I have tried this
> forum along with StackOverflow but am not seeing any response.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemoryError-When-saving-Word2Vec-tp27142p27151.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Is there a limit on the number of tasks in one job?

2016-06-13 Thread Takeshi Yamamuro
Hi,

You can control an initial num. of partitions (tasks) in v2.0.
https://www.mail-archive.com/user@spark.apache.org/msg51603.html

// maropu
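One 2.0 knob that bounds how much file data goes into a single read partition is spark.sql.files.maxPartitionBytes; whether that is exactly the setting discussed in the linked message is my assumption. An illustrative sketch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partition-count-sketch").getOrCreate()
// Illustrative value: smaller values yield more (smaller) input partitions.
spark.conf.set("spark.sql.files.maxPartitionBytes", 64L * 1024 * 1024) // 64 MB
val df = spark.read.json("hdfs:///user/hadoop/data/*/*")
println(df.rdd.getNumPartitions)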


On Tue, Jun 14, 2016 at 7:24 AM, Mich Talebzadeh 
wrote:

> Have you looked at the Spark GUI to see what it is waiting for? Is it
> available memory? What is the resource manager you are using?
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 13 June 2016 at 20:45, Khaled Hammouda  wrote:
>
>> Hi Michael,
>>
>> Thanks for the suggestion to use Spark 2.0 preview. I just downloaded the
>> preview and tried using it, but I’m running into the exact same issue.
>>
>> Khaled
>>
>> On Jun 13, 2016, at 2:58 PM, Michael Armbrust 
>> wrote:
>>
>> You might try with the Spark 2.0 preview.  We spent a bunch of time
>> improving the handling of many small files.
>>
>> On Mon, Jun 13, 2016 at 11:19 AM, khaled.hammouda <
>> khaled.hammo...@kik.com> wrote:
>>
>>> I'm trying to use Spark SQL to load json data that are split across
>>> about 70k
>>> files across 24 directories in hdfs, using
>>> sqlContext.read.json("hdfs:///user/hadoop/data/*/*").
>>>
>>> This doesn't seem to work for some reason, I get timeout errors like the
>>> following:
>>>
>>> ---
>>> 6/06/13 15:46:31 ERROR TransportChannelHandler: Connection to
>>> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 has been quiet for
>>> 12
>>> ms while there are outstanding requests. Assuming connection is dead;
>>> please
>>> adjust spark.network.timeout if this is wrong.
>>> 16/06/13 15:46:31 ERROR TransportResponseHandler: Still have 1 requests
>>> outstanding when connection from
>>> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 is closed
>>> ...
>>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
>>> seconds]. This timeout is controlled by spark.rpc.askTimeout
>>> ...
>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>>> [120 seconds]
>>> --
>>>
>>> I don't want to start tinkering with increasing timeouts yet. I tried to
>>> load just one sub-directory, which contains around 4k files, and this
>>> seems
>>> to work fine. So I thought of writing a loop where I load the json files
>>> from each sub-dir and then unionAll the current dataframe with the
>>> previous
>>> dataframe. However, this also fails because apparently the json files
>>> don't
>>> have the exact same schema, causing this error:
>>>
>>> ---
>>> Traceback (most recent call last):
>>>   File "/home/hadoop/load_json.py", line 65, in 
>>> df = df.unionAll(hrdf)
>>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
>>> line 998, in unionAll
>>>   File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
>>> line 813, in __call__
>>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
>>> 51, in deco
>>> pyspark.sql.utils.AnalysisException: u"unresolved operator 'Union;"
>>> ---
>>>
>>> I'd like to know what's preventing Spark from loading 70k files the same
>>> way
>>> it's loading 4k files?
>>>
>>> To give you some idea about my setup and data:
>>> - ~70k files across 24 directories in HDFS
>>> - Each directory contains 3k files on average
>>> - Cluster: 200 nodes EMR cluster, each node has 53 GB memory and 8 cores
>>> available to YARN
>>> - Spark 1.6.1
>>>
>>> Thanks.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-limit-on-the-number-of-tasks-in-one-job-tp27158.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>>> .
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Suggestions on Lambda Architecture in Spark

2016-06-13 Thread Luciano Resende
This might be useful:

https://spark-summit.org/east-2016/events/lambda-at-weather-scale/

On Monday, June 13, 2016, KhajaAsmath Mohammed 
wrote:

> Hi,
>
> In my current project, we are planning to implement POC for lambda
> architecture using spark streaming. My use case would be
>
> Kafka --> Batch layer --> Spark SQL --> Cassandra
>
> Kafka --> Speed layer --> Spark Streaming --> Cassandra
>
> Serving layer --> Contacts both layers, but I am not sure how the data
> is queried in this case.
>
> Does anyone have any github links or tutorials on how to implement lambda
> architecture especially the serving layer?
>
> Thanks,
> Asmath.
>


-- 
Sent from my Mobile device


Suggestions on Lambda Architecture in Spark

2016-06-13 Thread KhajaAsmath Mohammed
Hi,

In my current project, we are planning to implement POC for lambda
architecture using spark streaming. My use case would be

Kafka --> Batch layer --> Spark SQL --> Cassandra

Kafka --> Speed layer --> Spark Streaming --> Cassandra

Serving layer --> Contacts both layers, but I am not sure how the data is
queried in this case.

Does anyone have any github links or tutorials on how to implement lambda
architecture especially the serving layer?

Thanks,
Asmath.
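On the serving-layer question, one hedged sketch (not from this thread): read the batch view and the speed view back from Cassandra with the spark-cassandra-connector data source and merge them at query time. Keyspace, table, and column names are illustrative.

import org.apache.spark.sql.SQLContext

def servingView(sqlContext: SQLContext) = {
  def cassandraTable(table: String) =
    sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "analytics", "table" -> table))
      .load()

  // Batch view (recomputed periodically) plus speed view (recent increments),
  // re-aggregated so overlapping keys are combined.
  cassandraTable("batch_view")
    .unionAll(cassandraTable("speed_view"))
    .groupBy("key")
    .sum("count")
}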


Re: Spark 2.0: Unify DataFrames and Datasets question

2016-06-13 Thread Arun Patel
Thanks Michael.

I went thru these slides already and could not find answers for these
specific questions.

I created a Dataset and converted it to DataFrame in 1.6 and 2.0.  I don't
see any difference in 1.6 vs 2.0.  So, I really got confused and asked
these questions about unification.

Appreciate if you can answer these specific questions.  Thank you very much!
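For concreteness, a minimal Scala sketch of the 2.0 relationship: DataFrame is literally the alias type DataFrame = Dataset[Row], and .as[T] gives the typed view (the case class is illustrative).

import org.apache.spark.sql.{Dataset, SparkSession}

case class User(id: Long, name: String)

object UnifiedApiSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("unify-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq(User(1L, "a"), User(2L, "b")).toDF() // untyped: Dataset[Row]
    val ds: Dataset[User] = df.as[User]               // typed: fields checked at compile time
    ds.map(_.name).show()
    spark.stop()
  }
}

Untyped DataFrame column references are only checked at analysis time, while Dataset field access is checked by the compiler; the typed Dataset API is Scala/Java only, so Python continues to work with DataFrames.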

On Mon, Jun 13, 2016 at 2:55 PM, Michael Armbrust 
wrote:

> Here's a talk I gave on the topic:
>
> https://www.youtube.com/watch?v=i7l3JQRx7Qw
>
> http://www.slideshare.net/SparkSummit/structuring-spark-dataframes-datasets-and-streaming-by-michael-armbrust
>
> On Mon, Jun 13, 2016 at 4:01 AM, Arun Patel 
> wrote:
>
>> In Spark 2.0, DataFrames and Datasets are unified. DataFrame is simply an
>> alias for a Dataset of type row.   I have few questions.
>>
>> 1) What does this really mean to an Application developer?
>> 2) Why this unification was needed in Spark 2.0?
>> 3) What changes can be observed in Spark 2.0 vs Spark 1.6?
>> 4) Compile time safety will be there for DataFrames too?
>> 5) Python API is supported for Datasets in 2.0?
>>
>> Thanks
>> Arun
>>
>
>


Re: Is there a limit on the number of tasks in one job?

2016-06-13 Thread Mich Talebzadeh
Have you looked at the Spark GUI to see what it is waiting for? Is it
available memory? What is the resource manager you are using?

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 13 June 2016 at 20:45, Khaled Hammouda  wrote:

> Hi Michael,
>
> Thanks for the suggestion to use Spark 2.0 preview. I just downloaded the
> preview and tried using it, but I’m running into the exact same issue.
>
> Khaled
>
> On Jun 13, 2016, at 2:58 PM, Michael Armbrust 
> wrote:
>
> You might try with the Spark 2.0 preview.  We spent a bunch of time
> improving the handling of many small files.
>
> On Mon, Jun 13, 2016 at 11:19 AM, khaled.hammouda  > wrote:
>
>> I'm trying to use Spark SQL to load json data that are split across about
>> 70k
>> files across 24 directories in hdfs, using
>> sqlContext.read.json("hdfs:///user/hadoop/data/*/*").
>>
>> This doesn't seem to work for some reason, I get timeout errors like the
>> following:
>>
>> ---
>> 6/06/13 15:46:31 ERROR TransportChannelHandler: Connection to
>> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 has been quiet for
>> 12
>> ms while there are outstanding requests. Assuming connection is dead;
>> please
>> adjust spark.network.timeout if this is wrong.
>> 16/06/13 15:46:31 ERROR TransportResponseHandler: Still have 1 requests
>> outstanding when connection from
>> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 is closed
>> ...
>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
>> seconds]. This timeout is controlled by spark.rpc.askTimeout
>> ...
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [120 seconds]
>> --
>>
>> I don't want to start tinkering with increasing timeouts yet. I tried to
>> load just one sub-directory, which contains around 4k files, and this
>> seems
>> to work fine. So I thought of writing a loop where I load the json files
>> from each sub-dir and then unionAll the current dataframe with the
>> previous
>> dataframe. However, this also fails because apparently the json files
>> don't
>> have the exact same schema, causing this error:
>>
>> ---
>> Traceback (most recent call last):
>>   File "/home/hadoop/load_json.py", line 65, in 
>> df = df.unionAll(hrdf)
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
>> line 998, in unionAll
>>   File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
>> line 813, in __call__
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
>> 51, in deco
>> pyspark.sql.utils.AnalysisException: u"unresolved operator 'Union;"
>> ---
>>
>> I'd like to know what's preventing Spark from loading 70k files the same
>> way
>> it's loading 4k files?
>>
>> To give you some idea about my setup and data:
>> - ~70k files across 24 directories in HDFS
>> - Each directory contains 3k files on average
>> - Cluster: 200 nodes EMR cluster, each node has 53 GB memory and 8 cores
>> available to YARN
>> - Spark 1.6.1
>>
>> Thanks.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-limit-on-the-number-of-tasks-in-one-job-tp27158.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>> .
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Spark Memory Error - Not enough space to cache broadcast

2016-06-13 Thread Cassa L
Hi,

I'm using Spark version 1.5.1. I am reading data from Kafka into Spark
and writing it into Cassandra after processing it. The Spark job starts
fine and runs well for some time until I start getting the errors below.
Once these errors appear, the job starts to lag behind and I see
scheduling and processing delays in the streaming UI.

Worker memory is 6 GB and executor memory is 5 GB; I also tried to tweak
the memoryFraction parameters. Nothing works.
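For reference, the memoryFraction parameters mentioned above are the pre-2.0 (Spark 1.5.x) settings; a minimal sketch of where they are set, with illustrative values only:

import org.apache.spark.SparkConf

// Spark 1.5.x legacy memory knobs; the values here are illustrative, not advice.
val conf = new SparkConf()
  .setAppName("kafka-to-cassandra")
  .set("spark.storage.memoryFraction", "0.4") // share of heap for cached blocks/broadcasts
  .set("spark.shuffle.memoryFraction", "0.4") // share of heap for shuffle/aggregation buffers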


16/06/13 21:26:02 INFO MemoryStore: ensureFreeSpace(4044) called with
curMem=565394, maxMem=2778495713
16/06/13 21:26:02 INFO MemoryStore: Block broadcast_69652_piece0
stored as bytes in memory (estimated size 3.9 KB, free 2.6 GB)
16/06/13 21:26:02 INFO TorrentBroadcast: Reading broadcast variable
69652 took 2 ms
16/06/13 21:26:02 WARN MemoryStore: Failed to reserve initial memory
threshold of 1024.0 KB for computing block broadcast_69652 in memory.
16/06/13 21:26:02 WARN MemoryStore: Not enough space to cache
broadcast_69652 in memory! (computed 496.0 B so far)
16/06/13 21:26:02 INFO MemoryStore: Memory use = 556.1 KB (blocks) +
2.6 GB (scratch space shared across 0 tasks(s)) = 2.6 GB. Storage
limit = 2.6 GB.
16/06/13 21:26:02 WARN MemoryStore: Persisting block broadcast_69652
to disk instead.
16/06/13 21:26:02 INFO BlockManager: Found block rdd_100761_1 locally
16/06/13 21:26:02 INFO Executor: Finished task 0.0 in stage 71577.0
(TID 452316). 2043 bytes result sent to driver


Thanks,

L


how to investigate skew and DataFrames and RangePartitioner

2016-06-13 Thread Peter Halliday
I have two questions

First, I have a failure when I write Parquet from Spark 1.6.1 on Amazon EMR to
S3. This is a full batch, which is over 200 GB of source data. The partitioning
is based on a geographic identifier we use and also on the date we got the data.
However, because of geographic density we could well be producing tiles that are
too dense. I'm trying to figure out the size of the file it's trying to write out.

Second, we used to use RDDs and RangePartitioner for task partitioning.
However, I don't see this available for DataFrames. How does one achieve this
now?
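On the second question, a hedged sketch: Spark 1.6 does not expose range partitioning through the DataFrame API, but repartition(numPartitions, cols*) (hash on the given columns) combined with sortWithinPartitions is the usual approximation, and dropping to df.rdd with a RangePartitioner remains possible. Column names and the partition count below are illustrative.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hash-partition on the keys previously given to RangePartitioner, then sort
// within each partition; the names and the count of 400 are illustrative.
def partitionLikeBefore(df: DataFrame): DataFrame =
  df.repartition(400, col("geoTile"), col("ingestDate"))
    .sortWithinPartitions(col("geoTile"), col("ingestDate"))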

Peter Halliday
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: LegacyAccumulatorWrapper basically requires the Accumulator value to implement equals() or it will fail on isZero()

2016-06-13 Thread Amit Sela
I thought so, and I agree. Still good to have this indexed here :)

On Mon, Jun 13, 2016 at 10:43 PM Sean Owen  wrote:

> I think that's right, but that seems as expected. If you're going to
> use this utility wrapper class, it can only determine if something is
> zero by comparing it to your 'zero' object, and that means defining
> equality. I suspect it's uncommon to accumulate things that aren't
> primitives or standard collections, which already define equality. But
> I'd expect I'd have to define equality for some custom user class in
> general if handing it over for a library to compare, add, clear, etc.
>
> On Mon, Jun 13, 2016 at 8:15 PM, Amit Sela  wrote:
> > It seems that if you have an AccumulatorParam (or AccumulableParam) where
> > "R" is not a primitive, it will need to implement equlas() if the
> > implementation of the zero() creates a new instance (which I guess it
> will
> > in those cases).
> > This is where isZero applies the comparison:
> >
> https://github.com/apache/spark/blob/254bc8c34e70241508bdfc8ff42a65491f5280cd/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L462
> > I overcame this by providing null in zero and instantiating in
> > addAccumulator() but that's more of a hack, on the other hand, I don't
> see a
> > trivial solution, which is one of the reasons I'm writing this.
> > Anyone encounter this when upgrading to 2.0 ?  with non-trivial
> Accumulators
> > of course..
> >
> >
> > Thanks,
> > Amit
>


Re: Is there a limit on the number of tasks in one job?

2016-06-13 Thread Khaled Hammouda
Hi Michael,

Thanks for the suggestion to use Spark 2.0 preview. I just downloaded the 
preview and tried using it, but I’m running into the exact same issue.

Khaled

> On Jun 13, 2016, at 2:58 PM, Michael Armbrust  wrote:
> 
> You might try with the Spark 2.0 preview.  We spent a bunch of time improving 
> the handling of many small files.
> 
> On Mon, Jun 13, 2016 at 11:19 AM, khaled.hammouda  > wrote:
> I'm trying to use Spark SQL to load json data that are split across about 70k
> files across 24 directories in hdfs, using
> sqlContext.read.json("hdfs:///user/hadoop/data/*/*").
> 
> This doesn't seem to work for some reason, I get timeout errors like the
> following:
> 
> ---
> 6/06/13 15:46:31 ERROR TransportChannelHandler: Connection to
> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 
>  has been quiet for 12
> ms while there are outstanding requests. Assuming connection is dead; please
> adjust spark.network.timeout if this is wrong.
> 16/06/13 15:46:31 ERROR TransportResponseHandler: Still have 1 requests
> outstanding when connection from
> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 
>  is closed
> ...
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
> seconds]. This timeout is controlled by spark.rpc.askTimeout
> ...
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [120 seconds]
> --
> 
> I don't want to start tinkering with increasing timeouts yet. I tried to
> load just one sub-directory, which contains around 4k files, and this seems
> to work fine. So I thought of writing a loop where I load the json files
> from each sub-dir and then unionAll the current dataframe with the previous
> dataframe. However, this also fails because apparently the json files don't
> have the exact same schema, causing this error:
> 
> ---
> Traceback (most recent call last):
>   File "/home/hadoop/load_json.py", line 65, in 
> df = df.unionAll(hrdf)
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
> line 998, in unionAll
>   File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
> line 813, in __call__
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
> 51, in deco
> pyspark.sql.utils.AnalysisException: u"unresolved operator 'Union;"
> ---
> 
> I'd like to know what's preventing Spark from loading 70k files the same way
> it's loading 4k files?
> 
> To give you some idea about my setup and data:
> - ~70k files across 24 directories in HDFS
> - Each directory contains 3k files on average
> - Cluster: 200 nodes EMR cluster, each node has 53 GB memory and 8 cores
> available to YARN
> - Spark 1.6.1
> 
> Thanks.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-limit-on-the-number-of-tasks-in-one-job-tp27158.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



Re: LegacyAccumulatorWrapper basically requires the Accumulator value to implement equals() or it will fail on isZero()

2016-06-13 Thread Sean Owen
I think that's right, but that seems as expected. If you're going to
use this utility wrapper class, it can only determine if something is
zero by comparing it to your 'zero' object, and that means defining
equality. I suspect it's uncommon to accumulate things that aren't
primitives or standard collections, which already define equality. But
I'd expect I'd have to define equality for some custom user class in
general if handing it over for a library to compare, add, clear, etc.
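To make the point concrete, a minimal sketch of a non-case-class accumulator value that defines equals() so the wrapper's zero comparison behaves (all names are illustrative):

import org.apache.spark.AccumulatorParam

// Not a case class, so equals()/hashCode() are written by hand; without them
// isZero's comparison against a freshly created zero() instance would fail.
class Counter(val n: Long) extends Serializable {
  def add(other: Counter): Counter = new Counter(n + other.n)
  override def equals(other: Any): Boolean = other match {
    case c: Counter => c.n == n
    case _          => false
  }
  override def hashCode(): Int = n.hashCode
}

object CounterParam extends AccumulatorParam[Counter] {
  override def zero(initial: Counter): Counter = new Counter(0L) // new instance each call
  override def addInPlace(a: Counter, b: Counter): Counter = a.add(b)
}

// usage with the 1.x-style API (wrapped by LegacyAccumulatorWrapper in 2.0):
// val acc = sc.accumulator(new Counter(0L))(CounterParam)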

On Mon, Jun 13, 2016 at 8:15 PM, Amit Sela  wrote:
> It seems that if you have an AccumulatorParam (or AccumulableParam) where
> "R" is not a primitive, it will need to implement equlas() if the
> implementation of the zero() creates a new instance (which I guess it will
> in those cases).
> This is where isZero applies the comparison:
> https://github.com/apache/spark/blob/254bc8c34e70241508bdfc8ff42a65491f5280cd/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L462
> I overcame this by providing null in zero and instantiating in
> addAccumulator() but that's more of a hack, on the other hand, I don't see a
> trivial solution, which is one of the reasons I'm writing this.
> Anyone encounter this when upgrading to 2.0 ?  with non-trivial Accumulators
> of course..
>
>
> Thanks,
> Amit

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



LegacyAccumulatorWrapper basically requires the Accumulator value to implement equals() or it will fail on isZero()

2016-06-13 Thread Amit Sela
It seems that if you have an AccumulatorParam (or AccumulableParam) where
"R" is not a primitive, it will need to implement equlas() if the
implementation of the zero() creates a new instance (which I guess it will
in those cases).
This is where isZero applies the comparison:
https://github.com/apache/spark/blob/254bc8c34e70241508bdfc8ff42a65491f5280cd/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L462
I overcame this by providing null in zero and instantiating in
addAccumulator() but that's more of a hack, on the other hand, I don't see
a trivial solution, which is one of the reasons I'm writing this.
Anyone encounter this when upgrading to 2.0 ?  with non-trivial
Accumulators of course..


Thanks,
Amit


Re: Is there a limit on the number of tasks in one job?

2016-06-13 Thread Michael Armbrust
You might try with the Spark 2.0 preview.  We spent a bunch of time
improving the handling of many small files.

On Mon, Jun 13, 2016 at 11:19 AM, khaled.hammouda 
wrote:

> I'm trying to use Spark SQL to load json data that are split across about
> 70k
> files across 24 directories in hdfs, using
> sqlContext.read.json("hdfs:///user/hadoop/data/*/*").
>
> This doesn't seem to work for some reason, I get timeout errors like the
> following:
>
> ---
> 6/06/13 15:46:31 ERROR TransportChannelHandler: Connection to
> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 has been quiet for
> 12
> ms while there are outstanding requests. Assuming connection is dead;
> please
> adjust spark.network.timeout if this is wrong.
> 16/06/13 15:46:31 ERROR TransportResponseHandler: Still have 1 requests
> outstanding when connection from
> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 is closed
> ...
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
> seconds]. This timeout is controlled by spark.rpc.askTimeout
> ...
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [120 seconds]
> --
>
> I don't want to start tinkering with increasing timeouts yet. I tried to
> load just one sub-directory, which contains around 4k files, and this seems
> to work fine. So I thought of writing a loop where I load the json files
> from each sub-dir and then unionAll the current dataframe with the previous
> dataframe. However, this also fails because apparently the json files don't
> have the exact same schema, causing this error:
>
> ---
> Traceback (most recent call last):
>   File "/home/hadoop/load_json.py", line 65, in 
> df = df.unionAll(hrdf)
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
> line 998, in unionAll
>   File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
> line 813, in __call__
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
> 51, in deco
> pyspark.sql.utils.AnalysisException: u"unresolved operator 'Union;"
> ---
>
> I'd like to know what's preventing Spark from loading 70k files the same
> way
> it's loading 4k files?
>
> To give you some idea about my setup and data:
> - ~70k files across 24 directories in HDFS
> - Each directory contains 3k files on average
> - Cluster: 200 nodes EMR cluster, each node has 53 GB memory and 8 cores
> available to YARN
> - Spark 1.6.1
>
> Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-limit-on-the-number-of-tasks-in-one-job-tp27158.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Thrift Server in CDH 5.3

2016-06-13 Thread Michael Armbrust
I'd try asking on the cloudera forums.

On Sun, Jun 12, 2016 at 9:51 PM, pooja mehta  wrote:

> Hi,
>
> How do I start Spark Thrift Server with cloudera CDH 5.3?
>
> Thanks.
>


Re: Spark 2.0: Unify DataFrames and Datasets question

2016-06-13 Thread Michael Armbrust
Here's a talk I gave on the topic:

https://www.youtube.com/watch?v=i7l3JQRx7Qw
http://www.slideshare.net/SparkSummit/structuring-spark-dataframes-datasets-and-streaming-by-michael-armbrust

On Mon, Jun 13, 2016 at 4:01 AM, Arun Patel  wrote:

> In Spark 2.0, DataFrames and Datasets are unified. DataFrame is simply an
> alias for a Dataset of type row.   I have few questions.
>
> 1) What does this really mean to an Application developer?
> 2) Why this unification was needed in Spark 2.0?
> 3) What changes can be observed in Spark 2.0 vs Spark 1.6?
> 4) Compile time safety will be there for DataFrames too?
> 5) Python API is supported for Datasets in 2.0?
>
> Thanks
> Arun
>


Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-13 Thread swetha kasireddy
Hi Mich,

Following is  a sample code snippet:


val userDF =
userRecsDF.toDF("idPartitioner", "dtPartitioner", "userId",
"userRecord").persist()
System.out.println(" userRecsDF.partitions.size" +
userRecsDF.partitions.size)

userDF.registerTempTable("userRecordsTemp")

sqlContext.sql("SET hive.default.fileformat=Orc  ")
sqlContext.sql("set hive.enforce.bucketing = true; ")
sqlContext.sql("set hive.enforce.sorting = true; ")
sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId STRING,
userRecord STRING) PARTITIONED BY (idPartitioner STRING, dtPartitioner
STRING)   stored as ORC LOCATION '/user/userId/userRecords' ")
sqlContext.sql(
  """ from userRecordsTemp ps   insert overwrite table users
partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
""".stripMargin)


On Mon, Jun 13, 2016 at 10:57 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> Hi Bijay,
>
> If I am hitting this issue,
> https://issues.apache.org/jira/browse/HIVE-11940. What needs to be done?
> Is upgrading to a higher version of Hive the only solution?
>
> Thanks!
>
> On Mon, Jun 13, 2016 at 10:47 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> Following is  a sample code snippet:
>>
>>
>> *val *userDF = userRecsDF.toDF("idPartitioner", "dtPartitioner", "userId",
>> "userRecord").persist()
>> System.*out*.println(" userRecsDF.partitions.size"+
>> userRecsDF.partitions.size)
>>
>> userDF.registerTempTable("userRecordsTemp")
>>
>> sqlContext.sql("SET hive.default.fileformat=Orc  ")
>> sqlContext.sql("set hive.enforce.bucketing = true; ")
>> sqlContext.sql("set hive.enforce.sorting = true; ")
>> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId
>> STRING, userRecord STRING) PARTITIONED BY (idPartitioner STRING,
>> dtPartitioner STRING)   stored as ORC LOCATION '/user/userId/userRecords' "
>> )
>> sqlContext.sql(
>>   """ from userRecordsTemp ps   insert overwrite table users
>> partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
>> ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
>> """.stripMargin)
>>
>>
>>
>>
>> On Fri, Jun 10, 2016 at 12:10 AM, Bijay Pathak <
>> bijay.pat...@cloudwick.com> wrote:
>>
>>> Hello,
>>>
>>> Looks like you are hitting this:
>>> https://issues.apache.org/jira/browse/HIVE-11940.
>>>
>>> Thanks,
>>> Bijay
>>>
>>>
>>>
>>> On Thu, Jun 9, 2016 at 9:25 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Can you provide a code snippet of how you are populating the target
 table from the temp table?


 HTH

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 9 June 2016 at 23:43, swetha kasireddy 
 wrote:

> No, I am reading the data from hdfs, transforming it , registering the
> data in a temp table using registerTempTable and then doing insert
> overwrite using Spark SQL's hiveContext.
>
> On Thu, Jun 9, 2016 at 3:40 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> how are you doing the insert? from an existing table?
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 9 June 2016 at 21:16, Stephen Boesch  wrote:
>>
>>> How many workers (/cpu cores) are assigned to this job?
>>>
>>> 2016-06-09 13:01 GMT-07:00 SRK :
>>>
 Hi,

 How to insert data into 2000 partitions(directories) of
 ORC/parquet  at a
 time using Spark SQL? It seems to be not performant when I try to
 insert
 2000 directories of Parquet/ORC using Spark SQL. Did anyone face
 this issue?

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-data-into-2000-partitions-directories-of-ORC-parquet-at-a-time-using-Spark-SQL-tp27132.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>

>>>
>>
>


Is there a limit on the number of tasks in one job?

2016-06-13 Thread khaled.hammouda
I'm trying to use Spark SQL to load JSON data that is split across about 70k
files in 24 directories in HDFS, using
sqlContext.read.json("hdfs:///user/hadoop/data/*/*").

This doesn't seem to work for some reason, I get timeout errors like the
following:

---
6/06/13 15:46:31 ERROR TransportChannelHandler: Connection to
ip-172-31-31-114.ec2.internal/172.31.31.114:46028 has been quiet for 12
ms while there are outstanding requests. Assuming connection is dead; please
adjust spark.network.timeout if this is wrong.
16/06/13 15:46:31 ERROR TransportResponseHandler: Still have 1 requests
outstanding when connection from
ip-172-31-31-114.ec2.internal/172.31.31.114:46028 is closed
...
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
seconds]. This timeout is controlled by spark.rpc.askTimeout
...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[120 seconds]
--

I don't want to start tinkering with increasing timeouts yet. I tried to
load just one sub-directory, which contains around 4k files, and this seems
to work fine. So I thought of writing a loop where I load the json files
from each sub-dir and then unionAll the current dataframe with the previous
dataframe. However, this also fails because apparently the json files don't
have the exact same schema, causing this error:

---
Traceback (most recent call last):
  File "/home/hadoop/load_json.py", line 65, in 
df = df.unionAll(hrdf)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
line 998, in unionAll
  File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
line 813, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
51, in deco
pyspark.sql.utils.AnalysisException: u"unresolved operator 'Union;"
---

I'd like to know what's preventing Spark from loading 70k files the same way
it's loading 4k files?

To give you some idea about my setup and data:
- ~70k files across 24 directories in HDFS
- Each directory contains 3k files on average
- Cluster: 200 nodes EMR cluster, each node has 53 GB memory and 8 cores
available to YARN
- Spark 1.6.1

Thanks.
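One way to sidestep the schema mismatch in the loop described above would be to read every sub-directory with the same explicit schema, so unionAll sees identical columns. The loop above is PySpark; the same idea sketched in Scala, with illustrative field names:

import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SQLContext}

// Illustrative schema: declare only the fields actually needed downstream.
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("ts", LongType),
  StructField("payload", StringType)))

def loadAll(sqlContext: SQLContext, dirs: Seq[String]): DataFrame =
  dirs.map(d => sqlContext.read.schema(schema).json(d)).reduce(_ unionAll _)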



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-limit-on-the-number-of-tasks-in-one-job-tp27158.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-13 Thread swetha kasireddy
Hi Bijay,

If I am hitting this issue,
https://issues.apache.org/jira/browse/HIVE-11940. What needs to be done?
Is upgrading to a higher version of Hive the only solution?

Thanks!

On Mon, Jun 13, 2016 at 10:47 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> Hi,
>
> Following is  a sample code snippet:
>
>
> *val *userDF = userRecsDF.toDF("idPartitioner", "dtPartitioner", "userId",
> "userRecord").persist()
> System.*out*.println(" userRecsDF.partitions.size"+
> userRecsDF.partitions.size)
>
> userDF.registerTempTable("userRecordsTemp")
>
> sqlContext.sql("SET hive.default.fileformat=Orc  ")
> sqlContext.sql("set hive.enforce.bucketing = true; ")
> sqlContext.sql("set hive.enforce.sorting = true; ")
> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId
> STRING, userRecord STRING) PARTITIONED BY (idPartitioner STRING,
> dtPartitioner STRING)   stored as ORC LOCATION '/user/userId/userRecords' "
> )
> sqlContext.sql(
>   """ from userRecordsTemp ps   insert overwrite table users
> partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
> ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
> """.stripMargin)
>
>
>
>
> On Fri, Jun 10, 2016 at 12:10 AM, Bijay Pathak  > wrote:
>
>> Hello,
>>
>> Looks like you are hitting this:
>> https://issues.apache.org/jira/browse/HIVE-11940.
>>
>> Thanks,
>> Bijay
>>
>>
>>
>> On Thu, Jun 9, 2016 at 9:25 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Can you provide a code snippet of how you are populating the target
>>> table from the temp table?
>>>
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 9 June 2016 at 23:43, swetha kasireddy 
>>> wrote:
>>>
 No, I am reading the data from hdfs, transforming it , registering the
 data in a temp table using registerTempTable and then doing insert
 overwrite using Spark SQl' hiveContext.

 On Thu, Jun 9, 2016 at 3:40 PM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> how are you doing the insert? from an existing table?
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 9 June 2016 at 21:16, Stephen Boesch  wrote:
>
>> How many workers (/cpu cores) are assigned to this job?
>>
>> 2016-06-09 13:01 GMT-07:00 SRK :
>>
>>> Hi,
>>>
>>> How to insert data into 2000 partitions(directories) of ORC/parquet
>>> at a
>>> time using Spark SQL? It seems to be not performant when I try to
>>> insert
>>> 2000 directories of Parquet/ORC using Spark SQL. Did anyone face
>>> this issue?
>>>
>>> Thanks!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-data-into-2000-partitions-directories-of-ORC-parquet-at-a-time-using-Spark-SQL-tp27132.html
>>> Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>

>>>
>>
>


Re: SAS_TO_SPARK_SQL_(Could be a Bug?)

2016-06-13 Thread Deepak Sharma
Hi Ajay
Looking at the Spark code, I can see you used a HiveContext.
Can you try using a SQLContext instead of the HiveContext there?

Thanks
Deepak
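A minimal sketch of that suggestion, reading over JDBC with a plain SQLContext and an explicit subquery as dbtable; the URL, driver class, and table names are placeholders to be replaced with whatever already works in the plain Java app:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object JdbcReadSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sas-jdbc-read"))
    val sqlContext = new SQLContext(sc) // plain SQLContext, not HiveContext

    // Placeholders: substitute the SAS JDBC URL/driver used in the Java app.
    val df = sqlContext.read.format("jdbc").options(Map(
      "url"     -> "jdbc:sas://sas-host:port",
      "driver"  -> "your.sas.jdbc.Driver",
      "dbtable" -> "(SELECT col_a, col_b FROM some_lib.some_table) t"
    )).load()

    df.printSchema()
    df.show(5)
    sc.stop()
  }
}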

On Mon, Jun 13, 2016 at 10:15 PM, Ajay Chander  wrote:

> Hi Mohit,
>
> Thanks for your time. Please find my response below.
>
> Did you try the same with another database?
> I do load the data from MySQL and SQL Server the same way (through Spark
> SQL JDBC), which works perfectly alright.
>
> As a workaround you can write the select statement yourself instead of
> just providing the table name?
> Yes, I did that too. It did not make any difference.
>
> Thank you,
> Ajay
>
> On Sunday, June 12, 2016, Mohit Jaggi  wrote:
>
>> Looks like a bug in the code generating the SQL query…why would it be
>> specific to SAS, I can’t guess. Did you try the same with another database?
>> As a workaround you can write the select statement yourself instead of just
>> providing the table name.
>>
>> On Jun 11, 2016, at 6:27 PM, Ajay Chander  wrote:
>>
>> I tried implementing the same functionality through Scala as well. But no
>> luck so far. Just wondering if anyone here tried using Spark SQL to read
>> SAS dataset? Thank you
>>
>> Regards,
>> Ajay
>>
>> On Friday, June 10, 2016, Ajay Chander  wrote:
>>
>>> Mich, I completely agree with you. I built another Spark SQL application
>>> which reads data from MySQL and SQL server and writes the data into
>>> Hive (parquet+snappy format). I have this problem only when I read directly
>>> from remote SAS system. The interesting part is I am using same driver to
>>> read data through pure Java app and spark app. It works fine in Java
>>> app, so I cannot blame SAS driver here. Trying to understand where the
>>> problem could be. Thanks for sharing this with me.
>>>
>>> On Friday, June 10, 2016, Mich Talebzadeh 
>>> wrote:
>>>
 I personally use Scala to do something similar. For example here I
 extract data from an Oracle table and store in ORC table in Hive. This is
 compiled via sbt as run with SparkSubmit.

 It is similar to your code but in Scala. Note that I do not enclose my
 column names in double quotes.

 import org.apache.spark.SparkContext
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.functions._

 object ETL_scratchpad_dummy {
   def main(args: Array[String]) {
   val conf = new SparkConf().
setAppName("ETL_scratchpad_dummy").
set("spark.driver.allowMultipleContexts", "true")
   val sc = new SparkContext(conf)
   // Create sqlContext based on HiveContext
   val sqlContext = new HiveContext(sc)
   import sqlContext.implicits._
   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
   println ("\nStarted at"); sqlContext.sql("SELECT
 FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
 ").collect.foreach(println)
   HiveContext.sql("use oraclehadoop")
   var _ORACLEserver : String = "jdbc:oracle:thin:@rhes564:1521:mydb12"
   var _username : String = "scratchpad"
   var _password : String = ""

   // Get data from Oracle table scratchpad.dummy
   val d = HiveContext.load("jdbc",
   Map("url" -> _ORACLEserver,
   "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS
 CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS
 RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
   "user" -> _username,
   "password" -> _password))

d.registerTempTable("tmp")
   //
   // Need to create and populate target ORC table oraclehadoop.dummy
   //
   HiveContext.sql("use oraclehadoop")
   //
   // Drop and create table dummy
   //
   HiveContext.sql("DROP TABLE IF EXISTS oraclehadoop.dummy")
   var sqltext : String = ""
   sqltext = """
   CREATE TABLE oraclehadoop.dummy (
  ID INT
, CLUSTERED INT
, SCATTERED INT
, RANDOMISED INT
, RANDOM_STRING VARCHAR(50)
, SMALL_VC VARCHAR(10)
, PADDING  VARCHAR(10)
   )
   CLUSTERED BY (ID) INTO 256 BUCKETS
   STORED AS ORC
   TBLPROPERTIES (
   "orc.create.index"="true",
   "orc.bloom.filter.columns"="ID",
   "orc.bloom.filter.fpp"="0.05",
   "orc.compress"="SNAPPY",
   "orc.stripe.size"="16777216",
   "orc.row.index.stride"="1" )
   """
HiveContext.sql(sqltext)
   //
   // Put data in Hive table. Clean up is already done
   //
   sqltext = """
   INSERT INTO TABLE oraclehadoop.dummy
   SELECT
   ID
 , CLUSTERED
 , SCATTERED
 , RANDOMISED

Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-13 Thread swetha kasireddy
Hi,

Following is  a sample code snippet:


val userDF = userRecsDF.toDF("idPartitioner", "dtPartitioner", "userId",
"userRecord").persist()
System.out.println(" userRecsDF.partitions.size" +
userRecsDF.partitions.size)

userDF.registerTempTable("userRecordsTemp")

sqlContext.sql("SET hive.default.fileformat=Orc  ")
sqlContext.sql("set hive.enforce.bucketing = true; ")
sqlContext.sql("set hive.enforce.sorting = true; ")
sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId STRING,
userRecord STRING) PARTITIONED BY (idPartitioner STRING, dtPartitioner
STRING)   stored as ORC LOCATION '/user/userId/userRecords' ")
sqlContext.sql(
  """ from userRecordsTemp ps   insert overwrite table users
partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
""".stripMargin)




On Fri, Jun 10, 2016 at 12:10 AM, Bijay Pathak 
wrote:

> Hello,
>
> Looks like you are hitting this:
> https://issues.apache.org/jira/browse/HIVE-11940.
>
> Thanks,
> Bijay
>
>
>
> On Thu, Jun 9, 2016 at 9:25 PM, Mich Talebzadeh  > wrote:
>
>> Can you provide a code snippet of how you are populating the target table
>> from the temp table?
>>
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 9 June 2016 at 23:43, swetha kasireddy 
>> wrote:
>>
>>> No, I am reading the data from hdfs, transforming it , registering the
>>> data in a temp table using registerTempTable and then doing insert
>>> overwrite using Spark SQL's hiveContext.
>>>
>>> On Thu, Jun 9, 2016 at 3:40 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 how are you doing the insert? from an existing table?

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 9 June 2016 at 21:16, Stephen Boesch  wrote:

> How many workers (/cpu cores) are assigned to this job?
>
> 2016-06-09 13:01 GMT-07:00 SRK :
>
>> Hi,
>>
>> How to insert data into 2000 partitions(directories) of ORC/parquet
>> at a
>> time using Spark SQL? It seems to be not performant when I try to
>> insert
>> 2000 directories of Parquet/ORC using Spark SQL. Did anyone face this
>> issue?
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-data-into-2000-partitions-directories-of-ORC-parquet-at-a-time-using-Spark-SQL-tp27132.html
>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>

>>>
>>
>


Computing on each partition/executor with "persistent" data

2016-06-13 Thread Jeroen Miller
Dear fellow Sparkers,

I am barely dipping my toes into the Spark world and I was wondering if the
following workflow can be implemented in Spark:

1. Initialize a custom data structure DS on each executor.
   These data structures DS should live until the end of the
   program.

2. While (some_boolean):

2.1. Read data to RDD
2.2. Partition the RDD with custom partioner
2.3. Loop over the partitions (alternatively executors):
2.3.1 Process RDD partition using data structure DS.
  Said data structures will need to be updated.

3. Collect the results. This will need access to each DS.

This question is probably very naive, and yes, I am aware this looks more
like what one would do with MPI than in Spark.
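For what it's worth, a hedged sketch of one common way to approximate steps 1 and 2.3: keep a JVM-wide singleton on each executor (initialised lazily, alive for the executor's lifetime) and update it from foreachPartition; step 3 still requires shipping results back explicitly, for example by emitting them from a final mapPartitions. All names are illustrative, and the mutable map is not thread-safe if an executor runs several tasks concurrently.

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

// One instance per executor JVM (step 1), created on first use and kept alive.
object ExecutorState {
  lazy val ds: mutable.Map[String, Long] =
    mutable.Map.empty[String, Long].withDefaultValue(0L)
}

object PersistentStateSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    for (_ <- 1 to 3) {                                // step 2: repeated rounds
      val rdd = sc.parallelize(Seq("a", "b", "a"), 2)  // 2.1 read, 2.2 partition
      rdd.foreachPartition { it =>                     // 2.3 update the executor-local DS
        it.foreach(k => ExecutorState.ds(k) += 1L)
      }
    }
    // step 3: pull each executor's state back (best effort: one task per partition).
    val collected = sc.parallelize(1 to 2, 2)
      .mapPartitions(_ => ExecutorState.ds.iterator)
      .collect()
    println(collected.toSeq)
    sc.stop()
  }
}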

Best regards,

Jeroen


Re: Spark Streaming application failing with Kerboros issue while writing data to HBase

2016-06-13 Thread Ted Yu
Can you show snippet of your code, please ?

Please refer to obtainTokenForHBase() in
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Cheers

On Mon, Jun 13, 2016 at 4:44 AM, Kamesh  wrote:

> Hi All,
>  We are building a Spark Streaming application, and that application writes
> data to an HBase table. But writes/reads are failing with the following exception:
>
> 16/06/13 04:35:16 ERROR ipc.AbstractRpcClient: SASL authentication failed.
> The most likely cause is missing or invalid credentials. Consider 'kinit'.
>
> javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to
> find any Kerberos tgt)]
>
> at
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>
> at
> org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
>
> at
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
>
> This application is failing on the executor machine. The executor is not able to
> pass the token. Can someone help me resolve this issue?
>
> *Environment Details*
> Spark Version : 1.6.1
> HBase Version : 1.0.0
> Hadoop Version : 2.6.0
>
> --
> Thanks & Regards
> Kamesh.
>


Re: What is the interpretation of Cores in Spark doc

2016-06-13 Thread Mark Hamstra
I don't know what documentation you were referring to, but this is clearly
an erroneous statement: "Threads are virtual cores."  At best it is
terminology abuse by a hardware manufacturer.  Regardless, Spark can't get
too concerned about how any particular hardware vendor wants to refer to
the specific components of their CPU architecture.  For us, a core is a
logical execution unit, something on which a thread of execution can run.
That can map in different ways to different physical or virtual hardware.
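A quick way to see the number Spark's local[*] ends up with is simply to print the JVM's processor count, as the reply quoted below points out; a tiny sketch:

object CoreCount {
  def main(args: Array[String]): Unit = {
    // The JVM's view of "logical execution units"; local[*] is based on this value.
    println(s"availableProcessors = ${Runtime.getRuntime.availableProcessors()}")
  }
}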

On Mon, Jun 13, 2016 at 12:02 AM, Mich Talebzadeh  wrote:

> Hi,
>
> It is not a question of testing anything. I was referring to documentation
> that clearly uses the term "threads". As I said and showed before, one line
> is using the term "thread" and the next one "logical cores".
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 12 June 2016 at 23:57, Daniel Darabos  > wrote:
>
>> Spark is a software product. In software a "core" is something that a
>> process can run on. So it's a "virtual core". (Do not call these "threads".
>> A "thread" is not something a process can run on.)
>>
>> local[*] uses java.lang.Runtime.availableProcessors().
>> Since Java is software, this also returns the number of virtual cores. (You
>> can test this easily.)
>>
>>
>> On Sun, Jun 12, 2016 at 9:23 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>>
>>> Hi,
>>>
>>> I was writing some docs on Spark P and came across this.
>>>
>>> It is about the terminology or interpretation of that in Spark doc.
>>>
>>> This is my understanding of cores and threads.
>>>
>>>  Cores are physical cores. Threads are virtual cores. A core with 2
>>> threads is called hyper-threading technology, so 2 threads per core let
>>> the core work on two loads at the same time. In other words, every thread takes
>>> care of one load.
>>>
>>> A core has its own memory. So if you have a dual core with hyper
>>> threading, the core works on 2 loads at the same time because of the 2
>>> threads per core, but these 2 threads will share the memory in that core.
>>>
>>> Some vendors as I am sure most of you aware charge licensing per core.
>>>
>>> For example on the same host that I have Spark, I have a SAP product
>>> that checks the licensing and shuts the application down if the license
>>> does not agree with the cores speced.
>>>
>>> This is what it says
>>>
>>> ./cpuinfo
>>> License hostid:00e04c69159a 0050b60fd1e7
>>> Detected 12 logical processor(s), 6 core(s), in 1 chip(s)
>>>
>>> So here I have 12 logical processors, 6 cores, and 1 chip. I call
>>> logical processors threads, so I have 12 threads?
>>>
>>> Now if I go and start worker process ${SPARK_HOME}/sbin/start-slaves.sh,
>>> I see this in GUI page
>>>
>>> [image: Inline images 1]
>>>
>>> it says 12 cores but I gather it is threads?
>>>
>>>
>>> Spark document
>>> 
>>> states and I quote
>>>
>>>
>>> [image: Inline images 2]
>>>
>>>
>>>
>>> OK the line local[k] adds  ..  *set this to the number of cores on your
>>> machine*
>>>
>>>
>>> But I know that it means threads. Because if I went and set that to 6,
>>> it would be only 6 threads as opposed to 12 threads.
>>>
>>>
>>> the next line local[*] seems to indicate it correctly as it refers to
>>> "logical cores" that in my understanding it is threads.
>>>
>>>
>>> I trust that I am not nitpicking here!
>>>
>>>
>>> Cheers,
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>
>>
>


SAS_TO_SPARK_SQL_(Could be a Bug?)

2016-06-13 Thread Ajay Chander
Hi Mohit,

Thanks for your time. Please find my response below.

Did you try the same with another database?
I do load the data from MySQL and SQL Server the same way (through Spark SQL
JDBC), which works perfectly alright.

As a workaround you can write the select statement yourself instead of just
providing the table name?
Yes, I did that too. It did not make any difference.

Thank you,
Ajay

On Sunday, June 12, 2016, Mohit Jaggi  wrote:

> Looks like a bug in the code generating the SQL query…why would it be
> specific to SAS, I can’t guess. Did you try the same with another database?
> As a workaround you can write the select statement yourself instead of just
> providing the table name.
>
> On Jun 11, 2016, at 6:27 PM, Ajay Chander  wrote:
>
> I tried implementing the same functionality through Scala as well. But no
> luck so far. Just wondering if anyone here tried using Spark SQL to read
> SAS dataset? Thank you
>
> Regards,
> Ajay
>
> On Friday, June 10, 2016, Ajay Chander  wrote:
>
>> Mich, I completely agree with you. I built another Spark SQL application
>> which reads data from MySQL and SQL server and writes the data into
>> Hive (parquet+snappy format). I have this problem only when I read directly
>> from remote SAS system. The interesting part is I am using same driver to
>> read data through pure Java app and spark app. It works fine in Java
>> app, so I cannot blame SAS driver here. Trying to understand where the
>> problem could be. Thanks for sharing this with me.
>>
>> On Friday, June 10, 2016, Mich Talebzadeh 
>> wrote:
>>
>>> I personally use Scala to do something similar. For example here I
>>> extract data from an Oracle table and store in ORC table in Hive. This is
>>> compiled via sbt as run with SparkSubmit.
>>>
>>> It is similar to your code but in Scala. Note that I do not enclose my
>>> column names in double quotes.
>>>
>>> import org.apache.spark.SparkContext
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.hive.HiveContext
>>> import org.apache.spark.sql.types._
>>> import org.apache.spark.sql.SQLContext
>>> import org.apache.spark.sql.functions._
>>>
>>> object ETL_scratchpad_dummy {
>>>   def main(args: Array[String]) {
>>>   val conf = new SparkConf().
>>>setAppName("ETL_scratchpad_dummy").
>>>set("spark.driver.allowMultipleContexts", "true")
>>>   val sc = new SparkContext(conf)
>>>   // Create sqlContext based on HiveContext
>>>   val sqlContext = new HiveContext(sc)
>>>   import sqlContext.implicits._
>>>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>   println ("\nStarted at"); sqlContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>>   HiveContext.sql("use oraclehadoop")
>>>   var _ORACLEserver : String = "jdbc:oracle:thin:@rhes564:1521:mydb12"
>>>   var _username : String = "scratchpad"
>>>   var _password : String = ""
>>>
>>>   // Get data from Oracle table scratchpad.dummy
>>>   val d = HiveContext.load("jdbc",
>>>   Map("url" -> _ORACLEserver,
>>>   "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS
>>> CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS
>>> RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>>>   "user" -> _username,
>>>   "password" -> _password))
>>>
>>>d.registerTempTable("tmp")
>>>   //
>>>   // Need to create and populate target ORC table oraclehadoop.dummy
>>>   //
>>>   HiveContext.sql("use oraclehadoop")
>>>   //
>>>   // Drop and create table dummy
>>>   //
>>>   HiveContext.sql("DROP TABLE IF EXISTS oraclehadoop.dummy")
>>>   var sqltext : String = ""
>>>   sqltext = """
>>>   CREATE TABLE oraclehadoop.dummy (
>>>  ID INT
>>>, CLUSTERED INT
>>>, SCATTERED INT
>>>, RANDOMISED INT
>>>, RANDOM_STRING VARCHAR(50)
>>>, SMALL_VC VARCHAR(10)
>>>, PADDING  VARCHAR(10)
>>>   )
>>>   CLUSTERED BY (ID) INTO 256 BUCKETS
>>>   STORED AS ORC
>>>   TBLPROPERTIES (
>>>   "orc.create.index"="true",
>>>   "orc.bloom.filter.columns"="ID",
>>>   "orc.bloom.filter.fpp"="0.05",
>>>   "orc.compress"="SNAPPY",
>>>   "orc.stripe.size"="16777216",
>>>   "orc.row.index.stride"="1" )
>>>   """
>>>HiveContext.sql(sqltext)
>>>   //
>>>   // Put data in Hive table. Clean up is already done
>>>   //
>>>   sqltext = """
>>>   INSERT INTO TABLE oraclehadoop.dummy
>>>   SELECT
>>>   ID
>>> , CLUSTERED
>>> , SCATTERED
>>> , RANDOMISED
>>> , RANDOM_STRING
>>> , SMALL_VC
>>> , PADDING
>>>   FROM tmp
>>>   """
>>>HiveContext.sql(sqltext)
>>>   println ("\nFinished at"); sqlContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>>   sys.exit()
>>>  }
>>> }
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn 

Re: Basic question. Access MongoDB data in Spark.

2016-06-13 Thread Prajwal Tuladhar
May be try opening an issue in their GH repo
https://github.com/Stratio/Spark-MongoDB

On Mon, Jun 13, 2016 at 4:10 PM, Umair Janjua 
wrote:

> Anybody knows the stratio's mailing list? I cant seem to find it. Cheers
>
> On Mon, Jun 13, 2016 at 6:02 PM, Ted Yu  wrote:
>
>> Have you considered posting the question on stratio's mailing list ?
>>
>> You may get faster response there.
>>
>>
>> On Mon, Jun 13, 2016 at 8:09 AM, Umair Janjua 
>> wrote:
>>
>>> Hi guys,
>>>
>>> I have this super basic problem which I cannot figure out. Can somebody
>>> give me a hint.
>>>
>>> http://stackoverflow.com/questions/37793214/spark-mongdb-data-using-java
>>>
>>> Cheers
>>>
>>
>>
>


-- 
--
Cheers,
Praj


Re: Basic question. Access MongoDB data in Spark.

2016-06-13 Thread Umair Janjua
Anybody knows the stratio's mailing list? I cant seem to find it. Cheers

On Mon, Jun 13, 2016 at 6:02 PM, Ted Yu  wrote:

> Have you considered posting the question on stratio's mailing list ?
>
> You may get faster response there.
>
>
> On Mon, Jun 13, 2016 at 8:09 AM, Umair Janjua 
> wrote:
>
>> Hi guys,
>>
>> I have this super basic problem which I cannot figure out. Can somebody
>> give me a hint.
>>
>> http://stackoverflow.com/questions/37793214/spark-mongdb-data-using-java
>>
>> Cheers
>>
>
>


Re: Basic question. Access MongoDB data in Spark.

2016-06-13 Thread Ted Yu
Have you considered posting the question on stratio's mailing list ?

You may get faster response there.


On Mon, Jun 13, 2016 at 8:09 AM, Umair Janjua 
wrote:

> Hi guys,
>
> I have this super basic problem which I cannot figure out. Can somebody
> give me a hint.
>
> http://stackoverflow.com/questions/37793214/spark-mongdb-data-using-java
>
> Cheers
>


Re: Kafka Exceptions

2016-06-13 Thread Cody Koeninger
Is the exception on the driver or the executor?

To be clear, you're going to see a task fail if a partition changes
leader while the task is running, regardless of configuration
settings.  The task should be retried up to maxFailures, though.

What are maxRetries and maxFailures set to?   How long does the leader
change take?  How many task retries happen during that time (assuming
it's an exception on the executor)?

On Mon, Jun 13, 2016 at 10:40 AM, Bryan Jeffrey  wrote:
> Cody,
>
> We already set the maxRetries.  We're still seeing issue - when leader is
> shifted, for example, it does not appear that direct stream reader correctly
> handles this.  We're running 1.6.1.
>
> Bryan Jeffrey
>
> On Mon, Jun 13, 2016 at 10:37 AM, Cody Koeninger  wrote:
>>
>> http://spark.apache.org/docs/latest/configuration.html
>>
>> spark.streaming.kafka.maxRetries
>>
>> spark.task.maxFailures
>>
>> On Mon, Jun 13, 2016 at 8:25 AM, Bryan Jeffrey 
>> wrote:
>> > All,
>> >
>> > We're running a Spark job that is consuming data from a large Kafka
>> > cluster
>> > using the Direct Stream receiver.  We're seeing intermittent
>> > NotLeaderForPartitionExceptions when the leader is moved to another
>> > broker.
>> > Currently even with retry enabled we're seeing the job fail at this
>> > exception.  Is there a configuration setting I am missing?  How are
>> > these
>> > issues typically handled?
>> >
>> > User class threw exception: org.apache.spark.SparkException:
>> > ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>> > org.apache.spark.SparkException: Couldn't find leader offsets for
>> > Set([MyTopic,43]))
>> >
>> > Thank you,
>> >
>> > Bryan Jeffrey
>> >
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka Exceptions

2016-06-13 Thread Bryan Jeffrey
Cody,

We already set the maxRetries.  We're still seeing the issue - when the
leader is shifted, for example, it does not appear that the direct stream
reader handles this correctly.  We're running 1.6.1.

Bryan Jeffrey

On Mon, Jun 13, 2016 at 10:37 AM, Cody Koeninger  wrote:

> http://spark.apache.org/docs/latest/configuration.html
>
> spark.streaming.kafka.maxRetries
>
> spark.task.maxFailures
>
> On Mon, Jun 13, 2016 at 8:25 AM, Bryan Jeffrey 
> wrote:
> > All,
> >
> > We're running a Spark job that is consuming data from a large Kafka
> cluster
> > using the Direct Stream receiver.  We're seeing intermittent
> > NotLeaderForPartitionExceptions when the leader is moved to another
> broker.
> > Currently even with retry enabled we're seeing the job fail at this
> > exception.  Is there a configuration setting I am missing?  How are these
> > issues typically handled?
> >
> > User class threw exception: org.apache.spark.SparkException:
> > ArrayBuffer(kafka.common.NotLeaderForPartitionException,
> > org.apache.spark.SparkException: Couldn't find leader offsets for
> > Set([MyTopic,43]))
> >
> > Thank you,
> >
> > Bryan Jeffrey
> >
>


Basic question. Access MongoDB data in Spark.

2016-06-13 Thread Umair Janjua
Hi guys,

I have this super basic problem which I cannot figure out. Can somebody
give me a hint.

http://stackoverflow.com/questions/37793214/spark-mongdb-data-using-java

Cheers


Re: Kafka Exceptions

2016-06-13 Thread Cody Koeninger
http://spark.apache.org/docs/latest/configuration.html

spark.streaming.kafka.maxRetries

spark.task.maxFailures
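
For illustration, a minimal Scala sketch (Spark 1.6 direct stream; the broker, topic and retry values below are made up) showing where those two settings are applied:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaDirectSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("KafkaDirectSketch")
      .set("spark.streaming.kafka.maxRetries", "5")  // driver retries when finding leader offsets
      .set("spark.task.maxFailures", "8")            // task attempts allowed before the job is aborted

    val ssc = new StreamingContext(conf, Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("MyTopic"))

    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}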

On Mon, Jun 13, 2016 at 8:25 AM, Bryan Jeffrey  wrote:
> All,
>
> We're running a Spark job that is consuming data from a large Kafka cluster
> using the Direct Stream receiver.  We're seeing intermittent
> NotLeaderForPartitionExceptions when the leader is moved to another broker.
> Currently even with retry enabled we're seeing the job fail at this
> exception.  Is there a configuration setting I am missing?  How are these
> issues typically handled?
>
> User class threw exception: org.apache.spark.SparkException:
> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([MyTopic,43]))
>
> Thank you,
>
> Bryan Jeffrey
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Kafka Exceptions

2016-06-13 Thread Bryan Jeffrey
All,

We're running a Spark job that is consuming data from a large Kafka cluster
using the Direct Stream receiver.  We're seeing intermittent
NotLeaderForPartitionExceptions when the leader is moved to another
broker.  Currently even with retry enabled we're seeing the job fail at
this exception.  Is there a configuration setting I am missing?  How are
these issues typically handled?

User class threw exception: org.apache.spark.SparkException:
ArrayBuffer(kafka.common.NotLeaderForPartitionException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([MyTopic,43]))

Thank you,

Bryan Jeffrey


Re: OutOfMemoryError - When saving Word2Vec

2016-06-13 Thread sharad82
Is this the right forum to post Spark-related issues? I have tried this
forum along with Stack Overflow but am not seeing any response.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemoryError-When-saving-Word2Vec-tp27142p27151.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming application failing with Kerboros issue while writing data to HBase

2016-06-13 Thread Kamesh
Hi All,
 We are building a Spark Streaming application that writes data to an HBase
table, but writes/reads are failing with the following exception:

16/06/13 04:35:16 ERROR ipc.AbstractRpcClient: SASL authentication failed.
The most likely cause is missing or invalid credentials. Consider 'kinit'.

javax.security.sasl.SaslException: GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed to
find any Kerberos tgt)]

at
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)

at
org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)

at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)

This application is failing on the executor machine. The executor is not
able to pass the Kerberos token. Can someone help me resolve this issue?

*Environment Details*
Spark Version : 1.6.1
HBase Version : 1.0.0
Hadoop Version : 2.6.0
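
One approach that is sometimes used in this situation (not taken from this thread): ship the keytab to the executors (for example with --files) and log in from it around the HBase calls. A rough Scala sketch; the principal, keytab name and record type are made up:

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.streaming.dstream.DStream

def writeToHBase(records: DStream[(String, String)]): Unit = {
  records.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      // the keytab file is assumed to have been distributed to the executors
      val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
        "app-user@EXAMPLE.COM", "app-user.keytab")
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = {
          // create the HBase connection and issue the Puts inside this logged-in context
          partition.foreach { case (rowKey, value) => () /* table.put(...) */ }
        }
      })
    }
  }
}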

--
Thanks & Regards
Kamesh.


Spark 2.0: Unify DataFrames and Datasets question

2016-06-13 Thread Arun Patel
In Spark 2.0, DataFrames and Datasets are unified: DataFrame is simply an
alias for a Dataset of type Row.  I have a few questions.

1) What does this really mean to an Application developer?
2) Why this unification was needed in Spark 2.0?
3) What changes can be observed in Spark 2.0 vs Spark 1.6?
4) Compile time safety will be there for DataFrames too?
5) Python API is supported for Datasets in 2.0?

Thanks
Arun
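
For what it's worth, a minimal Scala sketch against the Spark 2.0 preview API (the case class and values are made up) of what the unification looks like in code:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

case class Person(name: String, age: Int)

object UnifiedApiSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UnifiedApiSketch").getOrCreate()
    import spark.implicits._

    val ds: Dataset[Person] = Seq(Person("a", 30), Person("b", 25)).toDS()  // typed Dataset
    val df: DataFrame = ds.toDF()             // DataFrame is just Dataset[Row]

    ds.filter(_.age > 26).show()              // lambda checked at compile time
    df.filter("age > 26").show()              // string expression checked at runtime

    spark.stop()
  }
}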


Re: Spark Streamming checkpoint and restoring files from S3

2016-06-13 Thread Natu Lauchande
Hi,

It seems to me that the checkpoint is not persisting the SparkContext's
Hadoop configuration correctly. Could that be a possibility?



Thanks,
Natu
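
For illustration, one approach that is sometimes suggested (the checkpoint path and environment-variable names below are assumptions): put the S3 keys into the SparkConf as spark.hadoop.* properties inside the context-creating function, and also pass a Hadoop Configuration to getOrCreate so the checkpoint files themselves can be read back from S3:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointPath = "s3n://bucketfoo/checkpoints"          // hypothetical path

def createContext(): StreamingContext = {
  val conf = new SparkConf()
    .setAppName("RecoverableS3Sketch")
    // spark.hadoop.* keys are copied into the Hadoop configuration and travel
    // with the SparkConf, so they are available again after a restart
    .set("spark.hadoop.fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
    .set("spark.hadoop.fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
  val ssc = new StreamingContext(conf, Seconds(60))
  ssc.checkpoint(checkpointPath)
  // ... build the file DStream and the rest of the graph here ...
  ssc
}

// this Configuration is used to read the checkpoint data itself from S3
val hadoopConf = new Configuration()
hadoopConf.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))

val ssc = StreamingContext.getOrCreate(checkpointPath, createContext _, hadoopConf)
ssc.start()
ssc.awaitTermination()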

On Mon, Jun 13, 2016 at 11:57 AM, Natu Lauchande 
wrote:

> Hi,
>
> I am testing disaster recovery from Spark and having some issues when
> trying to restore an input file from s3 :
>
> 2016-06-13 11:42:52,420 [main] INFO
> org.apache.spark.streaming.dstream.FileInputDStream$FileInputDStreamCheckpointData
> - Restoring files for time 146581086 ms -
> [s3n://bucketfoo/filefoo/908966c353654a21bc7b2733d65b7c19_availability_146390040.csv]
> Exception in thread "main" java.lang.IllegalArgumentException: AWS Access
> Key ID and Secret Access Key must be specified as the username or password
> (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or
> fs.s3n.awsSecretAccessKey properties (respectively).
>
>
> I am basically following the pattern
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
> . And added on the stream creation the environment variables.
>
> Can anyone in the list help me on figuring out why i am having this error ?
>
> Thanks,
> Natu
>
>
>


Spark Streamming checkpoint and restoring files from S3

2016-06-13 Thread Natu Lauchande
Hi,

I am testing disaster recovery from Spark and having some issues when
trying to restore an input file from s3 :

2016-06-13 11:42:52,420 [main] INFO
org.apache.spark.streaming.dstream.FileInputDStream$FileInputDStreamCheckpointData
- Restoring files for time 146581086 ms -
[s3n://bucketfoo/filefoo/908966c353654a21bc7b2733d65b7c19_availability_146390040.csv]
Exception in thread "main" java.lang.IllegalArgumentException: AWS Access
Key ID and Secret Access Key must be specified as the username or password
(respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or
fs.s3n.awsSecretAccessKey properties (respectively).


I am basically following the pattern in
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
and added the environment variables on stream creation.

Can anyone on the list help me figure out why I am having this error?

Thanks,
Natu


Re: Java MongoDB Spark Stratio (Please give me a hint)

2016-06-13 Thread Umair Janjua
The dataframe does not get any data. What could I be doing wrong here? I
have rechecked the credentials and the other settings. I am still trying to
debug the issue, without any luck so far.

On Mon, Jun 13, 2016 at 11:30 AM, Umair Janjua 
wrote:

> Any idea what I might be doing wrong. I am new to spark and I cannot
> proceed forward from here:
>
>
>
> ---
>  JavaSparkContext sc = new JavaSparkContext("local[*]", "test
> spark-mongodb java");
> SQLContext sqlContext = new SQLContext(sc);
>
> Map options = new HashMap();
> options.put("host", "host:port");
> options.put("database", "database");
> options.put("collection", "collectionName");
> options.put("credentials", "username,database,password");
>
> System.out.println("Check1");
>
> DataFrame df =
> sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();
>
> df.registerTempTable("collectionName");
> df.show();
> --
>
> On Fri, Jun 10, 2016 at 3:36 PM, Umair Janjua 
> wrote:
>
>> Hi my code,
>> When I run this program it gets stuck at the
>> sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();
>> line and then it does not proceed forward. Nothing happens after that. What
>> should I do? How can I debug it. I am stuck here. Please any hint would be
>> appreciated.
>>
>>
>> -
>> JavaSparkContext sc = new JavaSparkContext("local[*]", "test
>> spark-mongodb java");
>> SQLContext sqlContext = new SQLContext(sc);
>>
>> Map options = new HashMap();
>> options.put("host", "host:port");
>> options.put("database", "database");
>> options.put("collection", "collectionName");
>> options.put("credentials", "username,database,password");
>>
>> System.out.println("Check1");
>> DataFrame df =
>> sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();
>>
>> sqlContext.sql("SELECT * FROM collectionName");
>> System.out.println("Check2");
>> df.count();
>> System.out.println("Check DataFrame Count: " + df.count());
>> System.out.println("Check3");
>> df.registerTempTable("collectionName");
>> df.show();
>>
>>
>> -
>>
>> The above code only gets printed till Check1 and then it gets stuck and
>> nothing happens.
>>
>> Cheers
>>
>
>


Re: Java MongoDB Spark Stratio (Please give me a hint)

2016-06-13 Thread Umair Janjua
Any idea what I might be doing wrong. I am new to spark and I cannot
proceed forward from here:



---
 JavaSparkContext sc = new JavaSparkContext("local[*]", "test spark-mongodb
java");
SQLContext sqlContext = new SQLContext(sc);

Map<String, String> options = new HashMap<String, String>();
options.put("host", "host:port");
options.put("database", "database");
options.put("collection", "collectionName");
options.put("credentials", "username,database,password");

System.out.println("Check1");

DataFrame df =
sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();

df.registerTempTable("collectionName");
df.show();
--

On Fri, Jun 10, 2016 at 3:36 PM, Umair Janjua 
wrote:

> Hi my code,
> When I run this program it gets stuck at the
> sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();
> line and then it does not proceed forward. Nothing happens after that. What
> should I do? How can I debug it. I am stuck here. Please any hint would be
> appreciated.
>
>
> -
> JavaSparkContext sc = new JavaSparkContext("local[*]", "test
> spark-mongodb java");
> SQLContext sqlContext = new SQLContext(sc);
>
> Map options = new HashMap();
> options.put("host", "host:port");
> options.put("database", "database");
> options.put("collection", "collectionName");
> options.put("credentials", "username,database,password");
>
> System.out.println("Check1");
> DataFrame df =
> sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();
>
> sqlContext.sql("SELECT * FROM collectionName");
> System.out.println("Check2");
> df.count();
> System.out.println("Check DataFrame Count: " + df.count());
> System.out.println("Check3");
> df.registerTempTable("collectionName");
> df.show();
>
>
> -
>
> The above code only gets printed till Check1 and then it gets stuck and
> nothing happens.
>
> Cheers
>


Re: Spark Getting data from MongoDB in JAVA

2016-06-13 Thread Asfandyar Ashraf Malik
Yes, it was a dependency issue. I was using incompatible versions.
The command mvn dependency:tree -Dverbose helped me fix this.

Cheers



Asfandyar Ashraf Malik


Mobile: +49 15751174449

Email: asfand...@kreditech.com



Kreditech Holding SSL GmbH

Ludwig-Erhard-Straße 1, 20459 Hamburg, Germany

2016-06-12 18:36 GMT+02:00 Ted Yu :

> What's the value of spark.version ?
>
> Do you know which version of Spark mongodb connector 0.10.3 was built
> against ?
>
> You can use the following command to find out:
> mvn dependency:tree
>
> Maybe the Spark version you use is different from what mongodb connector
> was built against.
>
> On Fri, Jun 10, 2016 at 2:50 AM, Asfandyar Ashraf Malik <
> asfand...@kreditech.com> wrote:
>
>> Hi,
>> I did not notice that I put it twice.
>> I changed that and ran my program but it still gives the same error:
>>
>> java.lang.NoSuchMethodError:
>> org.apache.spark.sql.catalyst.ScalaReflection$.typeOfObject()Lscala/PartialFunction;
>>
>>
>> Cheers
>>
>>
>>
>> 2016-06-10 11:47 GMT+02:00 Alonso Isidoro Roman :
>>
>>> why is the spark-mongodb_2.11 dependency written twice in pom.xml?
>>>
>>> Alonso Isidoro Roman
>>> about.me/alonso.isidoro.roman
>>>
>>> 
>>>
>>> 2016-06-10 11:39 GMT+02:00 Asfandyar Ashraf Malik <
>>> asfand...@kreditech.com>:
>>>
 Hi,
 I am using Stratio library to get MongoDB to work with Spark but I get
 the following error:

 java.lang.NoSuchMethodError:
 org.apache.spark.sql.catalyst.ScalaReflection

 This is my code.

 ---
 public static void main(String[] args) {

 JavaSparkContext sc = new JavaSparkContext("local[*]", "test
 spark-mongodb java");
 SQLContext sqlContext = new SQLContext(sc);

 Map<String, String> options = new HashMap<String, String>();
 options.put("host", "xyz.mongolab.com:59107");
 options.put("database", "heroku_app3525385");
 options.put("collection", "datalog");
 options.put("credentials", "*,,");

 DataFrame df =
 sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();
 df.registerTempTable("datalog");
 df.show();

 }

 ---
 My pom file is as follows:

 <dependencies>
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.11</artifactId>
     <version>${spark.version}</version>
   </dependency>
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-catalyst_2.11</artifactId>
     <version>${spark.version}</version>
   </dependency>
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql_2.11</artifactId>
     <version>${spark.version}</version>
   </dependency>
   <dependency>
     <groupId>com.stratio.datasource</groupId>
     <artifactId>spark-mongodb_2.11</artifactId>
     <version>0.10.3</version>
   </dependency>
   <dependency>
     <groupId>com.stratio.datasource</groupId>
     <artifactId>spark-mongodb_2.11</artifactId>
     <version>0.10.3</version>
     <type>jar</type>
   </dependency>
 </dependencies>


 Regards

>>>
>>>
>>
>


cluster mode for Python on standalone cluster

2016-06-13 Thread Jan Šourek
The official documentation states 'Currently only YARN supports cluster
mode for Python applications.'
I would like to know if work is being done or planned to support cluster
mode for Python applications on standalone spark clusters?


cluster mode for Python on standalone clusters

2016-06-13 Thread Jan Sourek
The official documentation states 'Currently only YARN supports cluster
mode for Python applications.'
I would like to know if work is being done or planned to support cluster
mode for Python applications on standalone spark clusters?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/cluster-mode-for-Python-on-standalone-clusters-tp27148.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Another problem about parallel computing

2016-06-13 Thread Terry Hoo
hero,

Did you check whether there is any exception after retry? If the port is 0,
the spark worker should bind to a random port. BTW, what's the spark
version?

Regards,

- Terry

On Mon, Jun 13, 2016 at 4:24 PM, hero  wrote:

> Hi, guys
>
> I have another problem about spark yarn.
> Today, i was running start-all.sh when i found only two worker in the Web
> Ui, and in fact, I have four nodes.
> The only display of the two nodes, one is master, one is slave2.
> the '/etc/hosts' file is show below:
>
> *127.0.0.1   localhost*
> *169.254.9.148   master*
> *169.254.142.119 s1*
> *169.254.180.3   s2*
> *169.254.250.67  s3*
>
>
> So, I'm going to see the spark log on slave1 and slave3.
> The slave2 log is show below:
>
> 16/06/13 03:38:00 INFO util.Utils: Successfully started service
> 'sparkWorker' on port 39887.
>
> And the slave1 or slave3 log is same:
>
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
>
> So, I think the problem lies in this util.Utils.
> I search through Google, but there is no answer to this problem.
> Now how to bind the sparkWorker port?
>
> Thanks
> Jay
>


Another problem about parallel computing

2016-06-13 Thread hero

Hi, guys

I have another problem with Spark on YARN.
Today, I was running start-all.sh when I found only two workers in the
Web UI, even though I have four nodes.

Only two nodes are displayed: one is master, the other is slave2.
The '/etc/hosts' file is shown below:

127.0.0.1   localhost
169.254.9.148   master
169.254.142.119 s1
169.254.180.3   s2
169.254.250.67  s3


So I went to look at the Spark logs on slave1 and slave3.
The slave2 log is shown below:

16/06/13 03:38:00 INFO util.Utils: Successfully started service 
'sparkWorker' on port 39887.


And the slave1 and slave3 logs are the same:

16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.
16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind 
on port 0. Attempting port 1.


So I think the problem lies in util.Utils.
I searched on Google, but there is no answer to this problem.
How can I bind the sparkWorker port?

Thanks
Jay


Spark 2.0.0 : GLM problem

2016-06-13 Thread april_ZMQ
Hi ALL,

I’ve tried the GLM (Generalized Linear Model) of Spark 2.0.0-preview, and I’ve
encountered some unexpected problems.
•   First problem:
I tested the “poisson” family GLM on a very small dataset using SparkR
2.0.0. This dataset runs a “poisson” family GLM in plain R successfully,
but SparkR showed the error below, and I have no idea where it came from.

16/06/13 14:10:58 WARN WeightedLeastSquares: regParam is zero, which might
cause numerical instability and overfitting.
16/06/13 14:10:58 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID
28)
java.lang.IllegalArgumentException: requirement failed: The response
variable of Poisson family should be positive, but got 0.0
 

•   Second problem:
When I run the same dataset which I ran successfully on Spark 1.6.0, Spark
2.0.0 generated the error below.

ERROR RBackendHandler: fit on
org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper failed
Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
  org.apache.spark.SparkException: Currently, GeneralizedLinearRegression
only supports number of features <= 4096. Found 7664 in the input dataset.
 

This is the R code:
“model <- glm(flow~Origin + Destination, data = distance_flow,family =
gaussian(link = "identity"))”
Does this mean Spark 2.0.0 does not support datasets as large as the previous
version did?






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-2-0-0-GLM-problem-tp27145.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Should I avoid "state" in an Spark application?

2016-06-13 Thread Alonso Isidoro Roman
Hi Haopu, please check these threads:

http://stackoverflow.com/questions/24331815/spark-streaming-historical-state

https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/total.html
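
For illustration, a minimal Scala sketch (the socket source and key/value types are made up) of keeping per-key state with updateStateByKey, so the running value lives in Spark-managed state rather than in a driver-side var:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StatefulCountSketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/checkpoints")               // required by updateStateByKey

    // hypothetical source: lines of "key value" pairs from a socket
    val pairs = ssc.socketTextStream("localhost", 9999)
      .map(_.split(" "))
      .map(a => (a(0), a(1).toLong))

    // running total per key, kept by Spark rather than in a driver-side variable
    def updateFn(newValues: Seq[Long], state: Option[Long]): Option[Long] =
      Some(newValues.sum + state.getOrElse(0L))
    val totals = pairs.updateStateByKey(updateFn _)
    totals.print()

    ssc.start()
    ssc.awaitTermination()
  }
}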

Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2016-06-13 3:11 GMT+02:00 Haopu Wang :

> Can someone look at my questions? Thanks again!
>
>
> --
>
> *From:* Haopu Wang
> *Sent:* 2016年6月12日 16:40
> *To:* user@spark.apache.org
> *Subject:* Should I avoid "state" in an Spark application?
>
>
>
> I have a Spark application whose structure is below:
>
>
>
> var ts: Long = 0L
>
> dstream1.foreachRDD{
>
> (x, time) => {
>
> ts = time
>
> x.do_something()...
>
> }
>
> }
>
> ..
>
> process_data(dstream2, ts, ..)
>
>
>
> I assume foreachRDD function call can update "ts" variable which is then
> used in the Spark tasks of "process_data" function.
>
>
>
> From my test result of a standalone Spark cluster, it is working. But
> should I concern if switch to YARN?
>
>
>
> And I saw some articles are recommending to avoid state in Scala
> programming. Without the state variable, how could that be done?
>
>
>
> Any comments or suggestions are appreciated.
>
>
>
> Thanks,
>
> Haopu
>


Hive 1.0.0 not able to read Spark 1.6.1 parquet output files on EMR 4.7.0

2016-06-13 Thread mayankshete
Hello Team ,

I am facing an issue where output files generated by Spark 1.6.1 cannot be
read by Hive 1.0.0. This is because Hive 1.0.0 uses an older Parquet version
than Spark 1.6.1, which uses Parquet 1.7.0.

Is it possible to use an older Parquet version in Spark, or a newer Parquet
version in Hive?
I have tried adding parquet-hive-bundle 1.7.0 to Hive, but while reading it
throws: Failed with exception
java.io.IOException: java.lang.NullPointerException.

Can anyone give us the solution ?

Thanks ,
Mayank



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Hive-1-0-0-not-able-to-read-Spark-1-6-1-parquet-output-files-on-EMR-4-7-0-tp27144.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Dataframe : Column features must be of type org.apache.spark.mllib.linalg.VectorUDT

2016-06-13 Thread Zakaria Hili
Hi,
I create a dataframe using a schema, but when I try to create a model, I
receive  this error:

requirement failed: Column features must be of type
org.apache.spark.mllib.linalg.VectorUDT@f71b0bce but was actually
ArrayType(StringType,true)



 piece of code 

SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("id", DataTypes.StringType, false),
    DataTypes.createStructField("date", DataTypes.StringType, false),
    DataTypes.createStructField("temperature", DataTypes.StringType, true) });


// I receive data from another application like this: id,date,temperature
JavaRDD<Row> rowsRdd = rdd.map(e ->
    RowFactory.create(e.split(",")));

DataFrame df = sqlContext.createDataFrame(rowsRdd, schema);


LinearRegression lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8);

Tokenizer tokenizer = new
Tokenizer().setInputCol("temperature").setOutputCol("features");
// the problem is here :
DataFrame result = tokenizer.transform(df);

LinearRegressionModel lrModel = lr.fit(result);
##
And I don't know how to do this, or why I need a label field when I am
trying to transform the features column into a vector?

thank you in advance
regards,
Zakaria
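
For illustration, a minimal sketch of the usual shape (in Scala; the same classes exist in Java). The "target"/"label" columns are assumptions, since a regression needs a numeric label to fit, and the vector is built with VectorAssembler rather than Tokenizer:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.functions.col

// df: the DataFrame built above; "target" is a hypothetical numeric column to predict
val typed = df
  .withColumn("temperature_d", col("temperature").cast("double"))
  .withColumn("label", col("target").cast("double"))

// VectorAssembler produces the Vector-typed "features" column the model expects,
// instead of Tokenizer's array of strings
val assembler = new VectorAssembler()
  .setInputCols(Array("temperature_d"))
  .setOutputCol("features")

val prepared = assembler.transform(typed)

val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

val model = lr.fit(prepared)   // fit uses "features" (Vector) and "label" (Double)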


Re: StackOverflow in Spark

2016-06-13 Thread Terry Hoo
Maybe the same issue with SPARK_6847
, which has been fixed in
spark 2.0

Regards
- Terry

On Mon, Jun 13, 2016 at 3:15 PM, Michel Hubert  wrote:

>
>
> I’ve found my problem.
>
>
>
> I’ve got a DAG with two consecutive “updateStateByKey” functions .
>
> When I only process (map/foreachRDD/JavaEsSpark) the state of the last
> “updateStateByKey” function, I get an stackoverflow after a while (too long
> linage).
>
>
>
> But when I also do some processing (foreachRDD/rdd.take) on the first
> “updatestatebykey”, then there is no problem.
>
>
>
> Does this make sense? Probably the “long linage” problem.
>
>
>
> But why should I have such a “linage problem” when Sparks claims to be a
> “abstract/high level” architecture? Why should I be worried about “long
> linage”? Its seems a contradiction with the abstract/high level (functional
> programming) approach when I have to know/consider how Spark doest it.
>
>
>
>
>
>
>
> *From:* Rishabh Wadhawan [mailto:rishabh...@gmail.com]
> *Sent:* Thursday, 2 June 2016 06:06
> *To:* Yash Sharma 
> *CC:* Ted Yu ; Matthew Young ;
> Michel Hubert ; user@spark.apache.org
> *Subject:* Re: StackOverflow in Spark
>
>
>
> A stack overflow is generated when the DAG is too long, as there are many
> transformations over a lot of iterations. Please use checkpointing to truncate
> the DAG and break the lineage to get away from this stack overflow error. Look
> into the checkpoint function.
>
> Thanks
>
> Hope it helps. Let me know if you need anymore help.
>
> On Jun 1, 2016, at 8:18 PM, Yash Sharma  wrote:
>
>
>
> Not sure if its related, But I got a similar stack overflow error some
> time back while reading files and converting them to parquet.
>
>
>
>
>
>
> Stack trace-
> 16/06/02 02:23:54 INFO YarnAllocator: Driver requested a total number of
> 32769 executor(s).
> 16/06/02 02:23:54 INFO ExecutorAllocationManager: Requesting 16384 new
> executors because tasks are backlogged (new desired total will be 32769)
> 16/06/02 02:23:54 INFO YarnAllocator: Will request 24576 executor
> containers, each with 5 cores and 22528 MB memory including 2048 MB overhead
> 16/06/02 02:23:55 WARN ApplicationMaster: Reporter thread fails 2 time(s)
> in a row.
> at
> scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
> at
> scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:24)
> at
> scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>
> java.lang.StackOverflowError
>
>
>
>
>
> On Thu, Jun 2, 2016 at 12:58 PM, Ted Yu  wrote:
>
> Looking at Michel's stack trace, it seems to be different issue.
>
>
> On Jun 1, 2016, at 7:45 PM, Matthew Young  wrote:
>
> Hi,
>
>
>
> It's related to the one fixed bug in Spark, jira ticket SPARK-6847
> 
>
>
>
> Matthew Yang
>
>
>
> On Wed, May 25, 2016 at 7:48 PM, Michel Hubert  wrote:
>
>
>
> Hi,
>
>
>
>
>
> I have an Spark application which generates StackOverflowError exceptions
> after 30+ min.
>
>
>
> Anyone any ideas?
>
>
>
>
>
>
>
>
>
>
>
>
>
> 16/05/25 10:48:51 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
> 55449.0 (TID 5584, host81440-cld.opentsp.com):
> java.lang.StackOverflowError
>
> ·at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
>
> ·at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>
> ·at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>
> ·at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>
> ·at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>
> ·at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>
> ·at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>
> ·at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>
> ·at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>
> ·at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>
> ·at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>
> ·at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
>
> ·at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> ·at 

RE: StackOverflow in Spark

2016-06-13 Thread Michel Hubert

I’ve found my problem.

I’ve got a DAG with two consecutive “updateStateByKey” functions.
When I only process (map/foreachRDD/JavaEsSpark) the state of the last
“updateStateByKey” function, I get a stack overflow after a while (too long
a lineage).

But when I also do some processing (foreachRDD/rdd.take) on the first
“updateStateByKey”, then there is no problem.

Does this make sense? Probably the “long lineage” problem.

But why should I have such a “lineage problem” when Spark claims to be an
“abstract/high level” architecture? Why should I be worried about “long
lineage”? It seems a contradiction with the abstract/high-level (functional
programming) approach when I have to know/consider how Spark does it.
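
For illustration, a minimal Scala sketch (the socket source and update functions are made up) of a DAG with two consecutive updateStateByKey steps where the stateful streams are checkpointed explicitly, so the lineage is cut every few batches instead of growing for the life of the job:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TwoStateSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("TwoStateSketch"), Seconds(10))
    ssc.checkpoint("/tmp/checkpoints")                     // required for updateStateByKey

    val pairs = ssc.socketTextStream("localhost", 9999)    // hypothetical source
      .map(line => (line, 1L))

    def sumFn(vs: Seq[Long], s: Option[Long]): Option[Long] = Some(vs.sum + s.getOrElse(0L))
    def maxFn(vs: Seq[Long], s: Option[Long]): Option[Long] = Some((vs :+ s.getOrElse(0L)).max)

    val counts   = pairs.updateStateByKey(sumFn _)
    val maxSoFar = counts.updateStateByKey(maxFn _)

    // cut the RDD lineage of both stateful streams every 6 batches
    counts.checkpoint(Seconds(60))
    maxSoFar.checkpoint(Seconds(60))

    maxSoFar.print()
    ssc.start()
    ssc.awaitTermination()
  }
}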



From: Rishabh Wadhawan [mailto:rishabh...@gmail.com]
Sent: Thursday, 2 June 2016 06:06
To: Yash Sharma 
CC: Ted Yu ; Matthew Young ; Michel 
Hubert ; user@spark.apache.org
Subject: Re: StackOverflow in Spark

A stack overflow is generated when the DAG is too long, as there are many 
transformations over a lot of iterations. Please use checkpointing to truncate the DAG 
and break the lineage to get away from this stack overflow error. Look into the 
checkpoint function.
Thanks
Hope it helps. Let me know if you need anymore help.
On Jun 1, 2016, at 8:18 PM, Yash Sharma 
> wrote:

Not sure if its related, But I got a similar stack overflow error some time 
back while reading files and converting them to parquet.



Stack trace-
16/06/02 02:23:54 INFO YarnAllocator: Driver requested a total number of 32769 
executor(s).
16/06/02 02:23:54 INFO ExecutorAllocationManager: Requesting 16384 new 
executors because tasks are backlogged (new desired total will be 32769)
16/06/02 02:23:54 INFO YarnAllocator: Will request 24576 executor containers, 
each with 5 cores and 22528 MB memory including 2048 MB overhead
16/06/02 02:23:55 WARN ApplicationMaster: Reporter thread fails 2 time(s) in a 
row.
at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:24)
at 
scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
java.lang.StackOverflowError


On Thu, Jun 2, 2016 at 12:58 PM, Ted Yu 
> wrote:
Looking at Michel's stack trace, it seems to be different issue.

On Jun 1, 2016, at 7:45 PM, Matthew Young 
> wrote:
Hi,

It's related to the one fixed bug in Spark, jira ticket 
SPARK-6847

Matthew Yang

On Wed, May 25, 2016 at 7:48 PM, Michel Hubert 
> wrote:

Hi,


I have an Spark application which generates StackOverflowError exceptions after 
30+ min.

Anyone any ideas?






16/05/25 10:48:51 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 55449.0 
(TID 5584, host81440-cld.opentsp.com): 
java.lang.StackOverflowError
·at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
·at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
·at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
·at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
·at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
·at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
·at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
·at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
·at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
·at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
·at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
·at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
·at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
·at java.lang.reflect.Method.invoke(Method.java:606)
·at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
·at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
·at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
·at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
·at