Getting NPE when trying to do spark streaming with Twitter

2016-04-10 Thread krisgari
 I am new to Spark Streaming. When I tried to submit the Spark-Twitter streaming
job, I got the following error:
---
Lost task 0.0 in stage 0.0 (TID
0,sandbox.hortonworks.com):java.lang.NullPointerException
at org.apache.spark.util.Utils$.decodeFileNameInURI(Utils.scala:340)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:365)
at
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:404)
at
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:396)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:396)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
--

Here is the code snippet:
--
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
val filters = args.takeRight(args.length - 4)

System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val stream = TwitterUtils.createStream(ssc, None, filters)

val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))

topCounts60.foreachRDD(rdd => {
  val topList = rdd.take(10)
  println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
  topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})

ssc.start()
ssc.awaitTermination()

--
command to submit the job
--
./bin/spark-submit --class
"org.apache.spark.examples.streaming.TwitterPopularTags"  --master
yarn-client --num-executors 3 --driver-memory 512m --executor-memory 512m
--executor-cores 1  --jars
/home/spark/.sbt/0.13/staging/0ff3ad537358b61f617c/twitterstream/target/scala-2.10/twitterstream-project_2.10-1.0.jar,/home/spark/.ivy2/cache/org.apache.spark/spark-streaming-twitter_2.10/jars/spark-streaming-twitter_2.10-1.6.1.jar,/home/spark/.ivy2/cache/org.twitter4j/twitter4j-core/jars/twitter4j-core-4.0.4.jar,/home/spark/.ivy2/cache/org.twitter4j/twitter4j-stream/jars/twitter4j-stream-4.0.4.jar,/home/spark/.ivy2/cache/org.apache.spark/spark-streaming_2.10/jars/spark-streaming_2.10-1.6.1.jar,/home/spark/.ivy2/cache/org.apache.spark/spark-core_2.10/jars/spark-core_2.10-1.6.1.jar
"sandbox.hortonworks.com:6667" xx xx xx xx 
--
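
For comparison, spark-submit treats the first non-option argument as the
application jar and everything after it as application arguments, so the usual
argument layout looks like the sketch below (the paths and credentials are
placeholders, not the actual values):
--
./bin/spark-submit --class org.apache.spark.examples.streaming.TwitterPopularTags \
  --master yarn-client --num-executors 3 --driver-memory 512m \
  --executor-memory 512m --executor-cores 1 \
  --jars /path/to/spark-streaming-twitter_2.10-1.6.1.jar,/path/to/twitter4j-core-4.0.4.jar,/path/to/twitter4j-stream-4.0.4.jar \
  /path/to/twitterstream-project_2.10-1.0.jar \
  <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> [filter ...]
--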

Any clue why I am getting this NPE?? Any help on how to debug this further?








High GC time when setting custom input partitions

2016-04-10 Thread Johnny W.
Hi spark-user,

I am using Spark 1.6 to build a reverse index for one month of Twitter data
(~50GB). The HDFS split size is 1GB, so by default sc.textFile creates 50
partitions. I'd like to increase the parallelism by increasing the number of
input partitions, so I use textFile(..., 200) to yield 200 partitions.
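
For reference, the two levers behave differently: the second argument to
textFile only changes the number of input splits, while the shuffle-heavy step
can be given its own partition count. A minimal Scala sketch (the path, the
parser, and the figure of 200 are illustrative assumptions):

// (a) ask textFile for more input splits up front
val lines = sc.textFile("hdfs:///twitter/2016-03/*", 200)

// (b) keep the default input splits and raise parallelism only where the
//     shuffle happens, via the numPartitions argument of the aggregation
def parseTweet(line: String): Seq[(String, String)] =   // hypothetical: (term, tweetId) pairs
  line.split("\\s+").map(term => (term, line.hashCode.toString))

val index = sc.textFile("hdfs:///twitter/2016-03/*")
  .flatMap(parseTweet)
  .aggregateByKey(Set.empty[String], 200)(_ + _, _ ++ _)

spark.default.parallelism plays the same role as the explicit 200 for shuffles
where no partition count is given.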

I found a significant GC overhead for the stage of building reverse indexes
(with a large shuffle). More than 80% of task time is consumed by GC. I
tried to decrease the # of cores per executor from 8 to 5, and the GC time
was reduced but still high (more than 50% of task time). However, with the
default number of partitions (50), there is no GC overhead at all.

The machines running the executors have more than 100GB of memory, and I set
executor memory to 32GB. I can confirm that no more than one executor is
running on each machine.

I am wondering why there is such a significant GC overhead after I increase
the number of input partitions.

Thanks,
J.


Re: Graphframes pattern causing java heap space errors

2016-04-10 Thread Buntu Dev
Thanks Ted for the input. I was able to get it working with the pyspark shell,
but the same job submitted via 'spark-submit' in client or cluster deploy mode
ends up with these errors:

~
java.lang.OutOfMemoryError: Java heap space
at java.lang.Object.clone(Native Method)
at akka.util.CompactByteString$.apply(ByteString.scala:410)
at akka.util.ByteString$.apply(ByteString.scala:22)
at
akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)
at
akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)
at
akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43)
at
akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
ERROR Utils: Uncaught exception in thread task-result-getter-3
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2219)
at java.util.ArrayList.grow(ArrayList.java:242)
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
at java.util.ArrayList.add(ArrayList.java:440)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:275)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


When using pyspark, I'm passing the options via cmd line:

pyspark --master yarn --deploy-mode client --driver-memory 6g
--executor-memory 8g --executor-cores 4

while I'm doing the same via conf.set() in the script that I'm using with
spark-submit.

I do notice the options being picked up correctly under the Environment tab,
but it's not clear why pyspark succeeds while spark-submit fails. Is there any
difference in how these two ways run the job?
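
One difference that may matter here: in client deploy mode, spark.driver.memory
(and other driver JVM options) cannot take effect when set via conf.set() inside
the script, because the driver JVM is already running by the time that line
executes, even though the value still shows up under the Environment tab. It has
to come from the spark-submit command line or spark-defaults.conf, e.g. (script
name is a placeholder):

spark-submit --master yarn --deploy-mode client --driver-memory 6g \
  --executor-memory 8g --executor-cores 4 my_motifs_job.py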

Thanks!


On Sun, Apr 10, 2016 at 4:28 AM, Ted Yu  wrote:

> Looks like the exception occurred on driver.
>
> Consider increasing the values for the following config:
>
> conf.set("spark.driver.memory", "10240m")
> conf.set("spark.driver.maxResultSize", "2g")
>
> Cheers
>
> On Sat, Apr 9, 2016 at 9:02 PM, Buntu Dev  wrote:
>
>> I'm running it via pyspark against yarn in client deploy mode. I do
>> notice in the spark web ui under Environment tab all the options I've set,
>> so I'm guessing these are accepted.
>>
>> On Sat, Apr 9, 2016 at 5:52 PM, Jacek Laskowski  wrote:
>>
>>> Hi,
>>>
>>> (I haven't played with GraphFrames)
>>>
>>> What's your `sc.master`? How do you run your application --
>>> spark-submit or java -jar or sbt run or...? The reason I'm asking is
>>> that few options might not be in use whatsoever, e.g.

Connection closed Exception.

2016-04-10 Thread Bijay Kumar Pathak
Hi,

I am running Spark 1.6 on EMR. I have a workflow that does the following
things:

   1. Read the two flat files, create DataFrames, and join them.
   2. Read a particular partition from the Hive table and join it with the
   DataFrame from step 1.
   3. Finally, insert overwrite into a Hive table that is partitioned on two
   fields (a sketch of these steps follows below).
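
For reference, a minimal sketch of that workflow as described (paths, table,
column and partition names are illustrative assumptions, not the actual job):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

// 1. read the two flat files and join them (assumed tab-delimited layouts)
val left  = sc.textFile("s3://bucket/flat1/*").map(_.split("\t")).map(f => (f(0), f(1))).toDF("id", "v1")
val right = sc.textFile("s3://bucket/flat2/*").map(_.split("\t")).map(f => (f(0), f(1))).toDF("id", "v2")
val joined = left.join(right, "id")

// 2. read one partition of the existing Hive table and join it in
val part = hiveContext.sql("SELECT id, v3 FROM source_table WHERE dt = '2016-04-10'")
val enriched = joined.join(part, "id")

// 3. insert overwrite into a table partitioned on two fields
enriched.registerTempTable("enriched")
hiveContext.sql(
  "INSERT OVERWRITE TABLE target_table PARTITION (dt='2016-04-10', region='us') " +
  "SELECT id, v1, v2, v3 FROM enriched")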

The stdout log in the terminal when I submit the job shows the message below.
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 30149"...
Killed

And when I check the YARN logs, they show the error below. The Spark UI
doesn't show any failed stages or tasks, but the job gets stuck in the middle
without completing all the stages. Did anyone come across similar issues? What
could be the reason behind it, and how could I troubleshoot it?


16/04/11 00:19:38 ERROR client.TransportResponseHandler: Still have 1
requests outstanding when connection from ip-10-184-195-29.ec2.internal/
10.184.195.29:43162 is closed
16/04/11 00:19:38 WARN executor.CoarseGrainedExecutorBackend: An unknown
(ip-10-184-195-29.ec2.internal:43162) driver disconnected.
16/04/11 00:19:38 ERROR executor.CoarseGrainedExecutorBackend: Driver
10.184.195.29:43162 disassociated! Shutting down.
16/04/11 00:19:38 WARN netty.NettyRpcEndpointRef: Error sending message
[message = Heartbeat(12,[Lscala.Tuple2;@6545df9a,BlockManagerId(12,
ip-10-184-194-43.ec2.internal, 43867))] in 1 attempts
java.io.IOException: Connection from ip-10-184-195-29.ec2.internal/
10.184.195.29:43162 closed
at
org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
at
org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
at
io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
16/04/11 00:19:38 INFO storage.DiskBlockManager: Shutdown hook called
16/04/11 00:19:38 INFO util.ShutdownHookManager: Shutdown hook called


Re: Datasets combineByKey

2016-04-10 Thread Koert Kuipers
yes it is
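
For anyone finding this thread later, a minimal sketch of an Aggregator playing
the role of combineByKey (written against the Spark 2.x shape of the API; in 1.6
the two encoder members are supplied implicitly via toColumn and the grouping
method is groupBy rather than groupByKey):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// zero/reduce/merge/finish line up with combineByKey's
// createCombiner/mergeValue/mergeCombiners plus a final map
object SumValues extends Aggregator[(String, Int), Long, Long] {
  def zero: Long = 0L
  def reduce(buffer: Long, row: (String, Int)): Long = buffer + row._2
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(reduction: Long): Long = reduction
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

// usage, for ds: Dataset[(String, Int)]
// val perKey = ds.groupByKey(_._1).agg(SumValues.toColumn)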
On Apr 10, 2016 3:17 PM, "Amit Sela"  wrote:

> I think *org.apache.spark.sql.expressions.Aggregator* is what I'm looking
> for, makes sense ?
>
> On Sun, Apr 10, 2016 at 4:08 PM Amit Sela  wrote:
>
>> I'm mapping RDD API to Datasets API and I was wondering if I was missing
>> something or is this functionality is missing.
>>
>>
>> On Sun, Apr 10, 2016 at 3:00 PM Ted Yu  wrote:
>>
>>> Haven't found any JIRA w.r.t. combineByKey for Dataset.
>>>
>>> What's your use case ?
>>>
>>> Thanks
>>>
>>> On Sat, Apr 9, 2016 at 7:38 PM, Amit Sela  wrote:
>>>
 Is there (planned ?) a combineByKey support for Dataset ?
 Is / Will there be a support for combiner lifting ?

 Thanks,
 Amit

>>>
>>>



Re: alter table add columns alternatives or hive refresh

2016-04-10 Thread Maurin Lenglart
Your solution works in Hive, but not in Spark, even if I use a HiveContext.
I tried to create a temp table and then run this query:
 - sqlContext.sql("insert into table myTable select * from myTable_temp")
But I still get the same error.

thanks
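
In case it is useful to anyone hitting the same error, one way to sidestep ALTER
TABLE entirely is to let a DataFrame carry the new column and rewrite the table
from it. A minimal sketch with illustrative names (this still rewrites the data,
so it addresses the schema-mismatch error rather than the cost of moving a
multi-TB table):

import org.apache.spark.sql.functions.lit

// read everything the existing table has, add the new column with a default,
// and write it out as a table whose schema already includes the column
val evolved = sqlContext.table("myTable").withColumn("mycol", lit(null).cast("string"))
evolved.write.mode("overwrite").saveAsTable("myTable_v2")

// later batches that already carry mycol can then simply append:
// newBatchDf.write.mode("append").saveAsTable("myTable_v2")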

From: Mich Talebzadeh 
>
Date: Sunday, April 10, 2016 at 12:25 PM
To: "user @spark" >
Subject: Re: alter table add columns aternatives or hive refresh

Hi,

I am confining myself to Hive tables. As I stated before, I have not tried it
in Spark. So I stand corrected.

Let us try this simple test in Hive


-- Create table
hive> create table testme(col1 int);
OK
--insert a row
hive> insert into testme values(1);

Loading data to table test.testme
OK
-- Add a new column to testme
hive> alter table testme add columns (new_col varchar(30));
OK
Time taken: 0.055 seconds

-- Expect one row here

hive> select * from testme;
OK
1   NULL
-- Add a new row including values for new_col. This should work
hive> insert into testme values(1,'London');
Loading data to table test.testme
OK
hive> select * from testme;
OK
1   NULL
1   London
Time taken: 0.074 seconds, Fetched: 2 row(s)
-- Now update the new column
hive> update testme set col2 = 'NY';
FAILED: SemanticException [Error 10297]: Attempt to do update or delete on 
table test.testme that does not use an AcidOutputFormat or is not bucketed

So this is Hive. You can add new rows including values for the new column but 
cannot update the null values. Will this work for you?

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 10 April 2016 at 19:34, Maurin Lenglart 
> wrote:
Hi,
So basically you are telling me that I need to recreate a table, and re-insert 
everything every time  I update a column?
I understand the constraints, but that solution doesn’t look good to me. I am 
updating the schema everyday and the table is a couple of TB of data.

Do you see any other options that will allow me not to move TB of data everyday?

Thanks for your answer

From: Mich Talebzadeh 
>
Date: Sunday, April 10, 2016 at 3:41 AM
To: maurin lenglart >
Cc: "user@spark.apache.org" 
>
Subject: Re: alter table add columns aternatives or hive refresh

I have not tried it on Spark but the column added in Hive to an existing table 
cannot be updated for existing rows. In other words the new column is set to 
null which does not require the change in the existing file length.

So basically, as I understand it, when a column is added to an existing table:

1.The metadata for the underlying table will be updated
2.The new column will by default have null value
3.The existing rows cannot have new column updated to a non null value
4.New rows can have non null values set for the new column
5.No sql operation can be done on that column. For example select * from 
 where new_column IS NOT NULL
6.The easiest option is to create a new table with the new column and do 
insert/select from the existing table with values set for the new column

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 10 April 2016 at 05:06, Maurin Lenglart 
> wrote:
Hi,
I am trying to add columns to table that I created with the “saveAsTable” api.
I update the columns using sqlContext.sql(‘alter table myTable add columns 
(mycol string)’).
The next time I create a df and save it in the same table, with the new columns 
I get a :
“ParquetRelation
 requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE 
statement generates the same number of columns as its schema.”

Also, these two commands don't return the same columns:
1. sqlContext.table(‘myTable’).schema.fields<— wrong result
2. sqlContext.sql(’show columns in mytable’)  <—— good results

It seems to be a known bug : https://issues.apache.org/jira/browse/SPARK-9764 
(see related bugs)

But I am wondering, how else can I update the columns or make sure that spark 
take the new columns?

I already tried to refreshTable and to restart spark.

thanks





Multiple folders to SqlContext

2016-04-10 Thread KhajaAsmath Mohammed
Hi,

I am looking for a way to add multiple folders to the Spark context and then
make them into a single dataframe.

Let's say I have the folders below:

/daas/marts/US/file1.txt
/daas/marts/CH/file2.txt
/daas/marts/SG/file3.txt.

The files above have the same schema. I don't want to create multiple
dataframes; instead I want to create only one dataframe by taking the above
folders/files as a single input to SqlContext. Does anyone have a solution for
this?
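
Both sc.textFile and the DataFrame readers accept glob patterns and
comma-separated lists of paths, so one dataframe over all three folders can look
like the sketch below (the glob, the delimiter and the schema are illustrative
assumptions):

case class MartRecord(id: String, name: String, amount: Double)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// "/daas/marts/US/file1.txt,/daas/marts/CH/file2.txt,..." would work as well
val df = sc.textFile("/daas/marts/*/file*.txt")
  .map(_.split("\t"))
  .map(f => MartRecord(f(0), f(1), f(2).toDouble))
  .toDF()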

Thanks,
Asmath.


Re: alter table add columns alternatives or hive refresh

2016-04-10 Thread Mich Talebzadeh
Hi,

I am confining myself to Hive tables. As I stated before, I have not
tried it in Spark. So I stand corrected.

Let us try this simple test in Hive


-- Create table
hive> create table testme(col1 int);
OK
-- Insert a row
hive> insert into testme values(1);
Loading data to table test.testme
OK
-- Add a new column to testme
hive> alter table testme add columns (new_col varchar(30));
OK
Time taken: 0.055 seconds

-- Expect one row here
hive> select * from testme;
OK
1   NULL
-- Add a new row including values for new_col. This should work
hive> insert into testme values(1,'London');
Loading data to table test.testme
OK
hive> select * from testme;
OK
1   NULL
1   London
Time taken: 0.074 seconds, Fetched: 2 row(s)
-- Now update the new column
hive> update testme set col2 = 'NY';
FAILED: SemanticException [Error 10297]: Attempt to do update or delete on
table test.testme that does not use an AcidOutputFormat or is not bucketed

So this is Hive. You can add new rows including values for the new
column but cannot update the null values. Will this work for you?

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 10 April 2016 at 19:34, Maurin Lenglart  wrote:

> Hi,
> So basically you are telling me that I need to recreate a table, and
> re-insert everything every time  I update a column?
> I understand the constraints, but that solution doesn’t look good to me. I
> am updating the schema everyday and the table is a couple of TB of data.
>
> Do you see any other options that will allow me not to move TB of data
> everyday?
>
> Thanks for you answer
>
> From: Mich Talebzadeh 
> Date: Sunday, April 10, 2016 at 3:41 AM
> To: maurin lenglart 
> Cc: "user@spark.apache.org" 
> Subject: Re: alter table add columns aternatives or hive refresh
>
> I have not tried it on Spark but the column added in Hive to an existing
> table cannot be updated for existing rows. In other words the new column is
> set to null which does not require the change in the existing file length.
>
> So basically as I understand when a  column is added to an already table.
>
> 1.The metadata for the underlying table will be updated
> 2.The new column will by default have null value
> 3.The existing rows cannot have new column updated to a non null value
> 4.New rows can have non null values set for the new column
> 5.No sql operation can be done on that column. For example select *
> from  where new_column IS NOT NULL
> 6.The easiest option is to create a new table with the new column and
> do insert/select from the existing table with values set for the new column
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 10 April 2016 at 05:06, Maurin Lenglart  wrote:
>
>> Hi,
>> I am trying to add columns to table that I created with the “saveAsTable”
>> api.
>> I update the columns using sqlContext.sql(‘alter table myTable add
>> columns (mycol string)’).
>> The next time I create a df and save it in the same table, with the new
>> columns I get a :
>> “ParquetRelation
>>  requires that the query in the SELECT clause of the INSERT
>> INTO/OVERWRITE statement generates the same number of columns as its
>> schema.”
>>
>> Also thise two commands don t return the same columns :
>> 1. sqlContext.table(‘myTable’).schema.fields<— wrong result
>> 2. sqlContext.sql(’show columns in mytable’)  <—— good results
>>
>> It seems to be a known bug :
>> https://issues.apache.org/jira/browse/SPARK-9764 (see related bugs)
>>
>> But I am wondering, how else can I update the columns or make sure that
>> spark take the new columns?
>>
>> I already tried to refreshTable and to restart spark.
>>
>> thanks
>>
>>
>


Re: Datasets combineByKey

2016-04-10 Thread Amit Sela
I think *org.apache.spark.sql.expressions.Aggregator* is what I'm looking
for, makes sense ?

On Sun, Apr 10, 2016 at 4:08 PM Amit Sela  wrote:

> I'm mapping RDD API to Datasets API and I was wondering if I was missing
> something or is this functionality is missing.
>
>
> On Sun, Apr 10, 2016 at 3:00 PM Ted Yu  wrote:
>
>> Haven't found any JIRA w.r.t. combineByKey for Dataset.
>>
>> What's your use case ?
>>
>> Thanks
>>
>> On Sat, Apr 9, 2016 at 7:38 PM, Amit Sela  wrote:
>>
>>> Is there (planned ?) a combineByKey support for Dataset ?
>>> Is / Will there be a support for combiner lifting ?
>>>
>>> Thanks,
>>> Amit
>>>
>>
>>


Re: alter table add columns alternatives or hive refresh

2016-04-10 Thread Maurin Lenglart
Hi,
So basically you are telling me that I need to recreate a table and re-insert
everything every time I update a column?
I understand the constraints, but that solution doesn’t look good to me. I am
updating the schema every day and the table is a couple of TB of data.

Do you see any other options that will allow me not to move TB of data every day?

Thanks for your answer

From: Mich Talebzadeh 
>
Date: Sunday, April 10, 2016 at 3:41 AM
To: maurin lenglart >
Cc: "user@spark.apache.org" 
>
Subject: Re: alter table add columns aternatives or hive refresh

I have not tried it on Spark but the column added in Hive to an existing table 
cannot be updated for existing rows. In other words the new column is set to 
null which does not require the change in the existing file length.

So basically, as I understand it, when a column is added to an existing table:

1.The metadata for the underlying table will be updated
2.The new column will by default have null value
3.The existing rows cannot have new column updated to a non null value
4.New rows can have non null values set for the new column
5.No sql operation can be done on that column. For example select * from 
 where new_column IS NOT NULL
6.The easiest option is to create a new table with the new column and do 
insert/select from the existing table with values set for the new column

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 10 April 2016 at 05:06, Maurin Lenglart 
> wrote:
Hi,
I am trying to add columns to table that I created with the “saveAsTable” api.
I update the columns using sqlContext.sql(‘alter table myTable add columns 
(mycol string)’).
The next time I create a df and save it in the same table, with the new columns 
I get a :
“ParquetRelation
 requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE 
statement generates the same number of columns as its schema.”

Also, these two commands don't return the same columns:
1. sqlContext.table(‘myTable’).schema.fields<— wrong result
2. sqlContext.sql(’show columns in mytable’)  <—— good results

It seems to be a known bug : https://issues.apache.org/jira/browse/SPARK-9764 
(see related bugs)

But I am wondering, how else can I update the columns or make sure that spark 
take the new columns?

I already tried to refreshTable and to restart spark.

thanks




Re: Sqoop on Spark

2016-04-10 Thread Jörn Franke

I am not 100% sure, but you could export to CSV in Oracle using external tables.

Oracle also has the Hadoop Loader, which seems to support Avro. However, I
think you need to buy the Big Data solution.

> On 10 Apr 2016, at 16:12, Mich Talebzadeh  wrote:
> 
> Yes I meant MR.
> 
> Again one cannot beat the RDBMS export utility. I was specifically referring 
> to Oracle in above case that does not provide any specific text bases export 
> except the binary one Exp, data pump etc).
> 
> In case of SAPO ASE, Sybase IQ, and MSSQL, one can use BCP (bulk copy) that 
> can be parallelised either through range partitioning or simple round robin 
> partitioning that can be used to get data out to file in parallel. Then once 
> get data into Hive table through import etc.
> 
> In general if the source table is very large you can used either SAP 
> Replication Server (SRS) or Oracle Golden Gate to get data to Hive. Both 
> these replication tools provide connectors to Hive and they do a good job. If 
> one has something like Oracle in Prod then there is likely a Golden Gate 
> there. For bulk setting of Hive tables and data migration, replication server 
> is good option.
> 
> HTH
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
>  
> 
>> On 10 April 2016 at 14:24, Michael Segel  wrote:
>> Sqoop doesn’t use MapR… unless you meant to say M/R (Map Reduce) 
>> 
>> The largest problem with sqoop is that in order to gain parallelism you need 
>> to know how your underlying table is partitioned and to do multiple range 
>> queries. This may not be known, or your data may or may not be equally 
>> distributed across the ranges.  
>> 
>> If you’re bringing over the entire table, you may find dropping it and then 
>> moving it to HDFS and then doing a bulk load to be more efficient.
>> (This is less flexible than sqoop, but also stresses the database servers 
>> less. ) 
>> 
>> Again, YMMV
>> 
>> 
>>> On Apr 8, 2016, at 9:17 AM, Mich Talebzadeh  
>>> wrote:
>>> 
>>> Well unless you have plenty of memory, you are going to have certain issues 
>>> with Spark.
>>> 
>>> I tried to load a billion rows table from oracle through spark using JDBC 
>>> and ended up with "Caused by: java.lang.OutOfMemoryError: Java heap space" 
>>> error.
>>> 
>>> Sqoop uses MapR and does it in serial mode which takes time and you can 
>>> also tell it to create Hive table. However, it will import data into Hive 
>>> table.
>>> 
>>> In any case the mechanism of data import is through JDBC, Spark uses memory 
>>> and DAG, whereas Sqoop relies on MapR.
>>> 
>>> There is of course another alternative.
>>> 
>>> Assuming that your Oracle table has a primary Key say "ID" (it would be 
>>> easier if it was a monotonically increasing number) or already partitioned.
>>> 
>>> You can create views based on the range of ID or for each partition. You 
>>> can then SELECT COLUMNS  co1, col2, coln from view and spool it to a text 
>>> file on OS (locally say backup directory would be fastest).
>>> bzip2 those files and scp them to a local directory in Hadoop
>>> You can then use Spark/hive to load the target table from local files in 
>>> parallel
>>> When creating views take care of NUMBER and CHAR columns in Oracle and 
>>> convert them to TO_CHAR(NUMBER_COLUMN) and varchar CAST(coln AS 
>>> VARCHAR2(n)) AS coln etc 
>>> 
>>> HTH
>>> 
>>> 
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> http://talebzadehmich.wordpress.com
>>>  
>>> 
 On 8 April 2016 at 10:07, Gourav Sengupta  
 wrote:
 Hi,
 
 Some metrics thrown around the discussion:
 
 SQOOP: extract 500 million rows (in single thread) 20 mins (data size 21 
 GB)
 SPARK: load the data into memory (15 mins)
 
 SPARK: use JDBC (and similar to SQOOP difficult parallelization) to load 
 500 million records - manually killed after 8 hours.
 
 (both the above studies were done in a system of same capacity, with 32 GB 
 RAM and dual hexacore Xeon processors and SSD. SPARK was running locally, 
 and SQOOP ran on HADOOP2 and extracted data to local file system)
 
 In case any one needs to know what needs to be done to access both the CSV 
 and JDBC modules in SPARK Local Server mode, please let me know.
 
 
 Regards,
 Gourav Sengupta
 
> On Thu, Apr 7, 2016 at 12:26 AM, Yong Zhang  wrote:
> Good to know that.
> 
> That is why Sqoop has this "direct" mode, to utilize the vendor specific 
> feature.
> 
> But for MPP, I still think it makes sense that vendor provide some kind 
> of InputFormat, or data source in Spark, so Hadoop eco-system can 
> 

Infinite recursion in createDataFrame for avro types

2016-04-10 Thread Brad Cox
I'm getting a StackOverflowError from inside the createDataFrame call in this
example. It originates in Scala code involving Java type inference that calls
itself in an infinite loop.

final EventParser parser = new EventParser();
JavaRDD<Event> eventRDD = sc.textFile(path)
    .map(new Function<String, Event>()
    {
        public Event call(String line) throws Exception
        {
            Event event = parser.parse(line);
            log.info("event: " + event.toString());
            return event;
        }
    });
log.info("eventRDD:" + eventRDD.toDebugString());

DataFrame df = sqlContext.createDataFrame(eventRDD, Event.class);
df.show();

The bottom of the stack trace looks like this:

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at 
org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:104)
at 
org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

This looks similar to the bug reported in 
http://apache-spark-developers-list.1001551.n3.nabble.com/Stackoverflow-in-createDataFrame-td11791.html
 but I'm using Spark 1.4.1 which is later than when this bug was repaired.
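
One way to take the bean-style inference out of the picture, in case that is
where the recursion starts, is to declare the schema explicitly and map each
Event to a Row. A Scala sketch of the idea (field list abbreviated, getter names
assume Avro's default codegen, and this is an assumed workaround rather than a
verified fix):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val eventSchema = StructType(Seq(
  StructField("ts", DoubleType, nullable = false),
  StructField("uid", StringType, nullable = false),
  StructField("idorigh", StringType, nullable = false),
  StructField("proto", StringType, nullable = false)))   // ...remaining fields elided

// assuming events: RDD[Event] (eventRDD.rdd if starting from the JavaRDD above)
val rowRDD = events.map(e => Row(e.getTs, e.getUid, e.getIdorigh, e.getProto))
val df = sqlContext.createDataFrame(rowRDD, eventSchema)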

The Event class is generated by Avro from the avsc below. It does contain
double and long fields, which have been reported as causing problems, but
replacing double with string doesn't change the symptoms.

{
"namespace": "mynamespace", 
"type": "record", 
"name": "Event", 
"fields": [
{ "name": "ts", "type": "double", "doc": "Timestamp"},
{ "name": "uid", "type": "string", "doc": "Unique ID of Connection"},
{ "name": "idorigh", "type": "string", "doc": "Originating endpoint’s 
IP address (AKA ORIG)"},
{ "name": "idorigp", "type": "int", "doc": "Originating endpoint’s 
TCP/UDP port (or ICMP code)"},
{ "name": "idresph", "type": "string", "doc": "Responding endpoint’s IP 
address (AKA RESP)"},
{ "name": "idrespp", "type": "int", "doc": "Responding endpoint’s 
TCP/UDP port (or ICMP code)"},
{ "name": "proto", "type": "string", "doc": "Transport layer protocol 
of connection"},
{ "name": "service", "type": "string", "doc": "Dynamically detected 
application protocol, if any"},
{ "name": "duration", "type": "double", "doc": "Time of last packet 
seen – time of first packet seen"},
{ "name": "origbytes", "type": "int", "doc": "Originator payload bytes; 
from sequence numbers if TCP"},
{ "name": "respbytes", "type": "int", "doc": "Responder payload bytes; 
from sequence numbers if TCP"},
{ "name": "connstate", "type": "string", "doc": "Connection state (see 
conn.log:conn_state table)"},
{ "name": "localorig", "type": "boolean", "doc": "If conn originated 
locally T; if remotely F."},
{ "name": "localresp", "type": "boolean", "doc": "empty, always unset"},
{ "name": "missedbytes", "type": "int", "doc": "Number of missing bytes 
in content gaps"},
{ "name": "history", "type": "string", "doc": "Connection state history 
(see conn.log:history table)"},
{ "name": "origpkts", "type": [ "int", "null"], "doc": "Number of ORIG 
packets"},
{ "name": "origipbytes", "type": [ "int", "null"], "doc": "Number of 
RESP IP bytes (via IP total_length header field)"},
{ "name": "resppkts", "type": [ "int", "null"], "doc": "Number of RESP 
packets"},
{ "name": "respipbytes", "type": [ "int", "null"], "doc": "Number of 
RESP IP bytes (via IP total_length header field)"},
{ "name": "tunnelparents", "type": [ "string", "null"], "doc": "If 
tunneled, connection UID of encapsulating parent (s)"},
{ "name": "origcc", "type": ["string", "null"], "doc": "ORIG GeoIP 
Country Code"},
{ "name": "respcc", "type": ["string", "null"], "doc": "RESP GeoIP 
Country Code"}
]
}

Could someone please advise? Thanks!

Also posted at: 
https://stackoverflow.com/questions/36532237/infinite-recursion-in-createdataframe-for-avro-types

Dr. Brad J. CoxCell: 703-594-1883 Skype: dr.brad.cox








How Application jar is copied to worker machines?

2016-04-10 Thread Hemalatha A
Hello,

I want to know: when doing spark-submit, how is the application jar copied to
the worker machines? Who does the copying of the jars?

Similarly, who copies the DAG from the driver to the executors?

-- 


Regards
Hemalatha


Re: Weird error while serialization

2016-04-10 Thread Ted Yu
Have you considered using PairRDDFunctions.aggregateByKey
or PairRDDFunctions.reduceByKey in place of the groupBy to achieve better
performance ?
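
A minimal Scala sketch of that swap, for the case where the per-key work is
really a reduction; unlike the groupBy, a reduce such as an element-wise sum
keeps only one vector per key in memory (sample data is illustrative):

// one (key, vector) pair per input record, as in the job below
val pairs = sc.parallelize(Seq(
  ("k1", Array(1f, 2f)), ("k1", Array(3f, 4f)), ("k2", Array(5f, 6f))))

val summed = pairs.reduceByKey { (a, b) =>
  val out = new Array[Float](a.length)
  var i = 0
  while (i < a.length) { out(i) = a(i) + b(i); i += 1 }
  out
}

aggregateByKey fits the cases where the combined value per key is smaller than
the raw list of values but is not a simple pairwise reduce.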

Cheers

On Sat, Apr 9, 2016 at 2:00 PM, SURAJ SHETH  wrote:

> Hi,
> I am using Spark 1.5.2
>
> The file contains 900K rows each with twelve fields (tab separated):
> The first 11 fields are Strings with a maximum of 20 chars each. The last
> field is a comma separated array of floats with 8,192 values.
>
> It works perfectly if I change the below code for groupBy from
> "x[0].split('\t')[1]" to "x[0]".
> The reason seems to be due to the limit of the number of values for a
> particular key in groupby. In the below code, I am expecting 500 keys with
> tens of thousands of values in a few of them. The largest key value
> pair(from groupByKey) has 53K values each having a numpy array of 8192
> floats.
> In the changed version, i.e. "groupBy(lambda x : x[0]).mapValues(", we get
> 900K keys and one value for each of them which works flawlessly.
>
> Do we have any limit on the amount of data we get for a key in groupBy?
>
> The total file size is 16 GB.
>
> The snippet is :
>
> import hashlib,re, numpy as np
>
> def getRows(z):
> return np.asfortranarray([float(g) for g in z.split(',')])
>
> text1 = sc.textFile('/textFile.txt',480).filter(lambda x : len(x)>1000)\
> .map(lambda x : x.rsplit('\t',1)).map(lambda x :
> [x[0],getRows(x[1])]).cache()\
> .groupBy(lambda x : x[0].split('\t')[1]).mapValues(lambda x :
> list(x)).cache()
>
> text1.count()
>
> Thanks and Regards,
> Suraj Sheth
>
> On Sun, Apr 10, 2016 at 1:19 AM, Ted Yu  wrote:
>
>> The value was out of the range of integer.
>>
>> Which Spark release are you using ?
>>
>> Can you post snippet of code which can reproduce the error ?
>>
>> Thanks
>>
>> On Sat, Apr 9, 2016 at 12:25 PM, SURAJ SHETH  wrote:
>>
>>> I am trying to perform some processing and cache and count the RDD.
>>> Any solutions?
>>>
>>> Seeing a weird error :
>>>
>>> File 
>>> "/mnt/yarn/usercache/hadoop/appcache/application_1456909219314_0014/container_1456909219314_0014_01_04/pyspark.zip/pyspark/serializers.py",
>>>  line 550, in write_int
>>> stream.write(struct.pack("!i", value))
>>> error: 'i' format requires -2147483648 <= number <= 2147483647
>>>
>>> at 
>>> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>>> at 
>>> org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207)
>>> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>>>
>>>
>>> Thanks and Regards,
>>>
>>> Suraj Sheth
>>>
>>>
>>
>


Re: Only 60% of Total Spark Batch Application execution time spent in Task Processing

2016-04-10 Thread Ted Yu
Jasmine:
Let us know if listening to more events gives you a better picture.

Thanks
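
A minimal sketch of a listener that times stages and jobs as well as tasks, so
the gap between summed task time and wall-clock time becomes visible (the
aggregation here is just printing; method and field names follow the
SparkListener API):

import org.apache.spark.scheduler._

class TimingListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    println(s"task ${taskEnd.taskInfo.taskId}: ${taskEnd.taskInfo.duration} ms")

  override def onStageCompleted(stage: SparkListenerStageCompleted): Unit =
    for (s <- stage.stageInfo.submissionTime; c <- stage.stageInfo.completionTime)
      println(s"stage ${stage.stageInfo.stageId}: ${c - s} ms wall clock")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"job ${jobEnd.jobId} finished with ${jobEnd.jobResult}")
}

// register before the work starts:
// sc.addSparkListener(new TimingListener)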

On Thu, Apr 7, 2016 at 1:54 PM, Jasmine George  wrote:

> Hi Ted,
>
>
>
> Thanks for replying so fast.
>
>
>
> We are using spark 1.5.2.
>
> I was collecting only TaskEnd Events.
>
> I can do the event wise summation for couple of runs and get back to you.
>
>
>
> Thanks,
>
> Jasmine
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
> *Sent:* Thursday, April 07, 2016 1:43 PM
> *To:* JasmineGeorge
> *Cc:* user
> *Subject:* Re: Only 60% of Total Spark Batch Application execution time
> spent in Task Processing
>
>
>
> Which Spark release are you using ?
>
>
>
> Have you registered to all the events provided by SparkListener ?
>
>
>
> If so, can you do event-wise summation of execution time ?
>
>
>
> Thanks
>
>
>
> On Thu, Apr 7, 2016 at 11:03 AM, JasmineGeorge 
> wrote:
>
> We are running a batch job with the following specifications
> •   Building RandomForest with config : maxbins=100, depth=19, num of
> trees =
> 20
> •   Multiple runs with different input data size 2.8 GB, 10 Million
> records
> •   We are running spark application on Yarn in cluster mode, with 3
> Node
> Managers(each with 16 virtual cores and 96G RAM)
> •   Spark config :
> o   spark.driver.cores = 2
> o   spark.driver.memory = 32 G
> o   spark.executor.instances = 5  and spark.executor.cores = 8 so 40
> cores in
> total.
> o   spark.executor.memory= 32G so total executor memory around 160 G.
>
> We are collecting execution times for the tasks using a SparkListener, and
> also the total execution time for the application from the Spark Web UI.
> Across all the tests we saw consistently that,  sum total of the execution
> times of all the tasks is accounting to about 60% of the total application
> run time.
> We are just kind of wondering where is the rest of the 40% of the time
> being
> spent.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Only-60-of-Total-Spark-Batch-Application-execution-time-spent-in-Task-Processing-tp26703.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: Number of executors in spark-1.6 and spark-1.5

2016-04-10 Thread Vikash Pareek
Hi Talebzadeh,

Thanks for your quick response.

>>in 1.6, how many executors do you see for each node?
I have 1 executor per node with SPARK_WORKER_INSTANCES=1.

>>in standalone mode how are you increasing the number of worker instances.
Are you starting another slave on each node?
No, I am not starting another slave node; I just changed spark-env.sh for
each slave node, i.e. set SPARK_WORKER_INSTANCES=2.





Best Regards,


Vikash Pareek
Software Developer, *InfoObjects Inc.*
m: +918800206898 a: E5, Jhalana Institutional Area, Jaipur
s: vikaspareek1991 e: vikash.par...@infoobjects.com



On Sun, Apr 10, 2016 at 3:00 PM, Mich Talebzadeh 
wrote:

> Hi,
>
> in 1.6, how many executors do you see for each node?
> in standalone mode how are you increasing the number of worker instances.
> Are you starting another slave on each node?
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 10 April 2016 at 08:26, Vikash Pareek 
> wrote:
>
>> Hi,
>>
>> I have upgraded 5 node spark cluster from spark-1.5 to spark-1.6 (to use
>> mapWithState function).
>> After using spark-1.6, I am getting a strange behaviour of spark, jobs are
>> not using multiple executors of different nodes at a time means there is
>> no
>> parallel processing if each node having single worker and executor.
>> I am running jobs in spark standalone mode.
>>
>> I observed following points related to this issue.
>> 1. If I run same job with spark-1.5 then this will use multiple executors
>> across different nodes at a time.
>> 2. In Spark-1.6, If I increase no of cores(spark.cores.max) then jobs are
>> running in parallel thread but within same executor.
>> 3. In Spark-1.6, If I increase no of worker instances on each node then
>> jobs
>> are running in parallel as no of workers but within same executor.
>>
>> Can anyone suggest, why spark 1.6 can not use multiple executors across
>> different node at a time for parallel processing.
>> Your suggestion will be highly appreciated.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Number-of-executors-in-spark-1-6-and-spark-1-5-tp26733.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Sqoop on Spark

2016-04-10 Thread Mich Talebzadeh
Yes I meant MR.

Again, one cannot beat the RDBMS export utility. I was specifically referring
to Oracle in the above case, which does not provide any specific text-based
export, only the binary ones (Exp, Data Pump, etc.).

In the case of SAP ASE, Sybase IQ, and MSSQL, one can use BCP (bulk copy),
which can be parallelised either through range partitioning or simple round
robin partitioning, to get data out to files in parallel. Then one can get the
data into a Hive table through an import, etc.

In general, if the source table is very large you can use either SAP
Replication Server (SRS) or Oracle Golden Gate to get data to Hive. Both of
these replication tools provide connectors to Hive and they do a good job. If
one has something like Oracle in Prod then there is likely a Golden Gate
there. For the bulk setup of Hive tables and data migration, a replication
server is a good option.

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 10 April 2016 at 14:24, Michael Segel  wrote:

> Sqoop doesn’t use MapR… unless you meant to say M/R (Map Reduce)
>
> The largest problem with sqoop is that in order to gain parallelism you
> need to know how your underlying table is partitioned and to do multiple
> range queries. This may not be known, or your data may or may not be
> equally distributed across the ranges.
>
> If you’re bringing over the entire table, you may find dropping it and
> then moving it to HDFS and then doing a bulk load to be more efficient.
> (This is less flexible than sqoop, but also stresses the database servers
> less. )
>
> Again, YMMV
>
>
> On Apr 8, 2016, at 9:17 AM, Mich Talebzadeh 
> wrote:
>
> Well unless you have plenty of memory, you are going to have certain
> issues with Spark.
>
> I tried to load a billion rows table from oracle through spark using JDBC
> and ended up with "Caused by: java.lang.OutOfMemoryError: Java heap space"
> error.
>
> Sqoop uses MapR and does it in serial mode which takes time and you can
> also tell it to create Hive table. However, it will import data into Hive
> table.
>
> In any case the mechanism of data import is through JDBC, Spark uses
> memory and DAG, whereas Sqoop relies on MapR.
>
> There is of course another alternative.
>
> Assuming that your Oracle table has a primary Key say "ID" (it would be
> easier if it was a monotonically increasing number) or already partitioned.
>
>
>1. You can create views based on the range of ID or for each
>partition. You can then SELECT COLUMNS  co1, col2, coln from view and spool
>it to a text file on OS (locally say backup directory would be fastest).
>2. bzip2 those files and scp them to a local directory in Hadoop
>3. You can then use Spark/hive to load the target table from local
>files in parallel
>4. When creating views take care of NUMBER and CHAR columns in Oracle
>and convert them to TO_CHAR(NUMBER_COLUMN) and varchar CAST(coln AS
>VARCHAR2(n)) AS coln etc
>
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 8 April 2016 at 10:07, Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> Some metrics thrown around the discussion:
>>
>> SQOOP: extract 500 million rows (in single thread) 20 mins (data size 21
>> GB)
>> SPARK: load the data into memory (15 mins)
>>
>> SPARK: use JDBC (and similar to SQOOP difficult parallelization) to load
>> 500 million records - manually killed after 8 hours.
>>
>> (both the above studies were done in a system of same capacity, with 32
>> GB RAM and dual hexacore Xeon processors and SSD. SPARK was running
>> locally, and SQOOP ran on HADOOP2 and extracted data to local file system)
>>
>> In case any one needs to know what needs to be done to access both the
>> CSV and JDBC modules in SPARK Local Server mode, please let me know.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Thu, Apr 7, 2016 at 12:26 AM, Yong Zhang  wrote:
>>
>>> Good to know that.
>>>
>>> That is why Sqoop has this "direct" mode, to utilize the vendor specific
>>> feature.
>>>
>>> But for MPP, I still think it makes sense that vendor provide some kind
>>> of InputFormat, or data source in Spark, so Hadoop eco-system can integrate
>>> with them more natively.
>>>
>>> Yong
>>>
>>> --
>>> Date: Wed, 6 Apr 2016 16:12:30 -0700
>>> Subject: Re: Sqoop on Spark
>>> From: mohaj...@gmail.com
>>> To: java8...@hotmail.com
>>> CC: mich.talebza...@gmail.com; jornfra...@gmail.com;
>>> msegel_had...@hotmail.com; guha.a...@gmail.com; linguin@gmail.com;
>>> 

Re: Sqoop on Spark

2016-04-10 Thread Michael Segel
Sqoop doesn’t use MapR… unless you meant to say M/R (Map Reduce) 

The largest problem with sqoop is that in order to gain parallelism you need to 
know how your underlying table is partitioned and to do multiple range queries. 
This may not be known, or your data may or may not be equally distributed 
across the ranges.  
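
For reference, the range-query approach maps directly onto Spark's partitioned
JDBC read, which issues one bounded query per partition instead of a single
connection. A minimal sketch (the URL, table, key column and bounds are
illustrative, and the key should be roughly uniformly distributed for the
ranges to balance):

import java.util.Properties

val props = new Properties()
props.setProperty("user", "scott")        // placeholder credentials
props.setProperty("password", "tiger")

// 48 range queries, roughly of the form WHERE ID >= lower AND ID < upper
val df = sqlContext.read.jdbc(
  "jdbc:oracle:thin:@//dbhost:1521/ORCL",  // placeholder URL
  "SALES",                                 // placeholder table
  "ID",                                    // numeric partitioning column
  1L,                                      // lowerBound
  1000000000L,                             // upperBound
  48,                                      // numPartitions
  props)

df.write.mode("overwrite").saveAsTable("sales_staging")   // or land it as Parquet first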

If you’re bringing over the entire table, you may find dropping it and then 
moving it to HDFS and then doing a bulk load to be more efficient.
(This is less flexible than sqoop, but also stresses the database servers less. 
) 

Again, YMMV


> On Apr 8, 2016, at 9:17 AM, Mich Talebzadeh  wrote:
> 
> Well unless you have plenty of memory, you are going to have certain issues 
> with Spark.
> 
> I tried to load a billion rows table from oracle through spark using JDBC and 
> ended up with "Caused by: java.lang.OutOfMemoryError: Java heap space" error.
> 
> Sqoop uses MapR and does it in serial mode which takes time and you can also 
> tell it to create Hive table. However, it will import data into Hive table.
> 
> In any case the mechanism of data import is through JDBC, Spark uses memory 
> and DAG, whereas Sqoop relies on MapR.
> 
> There is of course another alternative.
> 
> Assuming that your Oracle table has a primary key, say "ID" (it would be
> easier if it were a monotonically increasing number), or is already partitioned.
> 
> 1. You can create views based on the range of ID or for each partition. You can
> then SELECT COLUMNS  co1, col2, coln from view and spool it to a text file on
> OS (locally say backup directory would be fastest).
> 2. bzip2 those files and scp them to a local directory in Hadoop
> 3. You can then use Spark/hive to load the target table from local files in
> parallel
> 4. When creating views take care of NUMBER and CHAR columns in Oracle and
> convert them to TO_CHAR(NUMBER_COLUMN) and varchar CAST(coln AS VARCHAR2(n))
> AS coln etc
> 
> HTH
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 8 April 2016 at 10:07, Gourav Sengupta  > wrote:
> Hi,
> 
> Some metrics thrown around the discussion:
> 
> SQOOP: extract 500 million rows (in single thread) 20 mins (data size 21 GB)
> SPARK: load the data into memory (15 mins)
> 
> SPARK: use JDBC (and similar to SQOOP difficult parallelization) to load 500 
> million records - manually killed after 8 hours.
> 
> (both of the above studies were done on a system of the same capacity, with 32 GB
> RAM, dual hexacore Xeon processors and SSD. SPARK was running locally, and
> SQOOP ran on HADOOP2 and extracted data to the local file system)
> 
> In case anyone needs to know what needs to be done to access both the CSV
> and JDBC modules in SPARK Local Server mode, please let me know.
> 
> 
> Regards,
> Gourav Sengupta
> 
> On Thu, Apr 7, 2016 at 12:26 AM, Yong Zhang  > wrote:
> Good to know that.
> 
> That is why Sqoop has this "direct" mode, to utilize the vendor specific 
> feature.
> 
> But for MPP, I still think it makes sense that vendors provide some kind of
> InputFormat, or a data source in Spark, so the Hadoop eco-system can integrate
> with them more natively.
> 
> Yong
> 
> Date: Wed, 6 Apr 2016 16:12:30 -0700
> Subject: Re: Sqoop on Spark
> From: mohaj...@gmail.com 
> To: java8...@hotmail.com 
> CC: mich.talebza...@gmail.com ; 
> jornfra...@gmail.com ; msegel_had...@hotmail.com 
> ; guha.a...@gmail.com 
> ; linguin@gmail.com 
> ; user@spark.apache.org 
> 
> 
> 
> It is using a JDBC driver; I know that's the case for Teradata:
> http://developer.teradata.com/connectivity/articles/teradata-connector-for-hadoop-now-available
>  
> 
> 
> Teradata Connector (which is used by Cloudera and Hortonworks) for doing 
> Sqoop is parallelized and works with ORC and probably other formats as well. 
> It is using JDBC for each connection between data-nodes and their AMP 
> (compute) nodes. There is an additional layer that coordinates all of it.
> I know Oracle has a similar technology; I've used it and had to supply the
> JDBC driver.
> 
> Teradata Connector is for batch data copy, QueryGrid is for interactive data 
> movement.
> 
> On Wed, Apr 6, 2016 at 4:05 PM, Yong Zhang  > wrote:
> If they do that, they must provide a customized input format, instead of 

Re: Datasets combineByKey

2016-04-10 Thread Amit Sela
I'm mapping the RDD API to the Datasets API and was wondering whether I was
missing something or whether this functionality is missing.
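For context, a minimal sketch of the RDD-side operation in question (a per-key average via combineByKey); whether Datasets get a direct equivalent, or whether a typed Aggregator is meant to replace it, is exactly the open question here:
--
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("combineByKeySketch").setMaster("local[2]"))
val scores = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

// combineByKey builds a per-key (sum, count) accumulator map-side,
// then merges the partial accumulators across partitions.
val avgByKey = scores
  .combineByKey(
    (v: Double) => (v, 1L),                                               // createCombiner
    (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1L),        // mergeValue
    (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2)) // mergeCombiners
  .mapValues { case (sum, count) => sum / count }

avgByKey.collect().foreach(println)   // (a,2.0), (b,2.0)
sc.stop()
--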

On Sun, Apr 10, 2016 at 3:00 PM Ted Yu  wrote:

> Haven't found any JIRA w.r.t. combineByKey for Dataset.
>
> What's your use case ?
>
> Thanks
>
> On Sat, Apr 9, 2016 at 7:38 PM, Amit Sela  wrote:
>
>> Is there (planned ?) a combineByKey support for Dataset ?
>> Is / Will there be a support for combiner lifting ?
>>
>> Thanks,
>> Amit
>>
>
>


RE: Unable to run Spark in YARN mode

2016-04-10 Thread Yu, Yucai
Could you follow this guide 
http://spark.apache.org/docs/latest/running-on-yarn.html#configuration?

Thanks,
Yucai

-Original Message-
From: maheshmath [mailto:mahesh.m...@gmail.com] 
Sent: Saturday, April 9, 2016 1:58 PM
To: user@spark.apache.org
Subject: Unable run Spark in YARN mode

I have set SPARK_LOCAL_IP=127.0.0.1 but am still getting the error below:

16/04/09 10:36:50 INFO spark.SecurityManager: Changing view acls to: mahesh
16/04/09 10:36:50 INFO spark.SecurityManager: Changing modify acls to:
mahesh
16/04/09 10:36:50 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(mahesh); users with modify permissions: Set(mahesh)
16/04/09 10:36:51 INFO util.Utils: Successfully started service
'sparkDriver' on port 43948.
16/04/09 10:36:51 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/04/09 10:36:51 INFO Remoting: Starting remoting
16/04/09 10:36:52 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriverActorSystem@127.0.0.1:32792]
16/04/09 10:36:52 INFO util.Utils: Successfully started service
'sparkDriverActorSystem' on port 32792.
16/04/09 10:36:52 INFO spark.SparkEnv: Registering MapOutputTracker
16/04/09 10:36:52 INFO spark.SparkEnv: Registering BlockManagerMaster
16/04/09 10:36:52 INFO storage.DiskBlockManager: Created local directory at
/tmp/blockmgr-a2079037-6bbe-49ce-ba78-d475e38ad362
16/04/09 10:36:52 INFO storage.MemoryStore: MemoryStore started with
capacity 517.4 MB
16/04/09 10:36:52 INFO spark.SparkEnv: Registering OutputCommitCoordinator
16/04/09 10:36:53 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/04/09 10:36:53 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
16/04/09 10:36:53 INFO util.Utils: Successfully started service 'SparkUI' on
port 4040.
16/04/09 10:36:53 INFO ui.SparkUI: Started SparkUI at http://127.0.0.1:4040
16/04/09 10:36:53 INFO client.RMProxy: Connecting to ResourceManager at
/0.0.0.0:8032
16/04/09 10:36:54 INFO yarn.Client: Requesting a new application from
cluster with 1 NodeManagers
16/04/09 10:36:54 INFO yarn.Client: Verifying our application has not
requested more than the maximum memory capability of the cluster (8192 MB
per container)
16/04/09 10:36:54 INFO yarn.Client: Will allocate AM container, with 896 MB
memory including 384 MB overhead
16/04/09 10:36:54 INFO yarn.Client: Setting up container launch context for
our AM
16/04/09 10:36:54 INFO yarn.Client: Setting up the launch environment for
our AM container
16/04/09 10:36:54 INFO yarn.Client: Preparing resources for our AM container
16/04/09 10:36:56 INFO yarn.Client: Uploading resource
file:/home/mahesh/Programs/spark-1.6.1-bin-hadoop2.6/lib/spark-assembly-1.6.1-hadoop2.6.0.jar
->
hdfs://localhost:54310/user/mahesh/.sparkStaging/application_1460137661144_0003/spark-assembly-1.6.1-hadoop2.6.0.jar
16/04/09 10:36:59 INFO yarn.Client: Uploading resource
file:/tmp/spark-f28e3fd5-4dcd-4199-b298-c7fc607dedb4/__spark_conf__5551799952710555772.zip
->
hdfs://localhost:54310/user/mahesh/.sparkStaging/application_1460137661144_0003/__spark_conf__5551799952710555772.zip
16/04/09 10:36:59 INFO spark.SecurityManager: Changing view acls to: mahesh
16/04/09 10:36:59 INFO spark.SecurityManager: Changing modify acls to:
mahesh
16/04/09 10:36:59 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(mahesh); users with modify permissions: Set(mahesh)
16/04/09 10:36:59 INFO yarn.Client: Submitting application 3 to
ResourceManager
16/04/09 10:36:59 INFO impl.YarnClientImpl: Submitted application
application_1460137661144_0003
16/04/09 10:37:00 INFO yarn.Client: Application report for
application_1460137661144_0003 (state: ACCEPTED)
16/04/09 10:37:00 INFO yarn.Client: 
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1460178419692
 final status: UNDEFINED
 tracking URL: http://gubbi:8088/proxy/application_1460137661144_0003/
 user: mahesh
16/04/09 10:37:01 INFO yarn.Client: Application report for
application_1460137661144_0003 (state: ACCEPTED)
16/04/09 10:37:02 INFO yarn.Client: Application report for
application_1460137661144_0003 (state: ACCEPTED)
16/04/09 10:37:03 INFO yarn.Client: Application report for
application_1460137661144_0003 (state: ACCEPTED)
16/04/09 10:37:04 INFO yarn.Client: Application report for
application_1460137661144_0003 (state: ACCEPTED)
16/04/09 10:37:05 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
ApplicationMaster registered as NettyRpcEndpointRef(null)
16/04/09 10:37:05 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter.
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS
-> gubbi, PROXY_URI_BASES ->
http://gubbi:8088/proxy/application_1460137661144_0003),
/proxy/application_1460137661144_0003
16/04/09 10:37:05 INFO ui.JettyUtils: 

LinkedIn streams in Spark

2016-04-10 Thread Deepak Sharma
Hello All,
I am looking for a use case where anyone has used Spark Streaming
integration with LinkedIn.

-- 
Thanks
Deepak


Re: Datasets combineByKey

2016-04-10 Thread Ted Yu
Haven't found any JIRA w.r.t. combineByKey for Dataset.

What's your use case ?

Thanks

On Sat, Apr 9, 2016 at 7:38 PM, Amit Sela  wrote:

> Is there (planned ?) a combineByKey support for Dataset ?
> Is / Will there be a support for combiner lifting ?
>
> Thanks,
> Amit
>


Re: Graphframes pattern causing java heap space errors

2016-04-10 Thread Ted Yu
Looks like the exception occurred on the driver.

Consider increasing the values for the following config:

conf.set("spark.driver.memory", "10240m")
conf.set("spark.driver.maxResultSize", "2g")

Cheers
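As a loose sketch only (the GraphFrames import, the toy DataFrames and the output path are assumptions, not taken from this thread): keeping the motif result distributed, for example writing it out or aggregating it rather than collecting it, avoids pulling large results back onto the driver, which is where the exception is reported. Note also that in client deploy mode spark.driver.memory is generally only honoured when passed at launch (spark-submit --driver-memory or spark-defaults.conf), because the driver JVM is already running by the time SparkConf is read.
--
import org.graphframes.GraphFrame

// Tiny synthetic graph: a vertices DataFrame needs an "id" column,
// an edges DataFrame needs "src" and "dst" columns.
val verticesDF = sqlContext.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "name")
val edgesDF = sqlContext.createDataFrame(Seq((1L, 2L), (3L, 2L), (3L, 1L))).toDF("src", "dst")

val g = GraphFrame(verticesDF, edgesDF)
val motifs = g.find("(a)-[]->(b); (c)-[]->(b); (c)-[]->(d)")

// Write (or aggregate) the matches instead of collecting them,
// so the result never has to fit on the driver.
motifs.write.mode("overwrite").parquet("hdfs:///tmp/motif_matches")
--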

On Sat, Apr 9, 2016 at 9:02 PM, Buntu Dev  wrote:

> I'm running it via pyspark against YARN in client deploy mode. I do notice
> all the options I've set in the Spark web UI under the Environment tab, so I'm
> guessing these are accepted.
>
> On Sat, Apr 9, 2016 at 5:52 PM, Jacek Laskowski  wrote:
>
>> Hi,
>>
>> (I haven't played with GraphFrames)
>>
>> What's your `sc.master`? How do you run your application --
>> spark-submit or java -jar or sbt run or...? The reason I'm asking is
>> that a few options might not be in use whatsoever, e.g.
>> spark.driver.memory and spark.executor.memory in local mode.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sat, Apr 9, 2016 at 7:51 PM, Buntu Dev  wrote:
>> > I'm running this motif pattern against 1.5M vertices (5.5mb) and 10M
>> (60mb)
>> > edges:
>> >
>> >  tgraph.find("(a)-[]->(b); (c)-[]->(b); (c)-[]->(d)")
>> >
>> > I keep running into Java heap space errors:
>> >
>> > ~
>> >
>> > ERROR actor.ActorSystemImpl: Uncaught fatal error from thread
>> > [sparkDriver-akka.actor.default-dispatcher-33] shutting down ActorSystem
>> > [sparkDriver]
>> > java.lang.OutOfMemoryError: Java heap space
>> > at scala.reflect.ManifestFactory$$anon$6.newArray(Manifest.scala:90)
>> > at scala.reflect.ManifestFactory$$anon$6.newArray(Manifest.scala:88)
>> > at scala.Array$.ofDim(Array.scala:218)
>> > at akka.util.ByteIterator.toArray(ByteIterator.scala:462)
>> > at akka.util.ByteString.toArray(ByteString.scala:321)
>> > at
>> >
>> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:168)
>> > at
>> >
>> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:513)
>> > at
>> >
>> akka.remote.transport.ProtocolStateActor$$anonfun$5.applyOrElse(AkkaProtocolTransport.scala:357)
>> > at
>> >
>> akka.remote.transport.ProtocolStateActor$$anonfun$5.applyOrElse(AkkaProtocolTransport.scala:352)
>> > at
>> >
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
>> > at akka.actor.FSM$class.processEvent(FSM.scala:595)
>> > at
>> >
>> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:220)
>> > at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:589)
>> > at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:583)
>> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> > at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> > at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> > at
>> >
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> > at
>> >
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> > at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> > at
>> >
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >
>> > ~
>> >
>> >
>> > Here is my config:
>> >
>> > conf.set("spark.executor.memory", "8192m")
>> > conf.set("spark.executor.cores", 4)
>> > conf.set("spark.driver.memory", "10240m")
>> > conf.set("spark.driver.maxResultSize", "2g")
>> > conf.set("spark.kryoserializer.buffer.max", "1024mb")
>> >
>> >
>> > Wanted to know if there are any other configs to tweak?
>> >
>> >
>> > Thanks!
>>
>
>


Re: alter table add columns alternatives or hive refresh

2016-04-10 Thread Mich Talebzadeh
I have not tried it on Spark, but a column added in Hive to an existing
table cannot be updated for existing rows. In other words, the new column is
set to null, which does not require any change to the existing file length.

So basically, as I understand it, when a column is added to an existing table:

1. The metadata for the underlying table will be updated
2. The new column will by default have a null value
3. The existing rows cannot have the new column updated to a non-null value
4. New rows can have non-null values set for the new column
5. No SQL operation can be done on that column. For example select *
from <table> where new_column IS NOT NULL
6. The easiest option is to create a new table with the new column and
do an insert/select from the existing table with values set for the new column
(a sketch follows below)
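A minimal sketch of option 6, assuming sqlContext is a HiveContext and using hypothetical table and column names; rebuilding the table is one way to keep the schema Spark tracks for a saveAsTable-created table consistent with what the metastore reports:
--
// Rebuild the table with the extra column instead of ALTER TABLE ... ADD COLUMNS.
// Add a STORED AS / format clause if the new table must match the original's storage format.
sqlContext.sql("""
  CREATE TABLE mytable_v2 AS
  SELECT *, CAST(NULL AS STRING) AS mycol
  FROM mytable
""")

// New DataFrames that already carry the extra column can then be appended:
// df.write.mode("append").insertInto("mytable_v2")
--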

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 10 April 2016 at 05:06, Maurin Lenglart  wrote:

> Hi,
> I am trying to add columns to a table that I created with the “saveAsTable”
> API.
> I update the columns using sqlContext.sql(‘alter table myTable add columns
> (mycol string)’).
> The next time I create a DataFrame and save it to the same table with the new
> columns, I get:
> “ParquetRelation
>  requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE
> statement generates the same number of columns as its schema.”
>
> Also these two commands don't return the same columns:
> 1. sqlContext.table(‘myTable’).schema.fields   <— wrong result
> 2. sqlContext.sql(’show columns in mytable’)   <—— good results
>
> It seems to be a known bug :
> https://issues.apache.org/jira/browse/SPARK-9764 (see related bugs)
>
> But I am wondering: how else can I update the columns or make sure that
> Spark takes the new columns?
>
> I already tried to refreshTable and to restart spark.
>
> thanks
>
>


Re: Number of executors in spark-1.6 and spark-1.5

2016-04-10 Thread Mich Talebzadeh
Hi,

In 1.6, how many executors do you see for each node?
In standalone mode, how are you increasing the number of worker instances?
Are you starting another slave on each node?

HTH




Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 10 April 2016 at 08:26, Vikash Pareek 
wrote:

> Hi,
>
> I have upgraded a 5 node spark cluster from spark-1.5 to spark-1.6 (to use
> the mapWithState function).
> After upgrading to spark-1.6, I am seeing strange behaviour: jobs do not use
> multiple executors on different nodes at the same time, which means there is no
> parallel processing when each node has a single worker and executor.
> I am running jobs in spark standalone mode.
>
> I observed the following points related to this issue.
> 1. If I run the same job with spark-1.5, then it uses multiple executors
> across different nodes at a time.
> 2. In Spark-1.6, if I increase the number of cores (spark.cores.max), then jobs
> run in parallel threads but within the same executor.
> 3. In Spark-1.6, if I increase the number of worker instances on each node, then
> jobs run in parallel according to the number of workers, but still within the
> same executor.
>
> Can anyone suggest why Spark 1.6 cannot use multiple executors across
> different nodes at a time for parallel processing?
> Your suggestion will be highly appreciated.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Number-of-executors-in-spark-1-6-and-spark-1-5-tp26733.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Number of executors in spark-1.6 and spark-1.5

2016-04-10 Thread Vikash Pareek
Hi,

I have upgraded a 5 node spark cluster from spark-1.5 to spark-1.6 (to use
the mapWithState function).
After upgrading to spark-1.6, I am seeing strange behaviour: jobs do not use
multiple executors on different nodes at the same time, which means there is no
parallel processing when each node has a single worker and executor.
I am running jobs in spark standalone mode.

I observed the following points related to this issue.
1. If I run the same job with spark-1.5, then it uses multiple executors
across different nodes at a time.
2. In Spark-1.6, if I increase the number of cores (spark.cores.max), then jobs
run in parallel threads but within the same executor.
3. In Spark-1.6, if I increase the number of worker instances on each node, then
jobs run in parallel according to the number of workers, but still within the
same executor.

Can anyone suggest why Spark 1.6 cannot use multiple executors across
different nodes at a time for parallel processing?
Your suggestion will be highly appreciated.
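A hedged sketch, not a confirmed fix: in standalone mode the number of executors granted to an application follows from spark.cores.max and spark.executor.cores (roughly total cores divided by cores per executor), so bounding both explicitly is one way to check whether 1.6 will place separate executors on several workers. The master URL and values below are hypothetical:
--
import org.apache.spark.{SparkConf, SparkContext}

// With 10 cores in total and 2 cores per executor, the application can be
// granted up to 5 executors, which the standalone master can spread across workers.
val conf = new SparkConf()
  .setAppName("executor-spread-check")
  .setMaster("spark://master-host:7077")   // hypothetical master URL
  .set("spark.cores.max", "10")
  .set("spark.executor.cores", "2")
  .set("spark.executor.memory", "2g")

val sc = new SparkContext(conf)
println(s"default parallelism = ${sc.defaultParallelism}")
sc.stop()
--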




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Number-of-executors-in-spark-1-6-and-spark-1-5-tp26733.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org