concurrent.RejectedExecutionException

2016-01-23 Thread Yasemin Kaya
Hi all,

I'm using Spark 1.5 and getting this error. Could you help? I can't
understand it.

16/01/23 10:11:59 ERROR TaskSchedulerImpl: Exception in statusUpdate
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$2@62c72719 rejected from java.util.concurrent.ThreadPoolExecutor@57f54b52[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 85]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at org.apache.spark.scheduler.TaskResultGetter.enqueueSuccessfulTask(TaskResultGetter.scala:49)
at org.apache.spark.scheduler.TaskSchedulerImpl.liftedTree2$1(TaskSchedulerImpl.scala:347)
at org.apache.spark.scheduler.TaskSchedulerImpl.statusUpdate(TaskSchedulerImpl.scala:330)
at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:65)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
16/01/23 10:12:00 WARN QueuedThreadPool: 6 threads could not be stopped
16/01/23 10:12:01 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
16/01/23 10:12:03 INFO MemoryStore: MemoryStore cleared
16/01/23 10:12:04 INFO BlockManager: BlockManager stopped
16/01/23 10:12:04 INFO BlockManagerMaster: BlockManagerMaster stopped
16/01/23 10:12:05 INFO SparkContext: Successfully stopped SparkContext
16/01/23 10:12:05 ERROR Executor: Exception in task 2.0 in stage 35.0 (TID 87)
java.io.FileNotFoundException: /tmp/blockmgr-aed2b250-8893-4b5b-b5ef-91928f5547b6/1f/shuffle_9_2_0.data (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:177)
at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:55)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:675)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/01/23 10:12:05 ERROR Executor: Exception in task 1.0 in stage 35.0 (TID 86)
java.io.FileNotFoundException: /tmp/blockmgr-aed2b250-8893-4b5b-b5ef-91928f5547b6/02/shuffle_9_1_0.data (No such file or directory)

Re: concurrent.RejectedExecutionException

2016-01-23 Thread Ted Yu
This seems related:

SPARK-8029 ShuffleMapTasks must be robust to concurrent attempts on the
same executor

Mind trying out 1.5.3 or a later release?

Cheers

On Sat, Jan 23, 2016 at 12:51 AM, Yasemin Kaya  wrote:

> Hi all,
>
> I'm using Spark 1.5 and getting this error. Could you help? I can't
> understand it.
>

Re: Does filter on an RDD scan every data item ?

2016-01-23 Thread nir
"I don't think you could avoid this 
in general, right, in any system? "

Really?  nosql databases do efficient lookups(and scan) based on key and
partition. look at cassandra, hbase



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-filter-on-an-RDD-scan-every-data-item-tp20170p26048.html



Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint

2016-01-23 Thread Yash Sharma
Hi Raju,
Could you please explain your expected behavior with the DStream? The
DStream will have events only from the 'fromOffsets' that you provided in
createDirectStream (which I think is the expected behavior).

Regarding the smaller files: yes, you will have to deal with small files if you
intend to write them out immediately. Alternatively, what we sometimes do is one
of the following (a rough sketch of the first approach is below):

1. Buffer a couple of iterations, for some 30-40 seconds, in the application
until we have substantial data, and then write it to disk.
2. Push the smaller data back to Kafka, and let a different job handle the save
to disk.
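
For what it's worth, here is a minimal Scala sketch of the first approach
(buffering a window of batches and writing once per window). The socket source,
durations, partition count and output path are placeholders, not the code we
actually run:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: the source, durations and output path are placeholders.
val conf = new SparkConf().setAppName("windowed-writes")
val ssc = new StreamingContext(conf, Seconds(10))

// Stand-in for the real Kafka DStream.
val lines = ssc.socketTextStream("localhost", 9999)

// Group four 10-second batches into one 40-second window, shrink the
// partition count, and write one directory per window instead of one per batch.
lines.window(Seconds(40), Seconds(40)).foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    rdd.coalesce(4) // fewer partitions => fewer output files
      .saveAsTextFile(s"hdfs:///tmp/events/window-${time.milliseconds}")
  }
}

ssc.start()
ssc.awaitTermination()

The same idea applies to the Kafka direct stream; the window duration just
needs to be a multiple of the batch interval.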

On Sat, Jan 23, 2016 at 7:01 PM, Raju Bairishetti  wrote:

> Thanks for the quick reply.
> I am creating the Kafka Dstream by passing an offsets map. I have pasted the
> code snippet in my earlier mail. Let me know if I am missing something.
>
> I want to use the Spark checkpoint for handling only driver/executor failures.
> On Jan 22, 2016 10:08 PM, "Cody Koeninger"  wrote:
>
>> Offsets are stored in the checkpoint.  If you want to manage offsets
>> yourself, don't restart from the checkpoint, specify the starting offsets
>> when you create the stream.
>>
>> Have you read / watched the materials linked from
>>
>> https://github.com/koeninger/kafka-exactly-once
>>
>> Regarding the small files problem, either don't use HDFS, or use
>> something like filecrush for merging.
>>
>> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti 
>> wrote:
>>
>>> Hi,
>>>
>>>
>>>I am very new to spark & spark-streaming. I am planning to use spark
>>> streaming for real time processing.
>>>
>>>I have created a streaming context and checkpointing to hdfs
>>> directory for recovery purposes in case of executor failures & driver
>>> failures.
>>>
>>> I am creating the Dstream with an offset map for getting the data from Kafka.
>>> I am simply ignoring the offsets to understand the behavior. Whenever I
>>> restart the application, the driver is restored from the checkpoint as
>>> expected, but the Dstream is not getting started from the initial offsets.
>>> The Dstream was created with the last consumed offsets instead of starting
>>> from offset 0 for each topic partition, as I am not storing the offsets
>>> anywhere.
>>>
>>> def main : Unit = {
>>>
>>> var sparkStreamingContext = 
>>> StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>>   () => creatingFunc())
>>>
>>> ...
>>>
>>>
>>> }
>>>
>>> def creatingFunc(): Unit = {
>>>
>>> ...
>>>
>>> var offsets:Map[TopicAndPartition, Long] = 
>>> Map(TopicAndPartition("sample_sample3_json",0) -> 0)
>>>
>>> KafkaUtils.createDirectStream[String,String, StringDecoder, 
>>> StringDecoder,
>>> String](sparkStreamingContext, kafkaParams, offsets, messageHandler)
>>>
>>> ...
>>> }
>>>
>>> I want to get control over offset management at the event level instead of
>>> the RDD level, to make sure of at-least-once delivery to the end system.
>>>
>>> As per my understanding, every RDD or RDD partition will be stored in HDFS
>>> as a file if I choose to use HDFS as output. If I use 1 sec as the batch
>>> interval then it will end up having a huge number of small files in
>>> HDFS. Having small files in HDFS leads to lots of other issues.
>>> Is there any way to write multiple RDDs into a single file? I don't have
>>> much idea about *coalesce* usage. In the worst case, I can merge all small
>>> files in HDFS at regular intervals.
>>>
>>> Thanks...
>>>
>>> --
>>> Thanks
>>> Raju Bairishetti
>>> www.lazada.com
>>>
>>>
>>>
>>>
>>


Spark not saving data to Hive

2016-01-23 Thread Akhilesh Pathodia
Hi,

I am trying to write data from Spark to a Hive partitioned table:

DataFrame dataFrame = sqlContext.createDataFrame(rdd, schema);
dataFrame.write().partitionBy("YEAR","MONTH","DAY").saveAsTable(tableName);

The data is not being written to the Hive table (HDFS location:
/user/hive/warehouse//). Below are the logs from the Spark
executor. As shown in the logs, it is writing the data to
/tmp/spark-a3c7ed0f-76c6-4c3c-b80c-0734e33390a2/metastore/case_logs, but I
did not find this directory in HDFS.

16/01/23 02:15:03 INFO datasources.DynamicPartitionWriterContainer:
Sorting complete. Writing out partition files one at a time.
16/01/23 02:15:03 INFO compress.CodecPool: Got brand-new compressor [.gz]
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.5.1-1.cdh5.5.1.p0.11/jars/parquet-pig-bundle-1.5.0-cdh5.5.1.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.5.1-1.cdh5.5.1.p0.11/jars/parquet-hadoop-bundle-1.5.0-cdh5.5.1.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.5.1-1.cdh5.5.1.p0.11/jars/parquet-format-2.1.0-cdh5.5.1.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.5.1-1.cdh5.5.1.p0.11/jars/hive-exec-1.1.0-cdh5.5.1.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.5.1-1.cdh5.5.1.p0.11/jars/hive-jdbc-1.1.0-cdh5.5.1-standalone.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type
[shaded.parquet.org.slf4j.helpers.NOPLoggerFactory]
16/01/23 02:15:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:06 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:06 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:06 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:06 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:06 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:06 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:06 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:06 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:06 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/01/23 02:15:06 INFO output.FileOutputCommitter: Saved output of
task 'attempt_201601230214_0023_m_00_0' to
file:/tmp/spark-a3c7ed0f-76c6-4c3c-b80c-0734e33390a2/metastore/case_logs
16/01/23 02:15:06 INFO mapred.SparkHadoopMapRedUtil:
attempt_201601230214_0023_m_00_0: Committed
16/01/23 02:15:06 INFO executor.Executor: Finished task 0.0 in stage
23.0 (TID 23). 2013 bytes result sent to driver


I am using CDH 5.5.1 and Spark 1.5.0. Does anybody have an idea what is
happening here?

Thanks,
Akhilesh
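
For reference, one workaround commonly suggested for this on Spark 1.5 is to
pre-create the partitioned table in Hive and write into it with insertInto from
a HiveContext instead of saveAsTable, so the files land in the table's own
warehouse layout. A rough Scala sketch under that assumption; the table and
source names below are illustrative, not from the original post:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

// Sketch only: assumes the target table already exists in Hive, partitioned
// by (year, month, day), and that the partition columns are the last columns
// of the DataFrame.
val sc = new SparkContext(new SparkConf().setAppName("hive-write"))
val hiveContext = new HiveContext(sc)

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

val dataFrame = hiveContext.table("staging_case_logs") // placeholder source

// insertInto writes into the existing Hive table layout under
// /user/hive/warehouse, so Hive itself can read the result.
dataFrame.write.mode(SaveMode.Append).insertInto("case_logs")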


Re: Does filter on an RDD scan every data item ?

2016-01-23 Thread nir
Looks like this has been supported since the 1.4 release :)
https://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions
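
For reference, the relevant method there is filterByRange on
OrderedRDDFunctions, which can skip partitions once the RDD has a
RangePartitioner (e.g. after sortByKey). A minimal Scala sketch with made-up
keys and a made-up range:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: keys, partition count and the queried range are made up.
val sc = new SparkContext(new SparkConf().setAppName("range-scan").setMaster("local[*]"))

val pairs = sc.parallelize((1 to 1000000).map(i => (i.toLong, s"row-$i")))

// sortByKey installs a RangePartitioner, so filterByRange can prune whole
// partitions whose key range falls outside [300, 700] instead of scanning all rows.
val sorted = pairs.sortByKey(numPartitions = 50).cache()
val slice = sorted.filterByRange(300L, 700L)

println(slice.count())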



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-filter-on-an-RDD-scan-every-data-item-tp20170p26049.html



How to efficiently Scan (not filter nor lookup) part of Paired RDD or Ordered RDD

2016-01-23 Thread Nirav Patel
The problem is that I have an RDD of about 10M rows and it keeps growing. Every
time we want to perform a query and compute on a subset of the data, we have to
use filter and then some aggregation. As far as I know, filter goes through each
partition and every row of the RDD, which may not be efficient at all.

Given that Spark has Ordered RDD functions, I don't see why it's so difficult to
implement such a function. Cassandra/HBase have had it for years; they can
fetch data only from certain partitions based on your row key. Scala's TreeMap
has a range function to do the same.

I think people have been looking for this for a while. I see several posts
asking about this.

http://apache-spark-user-list.1001560.n3.nabble.com/Does-filter-on-an-RDD-scan-every-data-item-td20170.html#a26048

By the way, I assume there
Thanks
Nirav



Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint

2016-01-23 Thread Raju Bairishetti
Thanks for the quick reply.
I am creating the Kafka Dstream by passing an offsets map. I have pasted the
code snippet in my earlier mail. Let me know if I am missing something.

I want to use the Spark checkpoint for handling only driver/executor failures.
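
For reference, the usual pattern for owning the offsets yourself (as in the
koeninger examples linked below) is to capture each batch's OffsetRange, commit
it to your own store after processing, and pass those offsets back into
createDirectStream on restart instead of relying on the checkpoint. A minimal
Scala sketch; saveOffsets and the Kafka parameters are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Sketch only: kafkaParams, topics and saveOffsets are placeholders for
// whatever store (ZK, HBase, an RDBMS, ...) the application uses.
def saveOffsets(ranges: Array[OffsetRange]): Unit = {
  // write topic/partition/untilOffset to your own store here
}

def buildStream(ssc: StreamingContext,
                kafkaParams: Map[String, String],
                topics: Set[String]) = {
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics)

  stream.foreachRDD { rdd =>
    // Capture the exact Kafka offsets this batch covers, before any shuffle.
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // ... process rdd ...
    saveOffsets(ranges) // commit only after processing has succeeded
  }
  stream
}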
On Jan 22, 2016 10:08 PM, "Cody Koeninger"  wrote:

> Offsets are stored in the checkpoint.  If you want to manage offsets
> yourself, don't restart from the checkpoint, specify the starting offsets
> when you create the stream.
>
> Have you read / watched the materials linked from
>
> https://github.com/koeninger/kafka-exactly-once
>
> Regarding the small files problem, either don't use HDFS, or use something
> like filecrush for merging.
>
> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti  wrote:
>
>> Hi,
>>
>>
>>I am very new to spark & spark-streaming. I am planning to use spark
>> streaming for real time processing.
>>
>>I have created a streaming context and checkpointing to hdfs directory
>> for recovery purposes in case of executor failures & driver failures.
>>
>> I am creating the Dstream with an offset map for getting the data from Kafka.
>> I am simply ignoring the offsets to understand the behavior. Whenever I
>> restart the application, the driver is restored from the checkpoint as
>> expected, but the Dstream is not getting started from the initial offsets.
>> The Dstream was created with the last consumed offsets instead of starting
>> from offset 0 for each topic partition, as I am not storing the offsets
>> anywhere.
>>
>> def main : Unit = {
>>
>> var sparkStreamingContext = 
>> StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>   () => creatingFunc())
>>
>> ...
>>
>>
>> }
>>
>> def creatingFunc(): Unit = {
>>
>> ...
>>
>> var offsets:Map[TopicAndPartition, Long] = 
>> Map(TopicAndPartition("sample_sample3_json",0) -> 0)
>>
>> KafkaUtils.createDirectStream[String,String, StringDecoder, 
>> StringDecoder,
>> String](sparkStreamingContext, kafkaParams, offsets, messageHandler)
>>
>> ...
>> }
>>
>> I want to get control over offset management at the event level instead of
>> the RDD level, to make sure of at-least-once delivery to the end system.
>>
>> As per my understanding, every RDD or RDD partition will be stored in HDFS
>> as a file if I choose to use HDFS as output. If I use 1 sec as the batch
>> interval then it will end up having a huge number of small files in
>> HDFS. Having small files in HDFS leads to lots of other issues.
>> Is there any way to write multiple RDDs into a single file? I don't have
>> much idea about *coalesce* usage. In the worst case, I can merge all small
>> files in HDFS at regular intervals.
>>
>> Thanks...
>>
>> --
>> Thanks
>> Raju Bairishetti
>> www.lazada.com
>>
>>
>>
>>
>


Clarification on Data Frames joins

2016-01-23 Thread Madabhattula Rajesh Kumar
Hi,

I have a big database table (1 million plus records) in Oracle. I need to
query records based on input numbers. For this use case, I am doing the
steps below.

I am creating two data frames.

DF1 = I am computing this DF1 using a SQL query. It has one million plus
records.

DF2 = I have a list of numbers. I am converting the list of input numbers to a
data frame.

I am registering DF1 and DF2 as temp tables and forming a SQL query.
It will return the records matching the input numbers.

Steps:

DF1.registerTempTable("E1")

DF2.registerTempTable("E2")

DF3 = sqlContext.sql("select * from E1, E2 where E1.id = E2.id")

DF3.map(row => (row(0),row(1),row(2))).saveToCassandra(keyspace, table1)

*Query:*

How will the DF3 records be fetched?

Will DF1 load the entire table data (1 million plus records) into memory when
joining with DF2? Or will it fetch only the records matching DF2 from Oracle
and load those into memory?

Please clarify and let me know whether my approach is correct.

Regards,
Rajesh
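
For what it's worth, if the list of input numbers is small, one way to be sure
the filtering happens in Oracle rather than in Spark is to push it into the
JDBC sub-query itself. A rough Scala sketch; the URL, credentials, table and
column names are placeholders:

import org.apache.spark.sql.SQLContext

// Sketch only: connection details, table and column names are placeholders,
// and inlining the id list is reasonable only for small lists.
def loadMatching(sqlContext: SQLContext, ids: Seq[Long]) = {
  val idList = ids.mkString(",")
  sqlContext.read.format("jdbc")
    .option("url", "jdbc:oracle:thin:@//dbhost:1521/SERVICE")
    .option("driver", "oracle.jdbc.OracleDriver")
    .option("user", "scott")
    .option("password", "tiger")
    // Oracle evaluates the IN predicate, so only the matching rows reach Spark.
    .option("dbtable", s"(SELECT * FROM big_table WHERE id IN ($idList)) t")
    .load()
}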


Re: python - list objects in HDFS directory

2016-01-23 Thread Ted Yu
Is the 'hadoop' / 'hdfs' command accessible to your Python script?

If so, you can call 'hdfs dfs -ls' from Python.

Cheers

On Sat, Jan 23, 2016 at 4:08 AM, Andrew Holway <
andrew.hol...@otternetworks.de> wrote:

> Hello,
>
> I would like to make a list of files (parquet or json) in a specific
> HDFS directory with python so I can do some logic on which files to
> load into a dataframe.
>
> Any ideas?
>
> Thanks,
>
> Andrew
>
>
>


Spark not writing data in Hive format

2016-01-23 Thread Akhilesh Pathodia
Hi,

I am trying to write data from Spark to a Hive partitioned table. The job is
running without any error, but it is not writing the data to the correct
location.

[streaming-job-executor-0] parquet.ParquetRelation (Logging.scala:logInfo(59)) - Listing file:/yarn/nm/usercache/root/appcache/application_1453561680059_0005/container_e89_1453561680059_0005_01_01/tmp/spark-f252468d-61f0-44f2-8819-34e2c27c80c7/metastore/case_logs on driver
2016-01-23 07:58:53,223 INFO  [streaming-job-executor-0] parquet.ParquetRelation (Logging.scala:logInfo(59)) - Listing file:/yarn/nm/usercache/root/appcache/application_1453561680059_0005/container_e89_1453561680059_0005_01_01/tmp/spark-f252468d-61f0-44f2-8819-34e2c27c80c7/metastore/case_logs on driver
2016-01-23 07:58:53,276 WARN  [streaming-job-executor-0] hive.HiveContext$$anon$1 (Logging.scala:logWarning(71)) - Persisting partitioned data source relation `CASE_LOGS` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. Input path(s): file:/yarn/nm/usercache/root/appcache/application_1453561680059_0005/container_e89_1453561680059_0005_01_01/tmp/spark-f252468d-61f0-44f2-8819-34e2c27c80c7/metastore/case_logs
2016-01-23 07:58:53,454 INFO  [streaming-job-executor-0] log.PerfLogger (PerfLogger.java:PerfLogBegin(118)) - 654 INFO  [JobScheduler] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 145356471 ms.0 from job set of time 145356471 ms


It is writing the data in the Spark SQL specific format instead of the Hive
format. Can anybody tell me how to get rid of this issue?

Spark version - 1.5.0
CDH 5.5.1

Thanks,
Akhilesh Pathodia
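
For reference, another way around the "Spark SQL specific format" warning on
1.5 is to route the write through HiveQL against a table created in Hive,
instead of letting saveAsTable create it. A rough Scala sketch under that
assumption; table and column names are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Sketch only: assumes an existing Hive table case_logs partitioned by
// (year, month, day); names are illustrative.
val hc = new HiveContext(new SparkContext(new SparkConf().setAppName("hive-insert")))
hc.setConf("hive.exec.dynamic.partition", "true")
hc.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

val df = hc.table("staging_case_logs") // placeholder for the batch DataFrame
df.registerTempTable("batch_case_logs")

// A plain HiveQL INSERT writes the data in the table's own, Hive-readable
// layout; the partition columns must come last in the select list.
hc.sql(
  """INSERT INTO TABLE case_logs PARTITION (year, month, day)
    |SELECT * FROM batch_case_logs""".stripMargin)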


Concatenating tables

2016-01-23 Thread Andrew Holway
Is there a data frame operation to do this?

+---------+
| A B C D |
+---------+
| 1 2 3 4 |
| 5 6 7 8 |
+---------+
+---------+
| A B C D |
+---------+
| 3 5 6 8 |
| 0 0 0 0 |
+---------+
+---------+
| A B C D |
+---------+
| 8 8 8 8 |
| 1 1 1 1 |
+---------+

Concatenated together to make this.

+---------+
| A B C D |
+---------+
| 1 2 3 4 |
| 5 6 7 8 |
| 3 5 6 8 |
| 0 0 0 0 |
| 8 8 8 8 |
| 1 1 1 1 |
+---------+

Thanks,

Andrew


Re: Concatenating tables

2016-01-23 Thread Ted Yu
How about this operation:

  /**
   * Returns a new [[DataFrame]] containing union of rows in this frame and
   * another frame.
   * This is equivalent to `UNION ALL` in SQL.
   * @group dfops
   * @since 1.3.0
   */
  def unionAll(other: DataFrame): DataFrame = withPlan {

FYI
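
Concretely, for the three frames in the question this would look roughly like
the following Scala sketch, which rebuilds the frames from the literal values
shown:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch: rebuild the three small frames and stack them with unionAll.
// All frames must share the same schema (A, B, C, D).
val sc = new SparkContext(new SparkConf().setAppName("union-example").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df1 = Seq((1, 2, 3, 4), (5, 6, 7, 8)).toDF("A", "B", "C", "D")
val df2 = Seq((3, 5, 6, 8), (0, 0, 0, 0)).toDF("A", "B", "C", "D")
val df3 = Seq((8, 8, 8, 8), (1, 1, 1, 1)).toDF("A", "B", "C", "D")

val all = df1.unionAll(df2).unionAll(df3)
all.show() // six rows, columns A B C D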

On Sat, Jan 23, 2016 at 1:02 PM, Andrew Holway <
andrew.hol...@otternetworks.de> wrote:

> Is there a data frame operation to do this?
>
> +-+
> | A B C D |
> +-+
> | 1 2 3 4 |
> | 5 6 7 8 |
> +-+
> +-+
> | A B C D |
> +-+
> | 3 5 6 8 |
> | 0 0 0 0 |
> +-+
> +-+
> | A B C D |
> +-+
> | 8 8 8 8 |
> | 1 1 1 1 |
> +-+
>
> Concatenated together to make this.
>
> +-+
> | A B C D |
> +-+
> | 1 2 3 4 |
> | 5 6 7 8 |
> | 3 5 6 8 |
> | 0 0 0 0 |
> | 8 8 8 8 |
> | 1 1 1 1 |
> +-+
>
> Thanks,
>
> Andrew
>


Re: Concatenating tables

2016-01-23 Thread Deenar Toraskar
On 23 Jan 2016 9:18 p.m., "Deenar Toraskar" <
deenar.toras...@thinkreactive.co.uk> wrote:

> df.unionAll(df2).unionAll(df3)
> On 23 Jan 2016 9:02 p.m., "Andrew Holway" 
> wrote:
>
>> Is there a data frame operation to do this?
>>
>> +-+
>> | A B C D |
>> +-+
>> | 1 2 3 4 |
>> | 5 6 7 8 |
>> +-+
>> +-+
>> | A B C D |
>> +-+
>> | 3 5 6 8 |
>> | 0 0 0 0 |
>> +-+
>> +-+
>> | A B C D |
>> +-+
>> | 8 8 8 8 |
>> | 1 1 1 1 |
>> +-+
>>
>> Concatenated together to make this.
>>
>> +-+
>> | A B C D |
>> +-+
>> | 1 2 3 4 |
>> | 5 6 7 8 |
>> | 3 5 6 8 |
>> | 0 0 0 0 |
>> | 8 8 8 8 |
>> | 1 1 1 1 |
>> +-+
>>
>> Thanks,
>>
>> Andrew
>>
>


Spark RDD DAG behaviour understanding in case of checkpointing

2016-01-23 Thread gaurav sharma
Hi Tathagata/Cody,

I am facing a challenge in production with the DAG behaviour during
checkpointing in Spark Streaming:

Step 1: Read data from Kafka every 15 min - call this KafkaStreamRDD, ~100 GB
of data.

Step 2: Repartition KafkaStreamRdd from 5 to 100 partitions to parallelise
processing - call this RepartitionedKafkaStreamRdd.

Step 3: On this RepartitionedKafkaStreamRdd I run map and
reduceByKeyAndWindow over a window of 2 hours - call this RDD1, ~100 MB of
data.

Checkpointing is enabled.

If I restart my streaming context, it picks up from the last checkpointed
state,

READS data for all 8 SUCCESSFULLY FINISHED 15-minute batches from Kafka
and re-performs the repartition of all the data of all these 8 15-minute
batches,

then reads data for the current 15-minute batch and runs map and
reduceByKeyAndWindow over a window of 2 hours.

Challenges:
1. I can't checkpoint KafkaStreamRDD or RepartitionedKafkaStreamRdd, as this
is huge data, around 800 GB for 2 hours; reading and writing (checkpointing)
this every 15 minutes would be very slow.

2. When I have checkpointed the data of RDD1 every 15 minutes, and map and
reduceByKeyAndWindow are being run over RDD1 only, and I have a snapshot of
all of the last 8 15-minute batches of RDD1,
why is Spark reading all the data for the last 8 successfully completed batches
from Kafka again (Step 1), again performing the re-partitioning (Step 2), and
then again running map and reduceByKeyAndWindow over this newly fetched
KafkaStreamRdd data of the last 8 15-minute batches?

Because of the above challenges, I am not able to exploit
checkpointing when the streaming context is restarted at high load.

Please help me understand if there is something that I am missing.

Regards,
Gaurav
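
Not an answer to why the finished batches are re-read, but for reference, one
knob that helps keep the checkpointed state small is to checkpoint only the
post-aggregation DStream, at a multiple of the slide interval, so the lineage
back to Kafka is truncated without persisting the raw data. A rough Scala
sketch; the Kafka parameters, topic, checkpoint directory and durations are
placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, StreamingContext}

// Sketch only: broker list, topic, checkpoint dir and durations are placeholders.
val ssc = new StreamingContext(new SparkConf().setAppName("windowed-agg"), Minutes(15))
ssc.checkpoint("hdfs:///checkpoints/windowed-agg")

val kafkaParams = Map("metadata.broker.list" -> "broker:9092")
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))

val counts = kafkaStream
  .repartition(100)
  .map { case (_, value) => (value, 1L) }
  .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(120), Minutes(15))

// Checkpoint only the small aggregated stream, and less often than every batch,
// so recovery does not need the full lineage back to the raw Kafka data.
counts.checkpoint(Minutes(30))

counts.foreachRDD(rdd => rdd.take(10).foreach(println))
ssc.start()
ssc.awaitTermination()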


Debug what is replication Level of which RDD

2016-01-23 Thread gaurav sharma
Hi All,

I have enabled replication for my RDDs.

On the Storage tab of the Spark UI I can see the replication
level, 2x or 1x.

But the names given are mappedRDD, shuffledRDD, etc., so I am not able to tell
which of my RDDs is 2x replicated and which one is 1x.

Please help.

Regards,
Gaurab
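
For what it's worth, the Storage tab just shows each RDD's name, so giving
every persisted RDD an explicit name via setName makes the 1x and 2x entries
identifiable. A small Scala sketch with made-up names and data:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Sketch: setName labels the RDD, and that label (instead of MapPartitionsRDD
// or ShuffledRDD) is what the Storage tab shows next to its replication level.
val sc = new SparkContext(new SparkConf().setAppName("storage-names").setMaster("local[*]"))

val events = sc.parallelize(1 to 1000000)
  .map(i => (i % 100, i))
  .setName("rawEvents")                // shows up as "rawEvents" in the UI
  .persist(StorageLevel.MEMORY_ONLY_2) // 2x replicated

val counts = events.reduceByKey(_ + _)
  .setName("eventCounts")
  .persist(StorageLevel.MEMORY_ONLY)   // 1x

counts.count() // materialise both so they appear on the Storage tab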


Re: Spark Dataset doesn't have api for changing columns

2016-01-23 Thread Milad khajavi
How can I request this API?
See this closed issue: https://issues.apache.org/jira/browse/SPARK-12863

On Tue, Jan 19, 2016 at 10:12 PM, Michael Armbrust 
wrote:

> In Spark 2.0 we are planning to combine DataFrame and Dataset so that all
> the methods will be available on either class.
>
> On Tue, Jan 19, 2016 at 3:42 AM, Milad khajavi  wrote:
>
>> Hi Spark users,
>>
>> When I want to map the result of count on groupBy, I need to convert the
>> result to a DataFrame, then change the column names and map the result to a
>> new case class. Why doesn't the Spark Dataset API have this functionality directly?
>>
>> case class LogRow(id: String, location: String, time: Long)
>> case class KeyValue(key: (String, String), value: Long)
>>
>> val log = LogRow("1", "a", 1) :: LogRow("1", "a", 2) :: LogRow("1", "b",
>> 3) :: LogRow("1", "a", 4) :: LogRow("1", "b", 5) :: LogRow("1", "b", 6) ::
>> LogRow("1", "c", 7) :: LogRow("2", "a", 1) :: LogRow("2", "b", 2) ::
>> LogRow("2", "b", 3) :: LogRow("2", "a", 4) :: LogRow("2", "a", 5) ::
>> LogRow("2", "a", 6) :: LogRow("2", "c", 7) :: Nil
>> log.toDS().groupBy(l => {
>>   (l.id, l.location)
>> }).count().toDF().toDF("key", "value").as[KeyValue].show
>>
>> +-----+-----+
>> |  key|value|
>> +-----+-----+
>> |[1,a]|    3|
>> |[1,b]|    3|
>> |[1,c]|    1|
>> |[2,a]|    4|
>> |[2,b]|    2|
>> |[2,c]|    1|
>> +-----+-----+
>>
>>
>> --
>> Milād Khājavi
>> http://blog.khajavi.ir
>> Having the source means you can do it yourself.
>> I tried to change the world, but I couldn’t find the source code.
>>
>
>


-- 
Milād Khājavi
http://blog.khajavi.ir
Having the source means you can do it yourself.
I tried to change the world, but I couldn’t find the source code.