Re: Error from reading S3 in Scala

2016-05-04 Thread Zhang, Jingyu
Thanks everyone,

One reason to use "s3a://" is that I use "s3a://" in my development environment
(Eclipse) on a desktop. I debug and test on my desktop and then put the jar
file on the EMR cluster. I do not think "s3://" will work on a desktop.

With help from AWS support, it turned out this bug is caused by the Joda-Time
version in my pom file not matching the AWS SDK jar, because AWS authentication
requires a valid Date or x-amz-date header. It works after updating to
Joda-Time 2.8.1, AWS SDK 1.10.x and amazon-hadoop 2.6.1.

However, it still throws an exception on amazon-hadoop 2.7.2. The reason for
using amazon-hadoop 2.7.2 is that the versions supported in EMR 4.6.0 are
Hadoop 2.7.2 and Spark 1.6.1.

Please let me know if you have a better way to set up the development
environment for debugging and testing.

Regards,

Jingyu
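
On the development-environment question: one option (a minimal sketch, not from
this thread; the local master, app name and environment variables are
illustrative, the bucket name is taken from the code further down) is to keep
the keys out of the URI, as suggested in the quoted reply below, and pass them
as spark.hadoop.* properties so the same code runs unchanged on the desktop and
on EMR:

import org.apache.spark.{SparkConf, SparkContext}

// "spark.hadoop."-prefixed properties are copied into the Hadoop configuration,
// so fs.s3a.access.key / fs.s3a.secret.key reach the S3A filesystem.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("s3a-local-debug")
  .set("spark.hadoop.fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
  .set("spark.hadoop.fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

val sc = new SparkContext(conf)
val lines = sc.textFile("s3a://graphclustering/config.properties")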





On 4 May 2016 at 20:32, James Hammerton <ja...@gluru.co> wrote:

>
>
> On 3 May 2016 at 17:22, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> The best thing to do is start the EMR clusters with proper permissions in
>> the roles that way you do not need to worry about the keys at all.
>>
>> Another thing, why are we using s3a// instead of s3:// ?
>>
>
> Probably because of what's said about s3:// and s3n:// here (which is why
> I use s3a://):
>
> https://wiki.apache.org/hadoop/AmazonS3
>
> Regards,
>
> James
>
>
>> Besides that you can increase s3 speeds using the instructions mentioned
>> here:
>> https://aws.amazon.com/blogs/aws/aws-storage-update-amazon-s3-transfer-acceleration-larger-snowballs-in-more-regions/
>>
>>
>> Regards,
>> Gourav
>>
>> On Tue, May 3, 2016 at 12:04 PM, Steve Loughran <ste...@hortonworks.com>
>> wrote:
>>
>>> don't put your secret in the URI, it'll only creep out in the logs.
>>>
>>> Use the specific properties coverd in
>>> http://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html,
>>> which you can set in your spark context by prefixing them with spark.hadoop.
>>>
>>> you can also set the env vars, AWS_ACCESS_KEY_ID and
>>> AWS_SECRET_ACCESS_KEY; SparkEnv will pick these up and set the relevant
>>> spark context keys for you
>>>
>>>
>>> On 3 May 2016, at 01:53, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
>>>
>>> Hi All,
>>>
>>> I am using Eclipse with Maven for developing Spark applications. I got a
>>> error for Reading from S3 in Scala but it works fine in Java when I run
>>> them in the same project in Eclipse. The Scala/Java code and the error in
>>> following
>>>
>>>
>>> Scala
>>>
>>> val uri = URI.create("s3a://" + key + ":" + seckey + "@" +
>>> "graphclustering/config.properties");
>>> val pt = new Path("s3a://" + key + ":" + seckey + "@" +
>>> "graphclustering/config.properties");
>>> val fs = FileSystem.get(uri,ctx.hadoopConfiguration);
>>> val  inputStream:InputStream = fs.open(pt);
>>>
>>> Exception: on aws-java-1.7.4 and hadoop-aws-2.6.1
>>>
>>> Exception in thread "main"
>>> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
>>> Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
>>> 8A56DC7BF0BFF09A), S3 Extended Request ID
>>>
>>> at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(
>>> AmazonHttpClient.java:1160)
>>>
>>> at com.amazonaws.http.AmazonHttpClient.executeOneRequest(
>>> AmazonHttpClient.java:748)
>>>
>>> at com.amazonaws.http.AmazonHttpClient.executeHelper(
>>> AmazonHttpClient.java:467)
>>>
>>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:302
>>> )
>>>
>>> at com.amazonaws.services.s3.AmazonS3Client.invoke(
>>> AmazonS3Client.java:3785)
>>>
>>> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(
>>> AmazonS3Client.java:1050)
>>>
>>> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(
>>> AmazonS3Client.java:1027)
>>>
>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(
>>> S3AFileSystem.java:688)
>>>
>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:222)
>>>
>>> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
>>>
>>> at com.news.report.graph.GraphCluster$.main(GraphCluster.scala:53)
>>>

Error from reading S3 in Scala

2016-05-02 Thread Zhang, Jingyu
Hi All,

I am using Eclipse with Maven for developing Spark applications. I get an
error when reading from S3 in Scala, but it works fine in Java when I run
both in the same project in Eclipse. The Scala code and the errors are shown
below.


Scala

import java.io.InputStream
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

val uri = URI.create("s3a://" + key + ":" + seckey + "@" + "graphclustering/config.properties")
val pt = new Path("s3a://" + key + ":" + seckey + "@" + "graphclustering/config.properties")
val fs = FileSystem.get(uri, ctx.hadoopConfiguration)
val inputStream: InputStream = fs.open(pt)

Exception: on aws-java-1.7.4 and hadoop-aws-2.6.1

Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception:
Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden;
Request ID: 8A56DC7BF0BFF09A), S3 Extended Request ID

at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(
AmazonHttpClient.java:1160)

at com.amazonaws.http.AmazonHttpClient.executeOneRequest(
AmazonHttpClient.java:748)

at com.amazonaws.http.AmazonHttpClient.executeHelper(
AmazonHttpClient.java:467)

at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:302)

at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)

at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(
AmazonS3Client.java:1050)

at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(
AmazonS3Client.java:1027)

at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(
S3AFileSystem.java:688)

at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:222)

at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)

at com.news.report.graph.GraphCluster$.main(GraphCluster.scala:53)

at com.news.report.graph.GraphCluster.main(GraphCluster.scala)

16/05/03 10:49:17 INFO SparkContext: Invoking stop() from shutdown hook

16/05/03 10:49:17 INFO SparkUI: Stopped Spark web UI at
http://10.65.80.125:4040

16/05/03 10:49:17 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!

16/05/03 10:49:17 INFO MemoryStore: MemoryStore cleared

16/05/03 10:49:17 INFO BlockManager: BlockManager stopped

Exception: on aws-java-1.7.4 and hadoop-aws-2.7.2

16/05/03 10:23:40 INFO Slf4jLogger: Slf4jLogger started

16/05/03 10:23:40 INFO Remoting: Starting remoting

16/05/03 10:23:40 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriverActorSystem@10.65.80.125:61860]

16/05/03 10:23:40 INFO Utils: Successfully started service
'sparkDriverActorSystem' on port 61860.

16/05/03 10:23:40 INFO SparkEnv: Registering MapOutputTracker

16/05/03 10:23:40 INFO SparkEnv: Registering BlockManagerMaster

16/05/03 10:23:40 INFO DiskBlockManager: Created local directory at
/private/var/folders/sc/tdmkbvr1705b8p70xqj1kqks5l9p

16/05/03 10:23:40 INFO MemoryStore: MemoryStore started with capacity
1140.4 MB

16/05/03 10:23:40 INFO SparkEnv: Registering OutputCommitCoordinator

16/05/03 10:23:40 INFO Utils: Successfully started service 'SparkUI' on
port 4040.

16/05/03 10:23:40 INFO SparkUI: Started SparkUI at http://10.65.80.125:4040

16/05/03 10:23:40 INFO Executor: Starting executor ID driver on host
localhost

16/05/03 10:23:40 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 61861.

16/05/03 10:23:40 INFO NettyBlockTransferService: Server created on 61861

16/05/03 10:23:40 INFO BlockManagerMaster: Trying to register BlockManager

16/05/03 10:23:40 INFO BlockManagerMasterEndpoint: Registering block
manager localhost:61861 with 1140.4 MB RAM, BlockManagerId(driver,
localhost, 61861)

16/05/03 10:23:40 INFO BlockManagerMaster: Registered BlockManager

Exception in thread "main" java.lang.NoSuchMethodError:
com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V

at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:285)

at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)

at com.news.report.graph.GraphCluster$.main(GraphCluster.scala:52)

at com.news.report.graph.GraphCluster.main(GraphCluster.scala)

16/05/03 10:23:51 INFO SparkContext: Invoking stop() from shutdown hook

16/05/03 10:23:51 INFO SparkUI: Stopped Spark web UI at
http://10.65.80.125:4040

16/05/03 10:23:51 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!

16/05/03 10:23:51 INFO MemoryStore: MemoryStore cleared

16/05/03 10:23:51 INFO BlockManager: BlockManager stopped

16/05/03 10:23:51 INFO BlockManagerMaster: BlockManagerMaster stopped

16/05/03 10:23:51 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!

16/05/03 10:23:51 INFO SparkContext: Successfully stopped SparkContext


Re: Scala vs Python for Spark ecosystem

2016-04-20 Thread Zhang, Jingyu
GraphX does not support Python yet.
http://spark.apache.org/docs/latest/graphx-programming-guide.html

The workaround is to use GraphFrames (a third-party API),
https://issues.apache.org/jira/browse/SPARK-3789

but some features in Python are not the same as in Scala:
https://github.com/graphframes/graphframes/issues/57

Jingyu

On 20 April 2016 at 16:52, sujeet jog  wrote:

> It depends on the trade off's you wish to have,
>
> Python being a interpreted language, speed of execution will be lesser,
> but it being a very common language used across, people can jump in hands
> on quickly
>
> Scala programs run in java environment,  so it's obvious you will get good
> execution speed,  although it's not common for people to know this language
> readily.
>
>
> Pyspark API's i believe will have everything which Scala Spark API's offer
> in long run.
>
>
>
> On Wed, Apr 20, 2016 at 12:14 PM, berkerkozan 
> wrote:
>
>> I know scala better than python but my team (2 other my friend) knows only
>> python. We want to use graphx or maybe try graphframes.
>> What will be the future of these 2 languages for spark ecosystem? Will
>> python cover everything scala can in short time periods? what do you
>> advice?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Scala-vs-Python-for-Spark-ecosystem-tp26805.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>



How many threads will be used to read RDBMS after set numPartitions =10 in Spark JDBC

2016-04-04 Thread Zhang, Jingyu
Hi All,

I want to read MySQL from Spark. Please let me know how many threads will be
used to read the RDBMS after setting numPartitions = 10 in Spark JDBC. What is
the best practice for reading a large dataset from an RDBMS into Spark?

Thanks,

Jingyu
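
For reference, a sketch of the partitioned JDBC read (Spark 1.x DataFrameReader;
the URL, table, column, bounds and credentials are placeholders, and an existing
sqlContext is assumed). Spark turns numPartitions = 10 into 10 range queries on
the partition column, one per partition, so up to 10 tasks each open their own
JDBC connection, with actual parallelism bounded by the available executor cores:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "dbuser")
props.setProperty("password", "dbpass")

// numPartitions = 10: the id range [1, 1000000] is split into 10 range queries,
// each read by its own task over its own JDBC connection.
val df = sqlContext.read.jdbc(
  "jdbc:mysql://dbhost:3306/mydb", "mytable",
  "id", 1L, 1000000L, 10, props)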



Re: Memory issues on spark

2016-02-17 Thread Zhang, Jingyu
May set "maximizeResourceAllocation", then EMR will do the best config for
you.
http://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/emr-spark-configure.html

Jingyu

On 18 February 2016 at 12:02,  wrote:

> Hi All,
>
> I have been facing memory issues in spark. im using spark-sql on AWS EMR.
> i have around 50GB file in AWS S3. I want to read this file in BI tool
> connected to spark-sql on thrift-server over OBDC. I'm executing select *
> from table in BI tool(qlikview,tableau).
> I run into OOM error sometimes and some time the LOST_EXECUTOR. I'm really
> confused.
> The spark runs fine for smaller data set.
>
> I have 3 node EMR cluster with m3.2xlarge.
>
> I have set below conf on spark.
>
> export SPARK_EXECUTOR_INSTANCES=16
> export SPARK_EXECUTOR_CORES=16
> export SPARK_EXECUTOR_MEMORY=15G
> export SPARK_DRIVER_MEMORY=12G
> spark.kryoserializer.buffer.max 1024m
>
> Even after setting SPARK_EXECUTOR_INSTANCES as 16, only 2 executors come
> up.
>
> This is been road block since long time. Any help would be appreciated.
>
> Thanks
> Arun.
>
> This e-mail and any files transmitted with it are for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. If you are not the intended recipient(s), please reply to the
> sender and destroy all copies of the original message. Any unauthorized
> review, use, disclosure, dissemination, forwarding, printing or copying of
> this email, and/or any action taken in reliance on the contents of this
> e-mail is strictly prohibited and may be unlawful. Where permitted by
> applicable law, this e-mail and other e-mail communications sent to and
> from Cognizant e-mail addresses may be monitored.
>



Failed to remove broadcast 2 with removeFromMaster = true in Graphx

2016-02-05 Thread Zhang, Jingyu
I am running a Pregel function on 37 nodes in EMR Hadoop. After an hour the
logs show the following. Can anyone please tell me what the problem is and
how to solve it? Thanks

16/02/05 14:02:46 WARN BlockManagerMaster: Failed to remove broadcast
2 with removeFromMaster = true - Cannot receive any reply in 120
seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in
120 seconds. This timeout is controlled by spark.rpc.askTimeout
at 
org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at 
scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at 
scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643)
at 
scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658)
at 
scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
at 
scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at 
scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634)
at 
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
at 
org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:241)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any
reply in 120 seconds
at 
org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242)
... 7 more
16/02/05 14:02:46 ERROR ContextCleaner: Error cleaning broadcast 2
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in
120 seconds. This timeout is controlled by spark.rpc.askTimeout
at 
org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)
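
These RPC timeouts usually surface when the driver or executors are under heavy
GC or memory pressure during large Pregel iterations, so raising the limits
only treats the symptom. A minimal sketch of the relevant settings (the values
are illustrative):

val conf = new org.apache.spark.SparkConf()
  .set("spark.rpc.askTimeout", "600s")   // the timeout named in the warning (120s in this log)
  .set("spark.network.timeout", "600s")  // default source for several network-related timeouts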

Unpersist RDD in Graphx

2016-01-31 Thread Zhang, Jingyu
Hi, what is the best way to unpersist the RDDs in GraphX to release memory?
RDD.unpersist
or
Graph.unpersistVertices and Graph.edges.unpersist

I studied the source code of Pregel.scala; both of the above are used between
line 148 and line 150. Can anyone please tell me what the difference is? In
addition, what is the difference between setting blocking = false and blocking =
true?

oldMessages.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)

Thanks,

Jingyu



How to filter the isolated vertexes in Graphx

2016-01-28 Thread Zhang, Jingyu
I am trying to filter out vertices that do not have any links connecting them
to other vertices. How can I filter those isolated vertices in GraphX?

Thanks,

Jingyu
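
One common approach (a sketch, not from this thread; the dropIsolated name is
illustrative) is to join the graph with graph.degrees, since isolated vertices
have no entry there, and then keep only vertices with a positive degree:

import scala.reflect.ClassTag
import org.apache.spark.graphx.Graph

// Attach each vertex's degree (vertices missing from graph.degrees are isolated, so 0),
// keep only vertices with degree > 0, then restore the original vertex attribute.
def dropIsolated[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VD, ED] =
  graph.outerJoinVertices(graph.degrees) { (vid, attr, degOpt) => (attr, degOpt.getOrElse(0)) }
    .subgraph(vpred = (vid, v) => v._2 > 0)
    .mapVertices((vid, v) => v._1)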



Graphx: How to print the group of connected components one by one

2015-12-01 Thread Zhang, Jingyu
Can anyone please let me know how to print all nodes of each connected
component, one component at a time?

graph.connectedComponents()

e.g.

Connected component ID   Node IDs
1                        1,2,3
6                        6,7,8,9


Thanks
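
One way to get that listing (a sketch, not from this thread; graph is the
user's graph and the output format is illustrative) is to flip the
(vertexId, componentId) pairs produced by connectedComponents and group by
component:

// graph.connectedComponents().vertices is an RDD of (vertexId, componentId) pairs.
val cc = graph.connectedComponents().vertices

cc.map { case (vertexId, componentId) => (componentId, vertexId) }
  .groupByKey()
  .collect()   // assumes the grouped result fits on the driver
  .foreach { case (componentId, vertexIds) =>
    println(componentId + "  " + vertexIds.mkString(","))
  }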



Size exceeds Integer.MAX_VALUE on EMR 4.0.0 Spark 1.4.1

2015-11-16 Thread Zhang, Jingyu
I am using spark-csv to save files to S3, and it throws "Size exceeds
Integer.MAX_VALUE". Please let me know how to fix it. Thanks.

df.write()
.format("com.databricks.spark.csv")
.option("header", "true")
.save("s3://newcars.csv");

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860)
at 
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
at 
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at 
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
at 
org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:617)
at 
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
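
The 2 GB limit here is per block/partition (a single partition is being
memory-mapped from disk), so one common mitigation (a sketch, not from this
thread; the partition count and output path are placeholders, and df is the
DataFrame from the snippet above) is to spread the data over more partitions
before caching or writing:

// More partitions mean smaller blocks; 200 is illustrative, size it from the data volume.
df.repartition(200)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("s3a://my-bucket/newcars")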



Re: Size exceeds Integer.MAX_VALUE (SparkSQL$TreeNodeException: sort, tree) on EMR 4.0.0 Spark 1.4.1

2015-11-16 Thread Zhang, Jingyu
 false, [net_site#530,device#449],
[net_site#530,device#449,Coalesce(SUM(PartialCount#717L),0) AS
unique_nk_count#109L]
Exchange (HashPartitioning 200)
 Aggregate true, [net_site#530,device#449],
[net_site#530,device#449,COUNT(device#449) AS PartialCount#717L]
  Project [net_site#530,device#449]
   Filter (cnt#108L = 1)
Aggregate false, [net_site#530,device#449,news_key#459],
[net_site#530,device#449,news_key#459,Coalesce(SUM(PartialCount#719L),0)
AS cnt#108L]
 Exchange (HashPartitioning 200)
  Aggregate true, [net_site#530,device#449,news_key#459],
[net_site#530,device#449,news_key#459,COUNT(news_key#459) AS
PartialCount#719L]
   Project [net_site#530,device#449,news_key#459]
Filter (CAST(et#451, DoubleType) = 3.0)
 InMemoryColumnarTableScan
[net_site#530,device#449,news_key#459,et#451], [(CAST(et#451,
DoubleType) = 3.0)], (InMemoryRelation
[net_site#530,device#449,cbd#448,et#451,news_key#459,underscore_et#478],
true, 1, StorageLevel(true, true, false, true, 1), (Project
[net_site#50,device#6,cbd#5,et#8,news_key#16,underscore_et#35]), None)

at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:171)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at 
org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5.apply(basicOperators.scala:190)
at 
org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5.apply(basicOperators.scala:190)
at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 21 more


On 16 November 2015 at 21:16, Zhang, Jingyu <jingyu.zh...@news.com.au>
wrote:

> I am using spark-csv to save files in s3, it shown Size exceeds. Please let 
> me know how to fix it. Thanks.
>
> df.write()
> .format("com.databricks.spark.csv")
> .option("header", "true")
> .save("s3://newcars.csv");
>
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>   at 
> org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
>   at 
> org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
>   at org.apache.spark.storage.BlockManager.get(BlockManager.scala:617)
>   at 
> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.conc

Re: How to passing parameters to another java class

2015-11-15 Thread Zhang, Jingyu
Thanks Fengdong,

startTime and endTime are null inside the call(Iterator lines) method. Java
does not allow a top-level class to be static.

From the Spark docs, I can broadcast them, but I don't know how to receive
them in another class.



On 16 November 2015 at 16:16, Fengdong Yu <fengdo...@everstring.com> wrote:

> If you got “cannot  Serialized” Exception, then you need to
>  PixelGenerator as a Static class.
>
>
>
>
> On Nov 16, 2015, at 1:10 PM, Zhang, Jingyu <jingyu.zh...@news.com.au>
> wrote:
>
> Thanks, that worked for local environment but not in the Spark Cluster.
>
>
> On 16 November 2015 at 16:05, Fengdong Yu <fengdo...@everstring.com>
> wrote:
>
>> Can you try : new PixelGenerator(startTime, endTime) ?
>>
>>
>>
>> On Nov 16, 2015, at 12:47 PM, Zhang, Jingyu <jingyu.zh...@news.com.au>
>> wrote:
>>
>> I want to pass two parameters into new java class from
>> rdd.mapPartitions(), the code  like following.
>>
>> ---Source Code
>>
>> Main method:
>>
>> /*the parameters that I want to pass into the PixelGenerator.class for
>> selecting any items between the startTime and the endTime.
>>
>> */
>>
>> int startTime, endTime;
>>
>> JavaRDD pixelsObj = pixelsStr.mapPartitions(new
>> PixelGenerator());
>>
>> PixelGenerator.java
>>
>> public class PixelGenerator implements FlatMapFunction<Iterator,
>> PixelObject> {
>>
>> public Iterable call(Iterator lines) {
>>
>> ...
>>
>> }
>>
>> Can anyone told me how to pass the startTime, endTime
>> into PixelGenerator class?
>>
>> Many Thanks
>>
>> This message and its attachments may contain legally privileged or
>> confidential information. It is intended solely for the named addressee. If
>> you are not the addressee indicated in this message or responsible for
>> delivery of the message to the addressee, you may not copy or deliver this
>> message or its attachments to anyone. Rather, you should permanently delete
>> this message and its attachments and kindly notify the sender by reply
>> e-mail. Any content of this message and its attachments which does not
>> relate to the official business of the sending company must be taken not to
>> have been sent or endorsed by that company or any of its related entities.
>> No warranty is made that the e-mail or attachments are free from computer
>> virus or other defect.
>>
>>
>>
>
> This message and its attachments may contain legally privileged or
> confidential information. It is intended solely for the named addressee. If
> you are not the addressee indicated in this message or responsible for
> delivery of the message to the addressee, you may not copy or deliver this
> message or its attachments to anyone. Rather, you should permanently delete
> this message and its attachments and kindly notify the sender by reply
> e-mail. Any content of this message and its attachments which does not
> relate to the official business of the sending company must be taken not to
> have been sent or endorsed by that company or any of its related entities.
> No warranty is made that the e-mail or attachments are free from computer
> virus or other defect.
>
>
>



How to passing parameters to another java class

2015-11-15 Thread Zhang, Jingyu
I want to pass two parameters into a new Java class used from
rdd.mapPartitions(); the code is like the following.

---Source Code

Main method:

/*the parameters that I want to pass into the PixelGenerator.class for
selecting any items between the startTime and the endTime.

*/

int startTime, endTime;

JavaRDD<PixelObject> pixelsObj = pixelsStr.mapPartitions(new PixelGenerator());

PixelGenerator.java

public class PixelGenerator implements FlatMapFunction<Iterator, PixelObject> {

    public Iterable<PixelObject> call(Iterator lines) {
        ...
    }
}

Can anyone tell me how to pass startTime and endTime
into the PixelGenerator class?

Many Thanks



Re: How to passing parameters to another java class

2015-11-15 Thread Zhang, Jingyu
Thanks, that worked in the local environment but not in the Spark cluster.


On 16 November 2015 at 16:05, Fengdong Yu <fengdo...@everstring.com> wrote:

> Can you try : new PixelGenerator(startTime, endTime) ?
>
>
>
> On Nov 16, 2015, at 12:47 PM, Zhang, Jingyu <jingyu.zh...@news.com.au>
> wrote:
>
> I want to pass two parameters into new java class from
> rdd.mapPartitions(), the code  like following.
>
> ---Source Code
>
> Main method:
>
> /*the parameters that I want to pass into the PixelGenerator.class for
> selecting any items between the startTime and the endTime.
>
> */
>
> int startTime, endTime;
>
> JavaRDD pixelsObj = pixelsStr.mapPartitions(new
> PixelGenerator());
>
> PixelGenerator.java
>
> public class PixelGenerator implements FlatMapFunction<Iterator,
> PixelObject> {
>
> public Iterable call(Iterator lines) {
>
> ...
>
> }
>
> Can anyone told me how to pass the startTime, endTime
> into PixelGenerator class?
>
> Many Thanks
>
> This message and its attachments may contain legally privileged or
> confidential information. It is intended solely for the named addressee. If
> you are not the addressee indicated in this message or responsible for
> delivery of the message to the addressee, you may not copy or deliver this
> message or its attachments to anyone. Rather, you should permanently delete
> this message and its attachments and kindly notify the sender by reply
> e-mail. Any content of this message and its attachments which does not
> relate to the official business of the sending company must be taken not to
> have been sent or endorsed by that company or any of its related entities.
> No warranty is made that the e-mail or attachments are free from computer
> virus or other defect.
>
>
>



Spark-csv error on read AWS s3a in spark 1.4.1

2015-11-10 Thread Zhang, Jingyu
I have a small CSV file in S3 that I access as s3a://key:seckey@bucketname/a.csv

It works with SparkContext:

val pixelsStr: RDD[String] = ctx.textFile(s3pathOrg)

It works with the Java Spark-csv API as well:

DataFrame careerOneDF = sqlContext.read().format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(s3pathOrg);

However, the equivalent Scala code below does not work; the error message follows it.

val careerOneDF: DataFrame = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(s3pathOrg)

com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
F2E11C10E6D35BF3), S3 Extended Request ID:
0tdESZAHmROgSJem6P3gYnEZs86rrt4PByrTYbxzCw0xyM9KUMCHEAX3x4lcoy5O3A8qccgHraQ=

at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(
AmazonHttpClient.java:1160)

at com.amazonaws.http.AmazonHttpClient.executeOneRequest(
AmazonHttpClient.java:748)

at com.amazonaws.http.AmazonHttpClient.executeHelper(
AmazonHttpClient.java:467)

at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:302)

at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)

at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(
AmazonS3Client.java:1050)

at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(
AmazonS3Client.java:1027)

at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(
S3AFileSystem.java:688)

at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(
S3AFileSystem.java:71)

at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)

at org.apache.hadoop.fs.Globber.glob(Globber.java:252)

at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)

at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(
FileInputFormat.java:257)

at org.apache.hadoop.mapred.FileInputFormat.listStatus(
FileInputFormat.java:228)

at org.apache.hadoop.mapred.FileInputFormat.getSplits(
FileInputFormat.java:313)

at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(
MapPartitionsRDD.scala:32)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1255)

at org.apache.spark.rdd.RDDOperationScope$.withScope(
RDDOperationScope.scala:147)

at org.apache.spark.rdd.RDDOperationScope$.withScope(
RDDOperationScope.scala:108)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)

at org.apache.spark.rdd.RDD.take(RDD.scala:1250)

at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1290)

at org.apache.spark.rdd.RDDOperationScope$.withScope(
RDDOperationScope.scala:147)

at org.apache.spark.rdd.RDDOperationScope$.withScope(
RDDOperationScope.scala:108)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)

at org.apache.spark.rdd.RDD.first(RDD.scala:1289)

at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(
CsvRelation.scala:129)

at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:127)

at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:109)

at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:62)

at com.databricks.spark.csv.DefaultSource.createRelation(
DefaultSource.scala:115)

at com.databricks.spark.csv.DefaultSource.createRelation(
DefaultSource.scala:40)

at com.databricks.spark.csv.DefaultSource.createRelation(
DefaultSource.scala:28)

at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:269)

at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)

at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)


Thanks



key not found: sportingpulse.com in Spark SQL 1.5.0

2015-10-30 Thread Zhang, Jingyu
There is no problem in Spark SQL 1.5.1, but the error "key not found:
sportingpulse.com" shows up when I use 1.5.0.

I have to use 1.5.0 because that is the version AWS EMR supports.
Can anyone tell me why Spark refers to "sportingpulse.com" and how to fix it?

Thanks.

Caused by: java.util.NoSuchElementException: key not found:
sportingpulse.com

at scala.collection.MapLike$class.default(MapLike.scala:228)

at scala.collection.AbstractMap.default(Map.scala:58)

at scala.collection.mutable.HashMap.apply(HashMap.scala:64)

at
org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.compress(
compressionSchemes.scala:258)

at
org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.build(
CompressibleColumnBuilder.scala:110)

at org.apache.spark.sql.columnar.NativeColumnBuilder.build(
ColumnBuilder.scala:87)

at
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(
InMemoryColumnarTableScan.scala:152)

at
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(
InMemoryColumnarTableScan.scala:152)

at scala.collection.TraversableLike$$anonfun$map$1.apply(
TraversableLike.scala:244)

at scala.collection.TraversableLike$$anonfun$map$1.apply(
TraversableLike.scala:244)

at scala.collection.IndexedSeqOptimized$class.foreach(
IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
InMemoryColumnarTableScan.scala:152)

at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
InMemoryColumnarTableScan.scala:120)

at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)

at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)

at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(
MapPartitionsWithPreparationRDD.scala:63)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73
)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41
)

at org.apache.spark.scheduler.Task.run(Task.scala:88)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:617)



Re: key not found: sportingpulse.com in Spark SQL 1.5.0

2015-10-30 Thread Zhang, Jingyu
Thanks Silvio and Ted,

Can you please let me know how to fix this intermittent issue? Should I
wait for EMR to support Spark 1.5.1, or change my code from DataFrames to
plain Spark map-reduce?

Regards,

Jingyu
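
Since the failure happens inside DictionaryEncoding while building the
compressed in-memory columnar cache, one thing worth trying on 1.5.0 (a sketch,
not confirmed in this thread; an existing sqlContext is assumed) is to switch
off in-memory columnar compression before caching the DataFrame:

// Disable compression of the in-memory columnar cache (it defaults to true).
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "false")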

On 31 October 2015 at 09:40, Silvio Fiorito <silvio.fior...@granturing.com>
wrote:

> It's something due to the columnar compression. I've seen similar
> intermittent issues when caching DataFrames. "sportingpulse.com" is a
> value in one of the columns of the DF.
> --
> From: Ted Yu <yuzhih...@gmail.com>
> Sent: ‎10/‎30/‎2015 6:33 PM
> To: Zhang, Jingyu <jingyu.zh...@news.com.au>
> Cc: user <user@spark.apache.org>
> Subject: Re: key not found: sportingpulse.com in Spark SQL 1.5.0
>
> I searched for sportingpulse in *.scala and *.java files under 1.5 branch.
> There was no hit.
>
> mvn dependency doesn't show sportingpulse either.
>
> Is it possible this is specific to EMR ?
>
> Cheers
>
> On Fri, Oct 30, 2015 at 2:57 PM, Zhang, Jingyu <jingyu.zh...@news.com.au>
> wrote:
>
>> There is not a problem in Spark SQL 1.5.1 but the error of "key not
>> found: sportingpulse.com" shown up when I use 1.5.0.
>>
>> I have to use the version of 1.5.0 because that the one AWS EMR support.
>> Can anyone tell me why Spark uses "sportingpulse.com" and how to fix it?
>>
>> Thanks.
>>
>> Caused by: java.util.NoSuchElementException: key not found:
>> sportingpulse.com
>>
>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>>
>> at scala.collection.AbstractMap.default(Map.scala:58)
>>
>> at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>>
>> at
>> org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.compress(
>> compressionSchemes.scala:258)
>>
>> at
>> org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.build(
>> CompressibleColumnBuilder.scala:110)
>>
>> at org.apache.spark.sql.columnar.NativeColumnBuilder.build(
>> ColumnBuilder.scala:87)
>>
>> at
>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(
>> InMemoryColumnarTableScan.scala:152)
>>
>> at
>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(
>> InMemoryColumnarTableScan.scala:152)
>>
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>> TraversableLike.scala:244)
>>
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>> TraversableLike.scala:244)
>>
>> at scala.collection.IndexedSeqOptimized$class.foreach(
>> IndexedSeqOptimized.scala:33)
>>
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>
>> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
>> InMemoryColumnarTableScan.scala:152)
>>
>> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
>> InMemoryColumnarTableScan.scala:120)
>>
>> at org.apache.spark.storage.MemoryStore.unrollSafely(
>> MemoryStore.scala:278)
>>
>> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171
>> )
>>
>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(
>> MapPartitionsRDD.scala:38)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(
>> MapPartitionsRDD.scala:38)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(
>> MapPartitionsRDD.scala:38)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>
>> at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(
>> MapPartitionsWithPreparationRDD.scala:63)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(
>> MapPartitionsRDD.sca

Re: Save data to different S3

2015-10-29 Thread Zhang, Jingyu
Try s3://aws_key:aws_secret@bucketName/folderName with your access key and
secret to save the data.

On 30 October 2015 at 10:55, William Li  wrote:

> Hi – I have a simple app running fine with Spark, it reads data from S3
> and performs calculation.
>
> When reading data from S3, I use hadoopConfiguration.set for both
> fs.s3n.awsAccessKeyId, and the fs.s3n.awsSecretAccessKey to it has
> permissions to load the data from customer sources.
>
> However, after I complete the analysis, how do I save the results (it’s a
> org.apache.spark.rdd.RDD[String]) into my own s3 bucket which requires
> different access key and secret? It seems one option is that I could save
> the results as local file to the spark cluster, then create a new
> SQLContext with the different access, then load the data from the local
> file.
>
> Is there any other options without requiring save and re-load files?
>
>
> Thanks,
>
> William.
>



Re: NullPointerException when cache DataFrame in Java (Spark1.5.1)

2015-10-29 Thread Zhang, Jingyu
Thanks Romi,

I resized the dataset to 7 MB; however, the code throws the
NullPointerException as well.

Did you try to cache a DataFrame with just a single row?

Yes, I tried. Same problem.

Do your rows have any columns with null values?

No, I had filtered out null values before caching the DataFrame.

Can you post a code snippet here on how you load/generate the dataframe?

Sure, here is working code 1:

JavaRDD pixels = pixelsStr.map(new PixelGenerator()).cache();

System.out.println(pixels.count()); // 3000-4000 rows

Working code 2:

JavaRDD pixels = pixelsStr.map(new PixelGenerator());

DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.class
);

DataFrame totalDF1 =
schemaPixel.select(schemaPixel.col("domain")).filter("'domain'
is not null").limit(500);

System.out.println(totalDF1.count());


BUT, after changing limit(500) to limit(1000), the code reports a
NullPointerException.


JavaRDD pixels = pixelsStr.map(new PixelGenerator());

DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.class
);

DataFrame totalDF =
schemaPixel.select(schemaPixel.col("domain")).filter("'domain'
is not null").limit(*1000*);

System.out.println(totalDF.count()); // problem at this line

15/10/29 18:56:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
have all completed, from pool

15/10/29 18:56:28 INFO TaskSchedulerImpl: Cancelling stage 0

15/10/29 18:56:28 INFO DAGScheduler: ShuffleMapStage 0 (count at
X.java:113) failed in 3.764 s

15/10/29 18:56:28 INFO DAGScheduler: Job 0 failed: count at XXX.java:113,
took 3.862207 s

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
0.0 (TID 0, localhost): java.lang.NullPointerException

at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)

Does dataframe.rdd.cache work?

No, I tried, but I get the same exception.
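One more thing I noticed while re-reading the snippets above: in filter("'domain' is not null") the single quotes make 'domain' a string literal, so the predicate is always true and null domains are never actually removed. Also, if PixelGenerator ever returns null for a bad row, converting that null bean to a Row could throw exactly this kind of NullPointerException. A sketch of what I plan to try next (same names as above, untested):

// Hedged sketch: drop null beans before creating the DataFrame, and filter on the
// column itself rather than on a quoted string literal.
JavaRDD<PixelObject> pixels = pixelsStr.map(new PixelGenerator())
    .filter(p -> p != null);

DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.class);

DataFrame totalDF = schemaPixel
    .select(schemaPixel.col("domain"))
    .filter(schemaPixel.col("domain").isNotNull())
    .limit(1000);

System.out.println(totalDF.count());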

Thanks,

Jingyu

On 29 October 2015 at 17:38, Romi Kuntsman <r...@totango.com> wrote:

> Did you try to cache a DataFrame with just a single row?
> Do your rows have any columns with null values?
> Can you post a code snippet here on how you load/generate the dataframe?
> Does dataframe.rdd.cache work?
>
> Romi Kuntsman, Big Data Engineer
> http://www.totango.com
>
> On Thu, Oct 29, 2015 at 4:33 AM, Zhang, Jingyu <jingyu.zh...@news.com.au>
> wrote:
>
>> It is not a problem to use JavaRDD.cache() for 200 MB of data (all objects
>> read from JSON format). But when I try to use DataFrame.cache(), it throws
>> the exception below.
>>
>> My machine can cache 1 GB of data in Avro format without any problem.
>>
>> 15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms
>>
>> 15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in
>> 27.832369 ms
>>
>> 15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID
>> 1)
>>
>> java.lang.NullPointerException
>>
>> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>>
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>> DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:497)
>>
>> at
>> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
>> SQLContext.scala:500)
>>
>> at
>> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
>> SQLContext.scala:500)
>>
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>> TraversableLike.scala:244)
>>
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>> TraversableLike.scala:244)
>>
>> at scala.collection.IndexedSeqOptimized$class.foreach(
>> IndexedSeqOptimized.scala:33)
>>
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>
>> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
>> SQLContext.scala:500)
>>
>> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
>> SQLContext.scala:498)
>>
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>
>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
>>
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>
>> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
>> InMemoryColumnarTableScan.scala:127)
>>
>> at org.apache.spark.sql.columnar.InMemoryRelat

NullPointerException when cache DataFrame in Java (Spark1.5.1)

2015-10-28 Thread Zhang, Jingyu
It is not a problem to use JavaRDD.cache() for 200 MB of data (all objects read
from JSON format). But when I try to use DataFrame.cache(), it throws the
exception below.

My machine can cache 1 GB of data in Avro format without any problem.

15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms

15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in
27.832369 ms

15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)

java.lang.NullPointerException

at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
SQLContext.scala:500)

at
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
SQLContext.scala:500)

at scala.collection.TraversableLike$$anonfun$map$1.apply(
TraversableLike.scala:244)

at scala.collection.TraversableLike$$anonfun$map$1.apply(
TraversableLike.scala:244)

at scala.collection.IndexedSeqOptimized$class.foreach(
IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
SQLContext.scala:500)

at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
SQLContext.scala:498)

at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
InMemoryColumnarTableScan.scala:127)

at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
InMemoryColumnarTableScan.scala:120)

at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)

at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)

at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)

at org.apache.spark.scheduler.Task.run(Task.scala:88)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

15/10/29 13:26:23 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,
localhost): java.lang.NullPointerException

at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)


Thanks,


Jingyu



Exception on save s3n file (1.4.1, hadoop 2.6)

2015-09-24 Thread Zhang, Jingyu
I got the following exception when I run
JavaPairRDD.values().saveAsTextFile("s3n://bucket");
Can anyone help me out? Thanks.


15/09/25 12:24:32 INFO SparkContext: Successfully stopped SparkContext

Exception in thread "main" java.lang.NoClassDefFoundError:
org/jets3t/service/ServiceException

at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(
NativeS3FileSystem.java:338)

at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(
NativeS3FileSystem.java:328)

at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)

at
org.apache.spark.SparkHadoopWriter$.createPathFromString(SparkHadoopWriter.scala:170)

at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:988)

at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)

at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)

at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:965)

at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:897)

at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)

at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)

at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:896)

at
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1404)

at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1383)

at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1383)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)

at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1383)

at
org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:519)

at
org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:47)

at com.news.report.section.SectionSubSection.run(SectionSubSection.java:184)

at com.news.report.section.SectionSubSection.main(SectionSubSection.java:67)

Caused by: java.lang.ClassNotFoundException:
org.jets3t.service.ServiceException

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 34 more
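For context (my own guess, not a confirmed fix): the missing class belongs to the JetS3t library that the s3n:// connector (NativeS3FileSystem) is built on, so this looks like a classpath problem rather than a Spark bug. Shipping the jets3t jar with the application should fix it; alternatively, assuming hadoop-aws and the AWS SDK are already on the classpath, writing through the s3a:// connector avoids JetS3t entirely:

// Hedged sketch: same save as above, but via the s3a connector, which is backed by
// the AWS SDK and has no JetS3t dependency. The RDD, bucket and path are placeholders.
javaPairRDD.values().saveAsTextFile("s3a://bucket/output");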



How to get RDD from PairRDD<key,value> in Java

2015-09-23 Thread Zhang, Jingyu
Hi All,

I want to extract the "value" RDD from PairRDD in Java

Please let me know how I can get it easily.

Thanks

Jingyu
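What I am after is something like the sketch below (made-up data, untested, assuming the usual Spark Java imports); I believe JavaPairRDD.values() does this, but please correct me if there is a better way:

JavaSparkContext sc = new JavaSparkContext("local", "PairRddValues");
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("a", 1), new Tuple2<>("b", 2)));

JavaRDD<Integer> values = pairs.values();   // the "value" RDD
System.out.println(values.collect());       // prints [1, 2]
sc.stop();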



caching DataFrames

2015-09-23 Thread Zhang, Jingyu
I have A and B DataFrames
A has columns a11,a12, a21,a22
B has columns b11,b12, b21,b22

I persist them in cache:
1. A.cache()
2. B.cache()

Then, I persist the subsets in cache later:

3. DataFrame A1 (a11,a12).cache()

4. DataFrame B1 (b11,b12).cache()

5. DataFrame AB1 (a11,a12,b11,b12).cache()

Can you please tell me what happens for caching cases 3, 4, and 5 after A
and B are cached?
How much more memory do I need compared with caching 1 and 2 only?

Thanks

Jingyu



Re: Java Heap Space Error

2015-09-23 Thread Zhang, Jingyu
Does your SQL work if it does not run the regex on the strings and concatenate
them, i.e. just selecting the columns without any string operations?

On 24 September 2015 at 10:11, java8964  wrote:

> Try to increase the partition count; that will make each partition hold less
> data.
>
> Yong
>
> --
> Subject: Re: Java Heap Space Error
> From: yu...@useinsider.com
> Date: Thu, 24 Sep 2015 00:32:47 +0300
> CC: user@spark.apache.org
> To: java8...@hotmail.com
>
>
> Yes, it's possible. I use S3 as the data source. My external tables are
> partitioned. The task below is 193/200. The job has 2 stages, and it is task
> 193 of 200 in the 2nd stage because of sql.shuffle.partitions.
>
> How can I avoid this situation? This is my query:
>
> select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not 
> NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",'
>  ') inputlist from landing where dt='2015-9' and userid != '' and userid 
> is not null and userid is not NULL and pagetype = 'productDetail' group by 
> userid
>
>
> On 23 Sep 2015, at 23:55, java8964  wrote:
>
> Based on your description, your job shouldn't have any shuffle then, as you
> just apply a regex and concatenation on the column, but there is one
> partition having 4.3M records to be read, vs less than 1M records for the
> other partitions.
>
> Is that possible? It depends on what is the source of your data.
>
> If there is a shuffle in your query (more than 2 stages generated by your
> query, and this is my guess of what is happening), then it simply means that
> one partition has way more data than the rest of the partitions.
>
> Yong
>
> --
> From: yu...@useinsider.com
> Subject: Java Heap Space Error
> Date: Wed, 23 Sep 2015 23:07:17 +0300
> To: user@spark.apache.org
>
> What can cause this issue in the attached picture? I'm running an SQL
> query which runs a regex on strings and concatenates them. Because of this
> task, my job gives a Java heap space error.
>
> 
>
>
>
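For what it is worth, a minimal way to try Yong's suggestion (a rough sketch only; the partition count is a guess, and sqlContext/query stand for the HiveContext and the group-by query above):

// Hedged sketch: raise the shuffle partition count (default 200) before running the
// group-by, so the data is spread over more, smaller tasks. If the skew comes from a
// single hot userid, this alone may not be enough and the key may need salting.
sqlContext.setConf("spark.sql.shuffle.partitions", "800");
DataFrame result = sqlContext.sql(query);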



Re: caching DataFrames

2015-09-23 Thread Zhang, Jingyu
Thanks Hemant,

I will generate a total report (dfA) with many columns from log data. After
report A is done, I will generate many detail reports (dfA1-dfAi) based on
subsets of the total report (dfA); those detail reports use aggregate and
window functions according to different rules. However, some information is
lost after the aggregate or window functions.

In the end, a few of the detail reports can be generated directly from the
subset DataFrames, but many of the reports need to get some information back
from the total report. Thus, I am wondering whether there is any performance
benefit if I cache both dfA and its subsets, and if so, how much memory I
should prepare for them.
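Roughly what I have in mind, in case it clarifies the question (made-up names, untested; I would check the Storage tab at http://host:4040/storage/ to see what each cached frame actually takes):

// Hedged sketch: each cached DataFrame keeps its own copy of its columns, so the
// subset is cached independently of the parent; once the detail frames are
// materialised, the big parent can be unpersisted to free memory.
DataFrame dfA = sqlContext.read().json("s3a://bucket/logs").cache();
DataFrame dfA1 = dfA.select("a11", "a12").cache();   // own storage, independent of dfA
dfA1.count();                                        // materialises dfA1's cache
dfA.unpersist();                                     // optional, frees the parent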



On 24 September 2015 at 14:56, Hemant Bhanawat <hemant9...@gmail.com> wrote:

> hit send button too early...
>
> However, why would you want to cache a dataFrame that is a subset of an
> already cached dataFrame?
>
> If dfA is cached, and dfA1 is created by applying some transformation on
> dfA, actions on dfA1 will use cache of dfA.
>
>
> val dfA1 = dfA.filter($"_1" > 50)
>
> // this will run on the cached data of A.
>
> dfA1.count()
>
>
>
> On Thu, Sep 24, 2015 at 10:20 AM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> Two dataframes do not share cache storage in Spark. Hence it's immaterial
>> how the two dataFrames are related to each other. Both of them are going
>> to consume memory based on the data that they have.  So for your A1 and B1
>> you would need extra memory that would be equivalent to half the memory of
>> A/B.
>>
>> You can check the storage that a dataFrame is consuming in the Spark UI's
>> Storage tab. http://host:4040/storage/
>>
>>
>>
>> On Thu, Sep 24, 2015 at 5:37 AM, Zhang, Jingyu <jingyu.zh...@news.com.au>
>> wrote:
>>
>>> I have A and B DataFrames
>>> A has columns a11,a12, a21,a22
>>> B has columns b11,b12, b21,b22
>>>
>>> I persist them in cache:
>>> 1. A.cache()
>>> 2. B.cache()
>>>
>>> Then, I persist the subsets in cache later:
>>>
>>> 3. DataFrame A1 (a11,a12).cache()
>>>
>>> 4. DataFrame B1 (b11,b12).cache()
>>>
>>> 5. DataFrame AB1 (a11,a12,b11,b12).cache()
>>>
>>> Can you please tell me what happens for caching cases 3, 4, and 5 after A
>>> and B are cached?
>>> How much more memory do I need compared with caching 1 and 2 only?
>>>
>>> Thanks
>>>
>>> Jingyu
>>>
>>> This message and its attachments may contain legally privileged or
>>> confidential information. It is intended solely for the named addressee. If
>>> you are not the addressee indicated in this message or responsible for
>>> delivery of the message to the addressee, you may not copy or deliver this
>>> message or its attachments to anyone. Rather, you should permanently delete
>>> this message and its attachments and kindly notify the sender by reply
>>> e-mail. Any content of this message and its attachments which does not
>>> relate to the official business of the sending company must be taken not to
>>> have been sent or endorsed by that company or any of its related entities.
>>> No warranty is made that the e-mail or attachments are free from computer
>>> virus or other defect.
>>
>>
>>
>



Re: word count (group by users) in spark

2015-09-21 Thread Zhang, Jingyu
Spark spills data to disk when there is more data shuffled onto a single
executor machine than can fit in memory. However, it flushes out the data
to disk one key at a time - so if a single key has more key-value pairs
than can fit in memory, an out of memory exception occurs.

Cheers,

Jingyu
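If it helps, here is a rough sketch of how I would avoid building one huge sequence per user (made-up data, untested, assuming the usual Spark Java imports): aggregateByKey folds each comment into a per-user map of word counts, so no single key ever has to hold all of its raw values in memory at once.

JavaSparkContext sc = new JavaSparkContext("local", "WordCountPerUser");
JavaPairRDD<String, String> comments = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("kali", "A B A B B"), new Tuple2<>("james", "B A A A B")));

JavaPairRDD<String, HashMap<String, Integer>> countsPerUser = comments.aggregateByKey(
    new HashMap<String, Integer>(),
    (map, comment) -> {                       // fold one comment into the running map
        for (String w : comment.split(" ")) map.merge(w, 1, Integer::sum);
        return map;
    },
    (m1, m2) -> {                             // merge partial maps across partitions
        m2.forEach((w, c) -> m1.merge(w, c, Integer::sum));
        return m1;
    });

System.out.println(countsPerUser.collectAsMap());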

On 21 September 2015 at 16:39, Aniket Bhatnagar wrote:

> Unless I am mistaken, in a group by operation, it spills to disk in case
> values for a key don't fit in memory.
>
> Thanks,
> Aniket
>
> On Mon, Sep 21, 2015 at 10:43 AM Huy Banh  wrote:
>
>> Hi,
>>
>> If your input format is user -> comment, then you could:
>>
>> val comments = sc.parallelize(List(("u1", "one two one"), ("u2", "three
>> four three")))
>> val wordCounts = comments.
>>flatMap({case (user, comment) =>
>> for (word <- comment.split(" ")) yield(((user, word), 1)) }).
>>reduceByKey(_ + _)
>>
>> val output = wordCounts.
>>map({case ((user, word), count) => (user, (word, count))}).
>>groupByKey()
>>
>> By Aniket, if we group by user first, it could run out of memory when
>> spark tries to put all words in a single sequence, couldn't it?
>>
>> On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>> Using scala API, you can first group by user and then use combineByKey.
>>>
>>> Thanks,
>>> Aniket
>>>
>>> On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com <
>>> kali.tumm...@gmail.com> wrote:
>>>
 Hi All,
 I would like to achieve this below output using spark , I managed to
 write
 in Hive and call it in spark but not in just spark (scala), how to group
 word counts on particular user (column) for example.
 Imagine users and their given tweets I want to do word count based on
 user
 name.

 Input:-
 kali A,B,A,B,B
 james B,A,A,A,B

 Output:-
 kali A [Count] B [Count]
 James A [Count] B [Count]

 My Hive Answer:-
 CREATE EXTERNAL TABLE  TEST
 (
  user_name string   ,
  COMMENTS  STRING

 )  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'  STORED AS TEXTFILE
 LOCATION '/data/kali/test';    HDFS FOLDER (create hdfs folder and
 create a text file with data mentioned in the email)

 use default;select user_name,COLLECT_SET(text) from (select
 user_name,concat(sub,' ',count(comments)) as text  from test LATERAL
 VIEW
 explode(split(comments,',')) subView AS sub group by user_name,sub)w
 group
 by user_name;

 Spark With Hive:-
 package com.examples

 /**
  * Created by kalit_000 on 17/09/2015.
  */
 import org.apache.log4j.Logger
 import org.apache.log4j.Level
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.SparkContext._


 object HiveWordCount {

   def main(args: Array[String]): Unit =
   {
 Logger.getLogger("org").setLevel(Level.WARN)
 Logger.getLogger("akka").setLevel(Level.WARN)

 val conf = new

 SparkConf().setMaster("local").setAppName("HiveWordCount").set("spark.executor.memory",
 "1g")
 val sc = new SparkContext(conf)
 val sqlContext= new SQLContext(sc)

 val hc=new HiveContext(sc)

 hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST  (user_name
 string ,COMMENTS STRING )ROW FORMAT DELIMITED FIELDS TERMINATED BY '001'
 STORED AS TEXTFILE LOCATION '/data/kali/test' ")

 val op=hc.sql("select user_name,COLLECT_SET(text) from (select
 user_name,concat(sub,' ',count(comments)) as text  from default.test
 LATERAL
 VIEW explode(split(comments,',')) subView AS sub group by
 user_name,sub)w
 group by user_name")

 op.collect.foreach(println)


   }




 Thanks




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


