Parse RDD[Seq[String]] to DataFrame with types.

2019-07-17 Thread Guillermo Ortiz Fernández
I'm trying to parse an RDD[Seq[String]] to a DataFrame.
Although it's a Seq of Strings, the elements could have a more specific type such as Int,
Boolean, Double, String and so on.
For example, a line could be:
"hello", "1", "bye", "1.1"
"hello1", "11", "bye1", "2.1"
...

The first column is always going to be a String, the second an Int and so on, and
it's always going to be that way. On the other hand, one execution could have
sequences of five elements and in others the sequences could have 2000, so it
depends on the execution, but in each execution I know the type of each
"column" or "element" of the sequence.

To do it, I could have something like this:
// I could take a parameter to generate the StructType dynamically.
def getSchema(): StructType = {
  val schemaArray = scala.collection.mutable.ArrayBuffer[StructField]()
  schemaArray += StructField("col1", IntegerType, true)
  schemaArray += StructField("col2", StringType, true)
  schemaArray += StructField("col3", DoubleType, true)
  StructType(schemaArray)
}

// A Seq of Any?? It doesn't seem the best option!!
val l1: Seq[Any] = Seq(1, "2", 1.1)
val rdd1 = sc.parallelize(Seq(l1)).map(Row.fromSeq(_))

val schema = getSchema()
val df = sqlContext.createDataFrame(rdd1, schema)
df.show()
df.schema

I don't like having a Seq of Any at all, but it's really what I have.
Is there another option?

On the other hand, I was thinking that since I have something similar to a CSV, I
could create one. With Spark there is a library to read a CSV and return a
DataFrame where the types are inferred. Is it possible to call it if I
already have an RDD[String]?
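A minimal sketch of one way to avoid the Seq[Any] (the column names col1, col2, ... and the example types are made up; it assumes the list of types is known for each execution): build the StructType dynamically from that list and cast every String to its declared type before creating the Row.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

// Hypothetical example: the type of each "column" is known for this execution.
val colTypes: Seq[DataType] = Seq(StringType, IntegerType, StringType, DoubleType)

// Build the schema dynamically from the list of types.
val schema = StructType(colTypes.zipWithIndex.map { case (t, i) =>
  StructField(s"col${i + 1}", t, nullable = true)
})

// Cast each String to the declared type of its position.
def castValue(value: String, dataType: DataType): Any = dataType match {
  case IntegerType => value.toInt
  case DoubleType  => value.toDouble
  case BooleanType => value.toBoolean
  case _           => value
}

val spark = SparkSession.builder().appName("seq-to-df").getOrCreate()
val rdd = spark.sparkContext.parallelize(Seq(
  Seq("hello", "1", "bye", "1.1"),
  Seq("hello1", "11", "bye1", "2.1")))

// Rows are built from already-casted values, so no RDD[Seq[Any]] is needed.
val rows = rdd.map(seq => Row.fromSeq(seq.zip(colTypes).map { case (v, t) => castValue(v, t) }))
val df = spark.createDataFrame(rows, schema)
df.show()

About the CSV idea: if a newer Spark is an option (2.2+, if I remember correctly), spark.read.option("inferSchema", "true").csv(ds) accepts a Dataset[String] directly, so the CSV reader can infer the types from the lines you already have without writing any file.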


Re: Putting record in HBase with Spark - error get regions.

2019-05-28 Thread Guillermo Ortiz Fernández
After a while it's possible to see this error too:

19/05/28 11:11:18 ERROR executor.Executor: Exception in task 35.1 in
stage 0.0 (TID 265)
org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException:
Failed 122 actions: my_table: 122 times,
at 
org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:258)
at 
org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$2000(AsyncProcess.java:238)
at 
org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1810)
at 
org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:240)
at 
org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:146)
at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1092)
at 
example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:25)
at 
example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:19)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/05/28 11:11:18 INFO executor.CoarseGrainedExecutorBackend: Got
assigned task 369


El mar., 28 may. 2019 a las 12:12, Guillermo Ortiz Fernández (<
guillermo.ortiz.f...@gmail.com>) escribió:

> I'm executing a load process into HBase with Spark (around 150M records).
> At the end of the process there are a lot of failed tasks.
>
> I get this error:
>
> 19/05/28 11:02:31 ERROR client.AsyncProcess: Failed to get region location
> org.apache.hadoop.hbase.TableNotFoundException: my_table
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1417)
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1211)
>   at 
> org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:410)
>   at 
> org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:359)
>   at 
> org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:238)
>   at 
> org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:146)
>   at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1092)
>   at 
> example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:25)
>   at 
> example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:19)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:109)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
>
> When I execute a scan from the HBase shell, it works. What could be the
> reason? I'm not sure whether it's an error from HBase or from Spark.
>
>


Putting record in HBase with Spark - error get regions.

2019-05-28 Thread Guillermo Ortiz Fernández
I'm executing a load process into HBase with Spark (around 150M records).
At the end of the process there are a lot of failed tasks.

I get this error:

19/05/28 11:02:31 ERROR client.AsyncProcess: Failed to get region location
org.apache.hadoop.hbase.TableNotFoundException: my_table
at 
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1417)
at 
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1211)
at 
org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:410)
at 
org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:359)
at 
org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:238)
at 
org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:146)
at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1092)
at 
example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:25)
at 
example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:19)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


When I execute a scan from the HBase shell, it works. What could be the
reason? I'm not sure whether it's an error from HBase or from Spark.
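For reference, this is roughly the write pattern the stack trace points to, as a sketch only (HBase 1.x client API assumed; the table, column family and column names are made up): the Connection and BufferedMutator are created and closed inside foreachPartition. If the table scans fine from the shell, it may also be worth checking that the executors load the same hbase-site.xml (ZooKeeper quorum, znode parent, namespace), since a client configuration pointing at a different cluster or namespace can also produce TableNotFoundException.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

// Sketch: write an RDD of (rowKey, value) pairs to HBase, one connection per partition.
def writeToHBase(rdd: RDD[(String, String)]): Unit = {
  rdd.foreachPartition { partition =>
    val conf = HBaseConfiguration.create() // picks up hbase-site.xml from the executor classpath
    val connection = ConnectionFactory.createConnection(conf)
    val mutator = connection.getBufferedMutator(TableName.valueOf("my_table"))
    try {
      partition.foreach { case (rowKey, value) =>
        val put = new Put(Bytes.toBytes(rowKey))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
        mutator.mutate(put)
      }
      mutator.flush()
    } finally {
      mutator.close()
      connection.close()
    }
  }
}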


Re: Spark Streaming - Problem managing Kafka offsets and starting from the beginning.

2019-02-27 Thread Guillermo Ortiz
I'm going to check the value, but I didn't change it. Normally the
process is always running, but sometimes I have to restart it to apply some
changes. Sometimes it starts from the beginning and other times it continues
from the last offset.

El mié., 27 feb. 2019 a las 12:25, Akshay Bhardwaj (<
akshay.bhardwaj1...@gmail.com>) escribió:

> Hi Gabor,
>
> I am talking about offset.retention.minutes, which defaults to 1440
> (or 24 hours)
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Wed, Feb 27, 2019 at 4:47 PM Gabor Somogyi 
> wrote:
>
>> Hi Akshay,
>>
>> The feature what you've mentioned has a default value of 7 days...
>>
>> BR,
>> G
>>
>>
>> On Wed, Feb 27, 2019 at 7:38 AM Akshay Bhardwaj <
>> akshay.bhardwaj1...@gmail.com> wrote:
>>
>>> Hi Guillermo,
>>>
>>> What was the interval between restarts of the Spark job? As a feature
>>> in Kafka, a broker deletes offsets for a consumer group after 24 hours of
>>> inactivity.
>>> In such a case, a newly started Spark Streaming job will read offsets
>>> from the beginning for the same groupId.
>>>
>>> Akshay Bhardwaj
>>> +91-97111-33849
>>>
>>>
>>> On Thu, Feb 21, 2019 at 9:08 PM Gabor Somogyi 
>>> wrote:
>>>
>>>> From the info you've provided there's not much to say.
>>>> Maybe you could collect sample app, logs etc, open a jira and we can
>>>> take a deeper look at it...
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Thu, Feb 21, 2019 at 4:14 PM Guillermo Ortiz 
>>>> wrote:
>>>>
>>>>> I'm working with Spark Streaming 2.0.2 and Kafka 1.0.0 using the Direct
>>>>> Stream as connector. I consume data from Kafka and save the offsets automatically.
>>>>> I can see Spark committing the last processed offsets in the logs, but
>>>>> sometimes when I restart Spark it starts from the beginning, even though
>>>>> I'm using the same groupId.
>>>>>
>>>>> Why could this happen? It only happens rarely.
>>>>>
>>>>


Spark Streaming - Problem managing Kafka offsets and starting from the beginning.

2019-02-21 Thread Guillermo Ortiz
I'm working with Spark Streaming 2.0.2 and Kafka 1.0.0 using the Direct Stream
as connector. I consume data from Kafka and save the offsets automatically.
I can see Spark committing the last processed offsets in the logs, but
sometimes when I restart Spark it starts from the beginning, even though I'm
using the same groupId.

Why could this happen? It only happens rarely.
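For reference, a minimal sketch of the commit pattern from the 0-10 integration guide, which keeps the position stored for a groupId tied to completed batches (broker address, topic and group name are made up):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(new SparkConf().setAppName("offsets-sketch"), Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))

stream.foreachRDD { rdd =>
  // The offset ranges must be taken from the original Kafka RDD, before any shuffle.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process and write the batch output here ...

  // Commit only after the output is written; on restart the same group.id
  // resumes from the committed offsets instead of the beginning of the topic.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

Note that if the group stays idle longer than the broker's offset retention, the stored position disappears and auto.offset.reset decides where to start, which could explain the behaviour described above.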


DAGScheduler in SparkStreaming

2018-09-14 Thread Guillermo Ortiz
A question: when you use Spark Streaming, is the DAG calculated for each
microbatch? Is it possible to calculate it only the first time?


Trying to improve performance of the driver.

2018-09-13 Thread Guillermo Ortiz Fernández
I have a process in Spark Streaming whose batches last 2 seconds. When I check
where the time is spent I see about 0.8s-1s of processing time, although the
global time is 2s. That extra second is spent in the driver.
I reviewed the code which is executed by the driver and I commented out some of
it with the same result, so I don't have any idea where the time is
spent.

Right now I'm executing in client mode from one of the nodes inside the cluster,
so I can't set the number of cores for the driver (although I don't think
that it's going to make the difference).

How could I find out where the driver is spending the time? I'm not sure if it's
possible to improve the performance at this point, or whether that second is
mainly spent scheduling the graph of each microbatch.
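One way to see where each batch's time goes is a StreamingListener, which exposes the same numbers as the streaming UI: schedulingDelay is the time the batch waited before its jobs started (driver side), processingDelay is the time spent running the jobs. A sketch (class and app names are made up):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs, per completed batch, how the total delay splits between scheduling and processing.
class BatchTimingListener extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    println(s"batch ${info.batchTime}: " +
      s"schedulingDelay=${info.schedulingDelay.getOrElse(-1L)} ms, " +
      s"processingDelay=${info.processingDelay.getOrElse(-1L)} ms, " +
      s"totalDelay=${info.totalDelay.getOrElse(-1L)} ms")
  }
}

def register(ssc: StreamingContext): Unit =
  ssc.addStreamingListener(new BatchTimingListener)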


Re: deploy-mode cluster. FileNotFoundException

2018-09-05 Thread Guillermo Ortiz Fernández
I'm using a standalone cluster and the final command I'm trying is:
spark-submit --verbose --deploy-mode cluster --driver-java-options
"-Dlogback.configurationFile=conf/i${1}Logback.xml" \
--class com.example.Launcher --driver-class-path
lib/spark-streaming-kafka-0-10_2.11-2.0.2.jar:lib/kafka-clients-0.10.0.1.jar
\
--files conf/${1}Conf.json iris-core-0.0.1-SNAPSHOT.jar conf/${1}Conf.json

El mié., 5 sept. 2018 a las 11:11, Guillermo Ortiz Fernández (<
guillermo.ortiz.f...@gmail.com>) escribió:

> I want to execute my processes in cluster mode. As I don't know where the
> driver will be executed, I have to make all the files it needs available. I
> understand that there are two options: copy all the files to all nodes or
> copy them to HDFS.
>
> My doubt is, if I want to put all the files in HDFS, isn't that automatic
> with the --files and --jars parameters of the spark-submit command, or do I have
> to copy them to HDFS manually?
>
> My idea is to execute something like:
> spark-submit --driver-java-options
> "-Dlogback.configurationFile=conf/${1}Logback.xml" \
> --class com.example.Launcher --driver-class-path
> lib/spark-streaming-kafka-0-10_2.11-2.0.2.jar:lib/kafka-clients-1.0.0.jar \
> --files /conf/${1}Conf.json example-0.0.1-SNAPSHOT.jar conf/${1}Conf.json
> I have also tried --files hdfs:// without copying anything to HDFS
> and it doesn't work either.
>
>


deploy-mode cluster. FileNotFoundException

2018-09-05 Thread Guillermo Ortiz Fernández
I want to execute my processes in cluster mode. As I don't know where the
driver will be executed, I have to make all the files it needs available. I
understand that there are two options: copy all the files to all nodes or
copy them to HDFS.

My doubt is, if I want to put all the files in HDFS, isn't that automatic
with the --files and --jars parameters of the spark-submit command, or do I have
to copy them to HDFS manually?

My idea is to execute something like:
spark-submit --driver-java-options
"-Dlogback.configurationFile=conf/${1}Logback.xml" \
--class com.example.Launcher --driver-class-path
lib/spark-streaming-kafka-0-10_2.11-2.0.2.jar:lib/kafka-clients-1.0.0.jar \
--files /conf/${1}Conf.json example-0.0.1-SNAPSHOT.jar conf/${1}Conf.json
I have also tried --files hdfs:// without copying anything to HDFS
and it doesn't work either.
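In case it helps: my understanding is that files passed with --files are shipped by spark-submit itself and end up in the working directory of the driver and executor containers, so in cluster mode the code should resolve them by base name (or through SparkFiles) rather than by the gateway's relative path like conf/xxx.json. A sketch, with a made-up file name:

import java.io.File
import org.apache.spark.SparkFiles

// Hypothetical name; the file was submitted with: --files conf/myConf.json
val confName = "myConf.json"

// --files places the file in the container's working directory, so the base
// name (without the conf/ prefix used on the gateway) should resolve there.
val inWorkingDir = new File(confName)

// Alternatively, SparkFiles knows the directory where submitted files were downloaded.
val viaSparkFiles = new File(SparkFiles.get(confName))

val confFile = if (inWorkingDir.exists()) inWorkingDir else viaSparkFiles
println(s"Reading configuration from ${confFile.getAbsolutePath}")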


Local mode vs client mode with one executor

2018-08-30 Thread Guillermo Ortiz
I have many Spark processes; some of them are pretty simple and they hardly
have to process any messages, but they were developed with the same
archetype and they use Spark.

Some of them are executed with many executors, but for a few of them it doesn't
make sense to use more than 2-4 cores in only one executor. The most
important reason is that the quantity of messages is so low that it's not
worth it.

The point here is: is there any disadvantage in running these few Spark processes in
local[2..4] instead of cluster/client mode with one executor (4 cores) and
one driver? I have read that local mode is a testing mode in most cases and I
use it for my tests too ;)
Besides, it seems to go faster running in local mode in those cases.


Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Guillermo Ortiz Fernández
I can't... do you think that it could be a bug in this version? From
Spark or from Kafka?

El mié., 29 ago. 2018 a las 22:28, Cody Koeninger ()
escribió:

> Are you able to try a recent version of spark?
>
> On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
>  wrote:
> > I'm using Spark Streaming 2.0.1 with Kafka 0.10, sometimes I get this
> > exception and Spark dies.
> >
> > I couldn't see any other error or problem on the machines. Does anybody know
> > the reason for this error?
> >
> >
> > java.lang.IllegalStateException: This consumer has already been closed.
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
> > ~[kafka-clients-1.0.0.jar:na]
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
> > ~[kafka-clients-1.0.0.jar:na]
> > at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> > at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> > at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at scala.Option.orElse(Option.scala:289)
> ~[scala-library-2.11.11.jar:na]
> > at
> >
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > ~[scala-library-2.11.11.jar:na]
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> > ~[scala-library-2.11.11.jar:na]
> > at
> > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> > ~[scala-library-2.11.11.jar:na]
> > at
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.

java.lang.OutOfMemoryError: Java heap space - Spark driver.

2018-08-29 Thread Guillermo Ortiz Fernández
I got this error from the Spark driver. It seems that I should increase the
memory in the driver, although it's 5g (and 4 cores) right now. It seems
weird to me because I'm not using Kryo or broadcast in this process, but in
the log there are references to Kryo and broadcast.
How could I figure out the reason for this OutOfMemory? Is it normal that
there are references to Kryo and broadcasting when I'm not using them?

05:11:19.110 [streaming-job-executor-0] WARN
c.datastax.driver.core.CodecRegistry - Ignoring codec DateRangeCodec
['org.apache.cassandra.db.marshal.DateRangeType' <->
com.datastax.driver.dse.search.DateRange] because it collides with
previously registered codec DateRangeCodec
['org.apache.cassandra.db.marshal.DateRangeType' <->
com.datastax.driver.dse.search.DateRange]
05:11:26.806 [dag-scheduler-event-loop] WARN  org.apache.spark.util.Utils -
Suppressing exception in finally: Java heap space
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
~[na:1.8.0_162]
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[na:1.8.0_162]
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
~[lz4-1.3.0.jar:na]
at
net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
~[lz4-1.3.0.jar:na]
at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
~[kryo-3.0.3.jar:na]
at com.esotericsoftware.kryo.io.Output.close(Output.java:191)
~[kryo-3.0.3.jar:na]
at
org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:209)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:238)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:237)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:86)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1387)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1012)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:936)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:935)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at scala.collection.immutable.List.foreach(List.scala:392)
[scala-library-2.11.11.jar:na]
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:935)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:873)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1630)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
05:40:53.535 [dse-app-client-thread-pool-0] WARN
c.datastax.driver.core.CodecRegistry - Ignoring codec DateRangeCodec
['org.apache.cassandra.db.marshal.DateRangeType' 
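Two notes that may help here. First, seeing TorrentBroadcast and Kryo in the trace is normal even without broadcasting anything yourself: the trace shows DAGScheduler.submitMissingTasks broadcasting the stage's task binary, which Spark does internally for every stage and serializes with the configured serializer (Kryo in this setup). Second, one way to find what is actually filling the heap is to make the driver dump it when the OOM happens and open the dump with a tool like Eclipse MAT. A sketch of that configuration (the dump path is made up; in client mode these options usually have to be passed through spark-submit, since the driver JVM is already running when SparkConf is read):

import org.apache.spark.SparkConf

// Ask the driver JVM to write a heap dump when an OutOfMemoryError happens.
// The path must exist on the machine where the driver runs.
val conf = new SparkConf()
  .set("spark.driver.extraJavaOptions",
    "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/driver-dumps")

// In client mode, the equivalent on the command line would be:
//   spark-submit --driver-java-options "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/driver-dumps" ...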

Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Guillermo Ortiz Fernández
I'm using Spark Streaming 2.0.1 with Kafka 0.10. Sometimes I get this
exception and Spark dies.

I couldn't see any other error or problem on the machines. Does anybody know
the reason for this error?


java.lang.IllegalStateException: This consumer has already been closed.
at
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
~[kafka-clients-1.0.0.jar:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
~[kafka-clients-1.0.0.jar:na]
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at scala.Option.orElse(Option.scala:289) ~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
~[scala-library-2.11.11.jar:na]
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
~[scala-library-2.11.11.jar:na]
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
~[scala-library-2.11.11.jar:na]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
~[scala-library-2.11.11.jar:na]
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
~[scala-library-2.11.11.jar:na]
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]


Re: Caching small Rdd's take really long time and Spark seems frozen

2018-08-24 Thread Guillermo Ortiz
Another test I just did it's to execute with local[X] and this problem
doesn't happen.  Communication problems?

2018-08-23 22:43 GMT+02:00 Guillermo Ortiz :

> It's a complex DAG before the point where I cache the RDD; there are some joins,
> filters and maps before caching the data, but most of the time it takes
> almost no time to do it. I could understand it if it took the same time every
> time to process or cache the data. Besides, it seems random and there isn't any
> weird data in the input.
>
> Another test I tried was disabling caching, and I saw that all the
> microbatches last the same time, so it seems that it's related to
> caching these RDDs.
>
> El jue., 23 ago. 2018 a las 15:29, Sonal Goyal ()
> escribió:
>
>> How are these small RDDs created? Could the blockage be in their compute
>> creation instead of their caching?
>>
>> Thanks,
>> Sonal
>> Nube Technologies <http://www.nubetech.co>
>>
>> <http://in.linkedin.com/in/sonalgoyal>
>>
>>
>>
>> On Thu, Aug 23, 2018 at 6:38 PM, Guillermo Ortiz 
>> wrote:
>>
>>> I use spark with caching with persist method. I have several RDDs what I
>>> cache but some of them are pretty small (about 300kbytes). Most of time it
>>> works well and usually lasts 1s the whole job, but sometimes it takes about
>>> 40s to store 300kbytes to cache.
>>>
>>> If I go to the SparkUI->Cache, I can see how the percentage is
>>> increasing until 83% (250kbytes) and then it stops for a while. If I check
>>> the event time in the Spark UI I can see that when this happen there is a
>>> node where tasks takes very long time. This node could be any from the
>>> cluster, it's not always the same.
>>>
>>> In the spark executor logs I can see it's that it takes about 40s in
>>> store 3.7kb when this problem occurs
>>>
>>> INFO  2018-08-23 12:46:58 Logging.scala:54 -
>>> org.apache.spark.storage.BlockManager: Found block rdd_1705_23 locally
>>> INFO  2018-08-23 12:47:38 Logging.scala:54 -
>>> org.apache.spark.storage.memory.MemoryStore: Block rdd_1692_7 stored as
>>> bytes in memory (estimated size 3.7 KB, free 1048.0 MB)
>>> INFO  2018-08-23 12:47:38 Logging.scala:54 -
>>> org.apache.spark.storage.BlockManager: Found block rdd_1692_7 locally
>>>
>>> I have tried with MEMORY_ONLY, MEMORY_AND_SER and so on with the same
>>> results. I have checked the IO disk (although if I use memory_only I guess
>>> that it doesn't have sense) and I can't see any problem. This happens
>>> randomly, but it could be in the 25% of the jobs.
>>>
>>> Any idea about what it could be happening?
>>>
>>
>>


Re: Caching small Rdd's take really long time and Spark seems frozen

2018-08-23 Thread Guillermo Ortiz
It's a complex DAG before the point where I cache the RDD; there are some joins,
filters and maps before caching the data, but most of the time it takes
almost no time to do it. I could understand it if it took the same time every
time to process or cache the data. Besides, it seems random and there isn't any
weird data in the input.

Another test I tried was disabling caching, and I saw that all the
microbatches last the same time, so it seems that it's related to
caching these RDDs.

El jue., 23 ago. 2018 a las 15:29, Sonal Goyal ()
escribió:

> How are these small RDDs created? Could the blockage be in their compute
> creation instead of their caching?
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Thu, Aug 23, 2018 at 6:38 PM, Guillermo Ortiz 
> wrote:
>
>> I use spark with caching with persist method. I have several RDDs what I
>> cache but some of them are pretty small (about 300kbytes). Most of time it
>> works well and usually lasts 1s the whole job, but sometimes it takes about
>> 40s to store 300kbytes to cache.
>>
>> If I go to the SparkUI->Cache, I can see how the percentage is increasing
>> until 83% (250kbytes) and then it stops for a while. If I check the event
>> time in the Spark UI I can see that when this happen there is a node where
>> tasks takes very long time. This node could be any from the cluster, it's
>> not always the same.
>>
>> In the spark executor logs I can see it's that it takes about 40s in
>> store 3.7kb when this problem occurs
>>
>> INFO  2018-08-23 12:46:58 Logging.scala:54 -
>> org.apache.spark.storage.BlockManager: Found block rdd_1705_23 locally
>> INFO  2018-08-23 12:47:38 Logging.scala:54 -
>> org.apache.spark.storage.memory.MemoryStore: Block rdd_1692_7 stored as
>> bytes in memory (estimated size 3.7 KB, free 1048.0 MB)
>> INFO  2018-08-23 12:47:38 Logging.scala:54 -
>> org.apache.spark.storage.BlockManager: Found block rdd_1692_7 locally
>>
>> I have tried with MEMORY_ONLY, MEMORY_AND_SER and so on with the same
>> results. I have checked the IO disk (although if I use memory_only I guess
>> that it doesn't have sense) and I can't see any problem. This happens
>> randomly, but it could be in the 25% of the jobs.
>>
>> Any idea about what it could be happening?
>>
>
>


Caching small Rdd's take really long time and Spark seems frozen

2018-08-23 Thread Guillermo Ortiz
I use Spark with caching via the persist method. I have several RDDs that I
cache, but some of them are pretty small (about 300 KB). Most of the time it
works well and the whole job usually lasts 1 s, but sometimes it takes about
40 s to store those 300 KB in the cache.

If I go to the Spark UI -> Cache page, I can see how the percentage increases
until 83% (250 KB) and then it stops for a while. If I check the event
time in the Spark UI I can see that when this happens there is a node where
tasks take a very long time. This node could be any one from the cluster; it's
not always the same.

In the Spark executor logs I can see that it takes about 40 s to store
3.7 KB when this problem occurs:

INFO  2018-08-23 12:46:58 Logging.scala:54 -
org.apache.spark.storage.BlockManager: Found block rdd_1705_23 locally
INFO  2018-08-23 12:47:38 Logging.scala:54 -
org.apache.spark.storage.memory.MemoryStore: Block rdd_1692_7 stored as
bytes in memory (estimated size 3.7 KB, free 1048.0 MB)
INFO  2018-08-23 12:47:38 Logging.scala:54 -
org.apache.spark.storage.BlockManager: Found block rdd_1692_7 locally

I have tried MEMORY_ONLY, MEMORY_AND_SER and so on with the same
results. I have checked the disk IO (although if I use MEMORY_ONLY I guess
that doesn't make sense) and I can't see any problem. This happens
randomly, but it could be in about 25% of the jobs.

Any idea about what could be happening?
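Following Sonal's question, a small sketch to separate computing the RDD from storing it in the cache: count once before persisting (pure lineage evaluation), then persist and count twice more. If the second count is the slow one, the time goes into writing the blocks; if the first one is, the computation itself is the problem.

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

def timeIt[T](label: String)(block: => T): T = {
  val t0 = System.nanoTime()
  val result = block
  println(s"$label: ${(System.nanoTime() - t0) / 1e6} ms")
  result
}

// Rough way to tell apart "computing the RDD" and "storing it in the cache".
def profileCaching[T](rdd: RDD[T]): Unit = {
  timeIt("compute only")(rdd.count())              // lineage evaluated, nothing cached yet
  rdd.persist(StorageLevel.MEMORY_ONLY)
  timeIt("compute + store in cache")(rdd.count())  // evaluated again and written to the block manager
  timeIt("read from cache")(rdd.count())           // should be fast if caching itself is not the problem
}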


Refresh broadcast variable when it isn't the value.

2018-08-19 Thread Guillermo Ortiz Fernández
Hello,

I want to set data in a broadcast (Map) variable in Spark.
Sometimes there is new data, so I have to update/refresh the values, but I'm
not sure how I could do this.

My idea is to use an accumulator as a flag when a cache error occurs; at
that point I could read the data and reload the broadcast variable for the
next microbatch, and finally reset the accumulator to 0.
I don't know if there is a better solution or other ideas to do this.

Has anyone faced this problem?
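A sketch of the pattern I have seen for this (not tested here, names are made up): keep the broadcast behind a small driver-side wrapper and, at the start of foreachRDD (which runs on the driver), re-broadcast and unpersist the old value whenever the accumulator flag says the data is stale.

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Driver-side holder that can swap the broadcast between microbatches.
class RefreshableBroadcast[T: ClassTag](sc: SparkContext, load: () => T) {
  @volatile private var current: Broadcast[T] = sc.broadcast(load())

  // Use wrapper.value.value inside the closures of the microbatch.
  def value: Broadcast[T] = current

  // Call on the driver (e.g. at the top of foreachRDD) when the flag is set.
  def refresh(): Unit = {
    val old = current
    current = sc.broadcast(load())
    old.unpersist(blocking = false)
  }
}

With the accumulator-as-flag idea, the driver side of each batch would look roughly like: if (flag.value > 0) { maps.refresh(); flag.reset() } before the transformations that read maps.value.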


Reset the offsets, Kafka 0.10 and Spark

2018-06-07 Thread Guillermo Ortiz Fernández
I'm consuming data from Kafka with  createDirectStream and store the
offsets in Kafka (
https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself
)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))



My Spark version is 2.0.2 with Kafka 0.10. This solution works well, and
when I restart, the Spark process starts from the last offset that Spark
consumed, but sometimes I need to reprocess the whole topic from the
beginning.

I have seen that I could reset the offsets with a Kafka script, but it's not
available in Kafka 0.10...

kafka-consumer-groups --bootstrap-server kafka-host:9092 --group
my-group --reset-offsets --to-earliest --all-topics --execute


Another possibility is to set another Kafka parameter in
createDirectStream with a map of offsets, but how could I get the first
offset of each partition? I have checked the API of the new consumer
and I don't see any method to get these offsets.

Any other way? I could start with another groupId as well, but it doesn't
seem a very clean option for production.
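One option that avoids changing the groupId (a sketch, not tested): use a plain KafkaConsumer before creating the stream to discover the partitions and their earliest offsets, and pass them explicitly to the consumer strategy. kafkaParams, topics and streamingContext are the same ones used in the snippet above.

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Throw-away consumer, only used to ask for partitions and their earliest offsets.
val probe = new KafkaConsumer[String, String](kafkaParams.asJava)
val partitions = topics.flatMap { t =>
  probe.partitionsFor(t).asScala.map(p => new TopicPartition(t, p.partition()))
}
probe.assign(partitions.asJava)
probe.seekToBeginning(partitions.asJava)
val fromOffsets: Map[TopicPartition, Long] = partitions.map(tp => tp -> probe.position(tp)).toMap
probe.close()

// With explicit starting offsets, the same groupId re-reads the topic from the beginning.
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets))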


Measure performance time in some spark transformations.

2018-05-12 Thread Guillermo Ortiz Fernández
I want to measure how long some different transformations in Spark take,
such as map, joinWithCassandraTable and so on. Which is the best
approximation to do it?

def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns")
  result
}


Could I use something like this? I guess that System.nanoTime will be
executed in the driver before and after the workers execute the maps/joins
and so on. Is that right? Any other idea?
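One caveat with that helper: transformations are lazy, so wrapping only a map or a joinWithCassandraTable call measures the time to build the lineage on the driver, not the distributed work. Forcing an action inside the block gives the end-to-end time of everything up to that point; for per-stage numbers the Spark UI is still the more precise tool. A sketch:

import org.apache.spark.sql.SparkSession

def time[R](label: String)(block: => R): R = {
  val t0 = System.nanoTime()
  val result = block
  println(s"$label took ${(System.nanoTime() - t0) / 1e6} ms")
  result
}

val spark = SparkSession.builder().appName("timing-sketch").getOrCreate()
val rdd = spark.sparkContext.parallelize(1 to 1000000)

// This only times building the (lazy) transformation on the driver.
val mapped = time("define map")(rdd.map(_ * 2))

// Forcing an action inside the block also measures the work done on the executors.
time("map + count")(mapped.count())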


Testing spark streaming action

2018-04-10 Thread Guillermo Ortiz
I have a unit test in Spark Streaming which has an input parameter:
-DStream[String]

Inside the code I want to update a LongAccumulator. When I execute the
test I get a NullPointerException because the accumulator doesn't exist.
Is there any way to test this?

My accumulator is updated in different methods.

def execute(stream: DStream[String]): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreach { r =>
      if (r == "A") {
        acc.add(1)
        sendKafka(...)
      }
    }
  }
}

Is it possible to test this kind of method?

runAction[String](input, service.execute)

When it tries to update the accumulator it doesn't work because it hasn't
been initialized. I could add a new parameter to the execute method, and that's
okay, but runAction doesn't admit more parameters either.
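The workaround I would try (a sketch, assuming the runAction helper from spark-testing-base accepts any DStream[String] => Unit and that the suite exposes sc): take the accumulator as an explicit parameter of execute and close over it with a lambda, so runAction still receives a one-argument function while the test controls which accumulator is used.

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.LongAccumulator

// The accumulator is now injected instead of being looked up inside the method.
def execute(stream: DStream[String], acc: LongAccumulator): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreach { r =>
      if (r == "A") {
        acc.add(1)
        // sendKafka(...)  // side effect left out of the sketch
      }
    }
  }
}

// In the test, using the SparkContext provided by the testing base:
// val acc = sc.longAccumulator("test-acc")
// runAction[String](input, (s: DStream[String]) => service.execute(s, acc))
// assert(acc.value == expectedCount)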


Testing spark-testing-base. Error multiple SparkContext

2018-04-03 Thread Guillermo Ortiz
I'm doing a Spark test with Spark Streaming, Cassandra and Kafka.
I have an action which takes a DStream as input, saves to Cassandra and
sometimes puts some elements in Kafka.
I'm using https://github.com/holdenk/spark-testing-base with Kafka and
Cassandra running locally.


My method looks like:





def execute(dstream: DStream[MyObject]): Unit = {
  // Some processing --> this works
  // Save some RDDs to Cassandra --> this works
  // Send some records to Kafka --> this doesn't work in the test, it works outside of the test
}


When I send data to Kafka:

// There is an error in this method
def sendToKafka(rec: DStream[CrmTipoCliente]) = {
  rec.foreachRDD( r => {
    r.foreachPartition {
      val kafka = SparkKafkaSink[String, String](Config.kafkapropsProd)  // --> Exception here.
      // Config.kafkapropsProd returns a Properties with the values to connect to Kafka
      partition =>
        partition.foreach { message =>
          // Some logic...
          kafka.send("test", null, "message")
        }
    }
  })
}

My test looks like:

@RunWith(classOf[JUnitRunner])
class CassandraConnectionIntegrationTest extends FunSuite with BeforeAndAfter
    with BeforeAndAfterAll with StreamingActionBase {

  var cluster: Cluster = _
  implicit var session: Session = _
  val keyspace: String = "iris"
  val table: String = keyspace + ".tipo_cliente_ref"
  var service: MyClass = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    // This line doesn't work!
    sc.getConf.set("spark.driver.allowMultipleContexts", "true")
    ...
  }

  test("Insert record ") {
    val inputInsert = MyObject("...")
    val input = List(List(inputInsert))
    runAction[MyObject](input, service.execute)
    val result = session.execute("select * from myTable WHERE...")
    // Some asserts on Cassandra and Kafka
  }
}
This test partially works: it saves data into Cassandra but it doesn't work
when it has to send data to Kafka.

The error I can see:
23:58:45.329 [pool-22-thread-1] INFO  o.a.spark.streaming.CheckpointWriter
- Saving checkpoint for time 1000 ms to file
'file:/C:/Users/A148681/AppData/Local/Temp/spark-cdf3229b-9d84
-400f-b92a-5ff4086b81c3/checkpoint-1000'
Exception in thread "streaming-job-executor-0"
java.lang.ExceptionInInitializerError
at
com.example.streaming.CrmTipoClienteRunner$$anonfun$monitKafkaToCassandra$1.apply(MyClass.scala:49)
at
com.example.streaming.CrmTipoClienteRunner$$anonfun$monitKafkaToCassandra$1.apply(MyClass
.scala:47)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Only one SparkContext may be
running in this JVM (see SPARK-2243). To ignore this error, set
spark.driver.allowMultipleContexts = true. The
currently running SparkContext was created at:
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
com.holdenkarau.spark.testing.SharedSparkContext$class.beforeAll(SharedSparkContext.scala:45)


Testing with spark-base-test

2018-03-28 Thread Guillermo Ortiz
I'm using spark-testing-base and I can't get the code to compile.

  test("Testging") {
val inputInsert = A("data2")
val inputDelete = A("data1")
val outputInsert = B(1)
val outputDelete = C(1)

val input = List(List(inputInsert), List(inputDelete))
val output = (List(List(outputInsert)), List(List(outputDelete)))

//Why doesn't it compile?? I have tried many things here.
testOperation[A,(B,C)](input, service.processing _, output)
  }

My method is:

def processing(avroDstream: DStream[A]) : (DStream[B],DStream[C]) ={...}

What does the "_" means in this case?


Connection SparkStreaming with SchemaRegistry

2018-03-09 Thread Guillermo Ortiz
I'm trying to integrate the Schema Registry with Spark Streaming. For the
moment I want to use GenericRecords. It seems that my producer works and
new schemas are published in the _schemas topic. When I try to read with my
consumer, I'm not able to deserialize the data.

How could I tell Spark that I'm going to deserialize to GenericRecord?



public class SparkStreamingSchemaRegister {

public static void main(String[] args) throws InterruptedException {
String topic = "avro_example_schemaRegistry";

final JavaStreamingContext jssc = new
JavaStreamingContext(getSparkConf(),

Durations.milliseconds(Constants.STREAM_BATCH_DURATION_MILLIS));


final JavaInputDStream>
rawStream = KafkaSource.getKafkaDirectStream(jssc);

rawStream.foreachRDD(rdd -> {
JavaRDD javaRddClient = rdd.map(
kafkaRecord -> {

GenericRecord record = kafkaRecord.value(); -->
ERROR
return CrmClient.getCrmClient(kafkaRecord.value());
});


   CassandraJavaUtil
.javaFunctions(javaRddClient)
.writerBuilder("keyspace", "client",
CassandraJavaUtil.mapToRow(CrmClient.class))
.withColumnSelector(CassandraJavaUtil.someColumns("id",
"name", "lastname"))
.saveToCassandra();
});


jssc.start();
jssc.awaitTermination();
jssc.close();
}


private static class KafkaSource {
public static JavaInputDStream> getKafkaDirectStream(JavaStreamingContext jssc) {
JavaInputDStream>
stream = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(getKafkaTopic(),
getKafkaConf()));
return stream;
}


private static Map getKafkaConf() {
Map kafkaParams = new HashMap<>();

kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_BOOTSTRAP_SERVERS.getValue(),
Constants.KAFKA_BOOTSTRAP_SERVERS);

kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_KEY_DESERIALIZER.getValue(),
ByteArrayDeserializer.class);

kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_GROUPID.getValue(),
Constants.KAFKA_GROUP_ID);

kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_ENABLE_AUTO_COMMIT.getValue(),
false);

kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_AUTO_OFFSET_RESET.getValue(),
"earliest");

kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class.getName());

kafkaParams.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
"false");

kafkaParams.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,"
"http://localhost:8081");

return kafkaParams;
}

}
}


Spark Streaming reading many topics with Avro

2018-03-02 Thread Guillermo Ortiz
Hello,

I want to read several topics with a single Spark Streaming process. I'm
using Avro, and the data in the different topics have different schemas.
Ideally, if I only had one topic I could implement a deserializer, but I don't
know if it's possible with many different schemas.

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean))



I can only set one value.deserializer, and even if I could set many of them,
I don't know how the process would pick the right one. Any idea? I
guess I could use ByteArrayDeserializer and do it myself too...
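The ByteArrayDeserializer route mentioned at the end is the one I would try without a schema registry: let Kafka return raw bytes and choose the Avro schema per record based on record.topic(). A sketch (the topic names and schemas are placeholders, and it assumes plain binary Avro with one writer schema per topic):

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.kafka.clients.consumer.ConsumerRecord

// Placeholder schemas: in practice, load the real Avro schema JSON per topic.
val schemaJsonA = """{"type":"record","name":"A","fields":[{"name":"f1","type":"string"}]}"""
val schemaJsonB = """{"type":"record","name":"B","fields":[{"name":"f1","type":"string"}]}"""

val schemasByTopic: Map[String, Schema] = Map(
  "topicA" -> new Schema.Parser().parse(schemaJsonA),
  "topicB" -> new Schema.Parser().parse(schemaJsonB))

// With "value.deserializer" -> classOf[ByteArrayDeserializer], the stream carries
// ConsumerRecord[String, Array[Byte]] and the Avro decoding happens here.
def decode(record: ConsumerRecord[String, Array[Byte]]): GenericRecord = {
  val schema = schemasByTopic(record.topic())
  val reader = new GenericDatumReader[GenericRecord](schema)
  reader.read(null, DecoderFactory.get().binaryDecoder(record.value(), null))
}

If the messages are produced with the Confluent serializer instead, a single KafkaAvroDeserializer as value.deserializer already copes with many topics, since each message carries its schema id.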


Re: Testing Spark-Cassandra

2018-01-17 Thread Guillermo Ortiz
Thanks, I'll check it ;)

2018-01-17 17:19 GMT+01:00 Alonso Isidoro Roman <alons...@gmail.com>:

> Yes, you can use Docker to build your own Cassandra ring. Depending on your
> OS, the instructions may change, so please follow this
> <https://yurisubach.com/2016/03/24/cassandra-docker-test-cluster/> link
> to install it, and then follow this
> <https://github.com/koeninger/spark-cassandra-example> project, but you
> will have to adapt the necessary libraries to use the Spark 2.0.x version.
>
> Good luck, I would like to see a blog post using this combination.
>
>
>
> 2018-01-17 16:48 GMT+01:00 Guillermo Ortiz <konstt2...@gmail.com>:
>
>> Hello,
>>
>> I'm using Spark 2.0 and Cassandra. Is there any utility to make unit testing
>> easy, or what would be the best way to do it? A library? Cassandra with
>> Docker?
>>
>
>
>
> --
> Alonso Isidoro Roman
> about.me/alonso.isidoro.roman
>


Testing Spark-Cassandra

2018-01-17 Thread Guillermo Ortiz
Hello,

I'm using Spark 2.0 and Cassandra. Is there any utility to make unit testing
easy, or what would be the best way to do it? A library? Cassandra with
Docker?


Re: Flume and Spark Streaming

2017-01-16 Thread Guillermo Ortiz
Avro sink --> Spark Streaming

2017-01-16 13:55 GMT+01:00 ayan guha <guha.a...@gmail.com>:

> With Flume, what would be your sink?
>
>
>
> On Mon, Jan 16, 2017 at 10:44 PM, Guillermo Ortiz <konstt2...@gmail.com>
> wrote:
>
>> I'm wondering to use Flume (channel file)-Spark Streaming.
>>
>> I have some doubts about it:
>>
>> 1.The RDD size is all data what it comes in a microbatch which you have
>> defined. Risght?
>>
>> 2.If there are 2Gb of data, how many are RDDs generated? just one and I
>> have to make a repartition?
>>
>> 3.When is the ACK sent back  from Spark to Flume?
>>   I guess that if Flume dies, Flume is going to send the same data again
>> to Spark
>>   If Spark dies, I have no idea if Spark is going to reprocessing same
>> data again when it is sent again.
>>   Coult it be different if I use Kafka Channel?
>>
>>
>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Flume and Spark Streaming

2017-01-16 Thread Guillermo Ortiz
I'm considering using Flume (file channel) with Spark Streaming.

I have some doubts about it:

1. The RDD size is all the data that arrives in the microbatch interval you
have defined. Right?

2. If there are 2 GB of data, how many RDDs are generated? Just one, and I
have to make a repartition?

3. When is the ACK sent back from Spark to Flume?
  I guess that if Flume dies, Flume is going to send the same data again to
Spark.
  If Spark dies, I have no idea if Spark is going to reprocess the same data
again when it is sent again.
  Could it be different if I use the Kafka channel?


Number of consumers in Kafka with Spark Streaming

2016-06-21 Thread Guillermo Ortiz
I use Spark Streaming with Kafka and I'd like to know how many consumers
are created. I guess as many as there are partitions in Kafka, but I'm not
sure.
Is there a way to know the name of the groupId that Spark uses against Kafka?


Re: java.lang.NoClassDefFoundError: kafka/api/TopicMetadataRequest

2016-05-09 Thread Guillermo Ortiz
The class is in *kafka_2.10-0.8.2.1.jar*, not in
spark-streaming-kafka_2.10-1.5.1.jar.

I found the error... so embarrassing. I was executing in cluster mode and I
only had the jars on the gateway. I guess that Spark chose a NodeManager
to execute a container with the driver and load the libraries, but the
libraries were only on the gateway.

Anyway, these jars were copied to HDFS by spark-submit with *--jars
$SPARK_CLASSPATH*.
So my question is: is it really necessary to have all the jars in all the "local"
directories of the possible executor nodes?

2016-05-09 14:49 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:

> That sounds like specific for Kafka.  Check this
> https://www.codatlas.com/github.com/apache/kafka/HEAD/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
>
> I cannot see it in
>
>  jar tvf spark-streaming-kafka_2.10-1.5.1.jar|grep TopicMetadataRequest
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 9 May 2016 at 13:35, Guillermo Ortiz <konstt2...@gmail.com> wrote:
>
>> I was looking the log carefully looking for others error. Anyway the 
>> complete Exception is:
>>
>> 2016-05-09 13:20:25,646 [Driver] ERROR 
>> org.apache.spark.deploy.yarn.ApplicationMaster - User class threw exception: 
>> java.lang.NoClassDefFoundError: kafka/api/TopicMetadataRequest
>> java.lang.NoClassDefFoundError: kafka/api/TopicMetadataRequest
>>  at 
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
>>  at 
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>  at 
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
>>  at produban.spark.CentralLog$.createContext(CentralLog.scala:34)
>>  at produban.spark.CentralLog$$anonfun$3.apply(CentralLog.scala:85)
>>  at produban.spark.CentralLog$$anonfun$3.apply(CentralLog.scala:84)
>>  at scala.Option.getOrElse(Option.scala:120)
>>  at 
>> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:844)
>>  at produban.spark.CentralLog$.main(CentralLog.scala:84)
>>  at produban.spark.CentralLog.main(CentralLog.scala)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>  at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:606)
>>  at 
>> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)*Caused
>>  by: java.lang.ClassNotFoundException: kafka.api.TopicMetadataRequest*
>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>
>>
>> 2016-05-09 13:12 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:
>>
>>> NoClassDefFoundError is different than saying that it could not be loaded 
>>> from the classpath.
>>>
>>> From my experience, there should be some other error before this error 
>>> which would give you better idea.
>>>
>>> You can also check whether another version of kafka is embedded in any of 
>>> the jars listed below.
>>>
>>> Cheers
>>>
>>>
>>> On Mon, May 9, 2016 at 4:00 AM, Guillermo Ortiz <konstt2...@gmail.com>
>>> wrote:
>>>
>>>> *jar tvf kafka_2.10-0.8.2.1.jar | grep TopicMetadataRequest *
>>>>   1757 Thu Feb 26 14:30:34 CET 2015
>>>> kafka/api/TopicMetadataRequest$$anonfun$1.class
>>>>   1712 Thu Feb 26 14:30:34 CET 2015
>>>> kafka/api/TopicMetadataRequest$$anonfun$readFrom$1.class
>>>>   1437 Thu Feb 26 14:30:34 CET 2015
>>>> kafka/api/TopicMetadataRequest$$anonfun$sizeInBytes$1.class
>>>>   1435 Thu Feb 26 14:30:34 CET 2015
>>>> kafka/api/TopicMetadataRequest$$anonfun$writeTo$1.class
>>>>   8028 Thu Feb 26 14:30:34 CET 2015
>>>> kafka/api/TopicMetadataRequest$.class
>>>>  10377 Thu Feb 26 14:30:34 CET 2015 kafka/api/TopicMetadataRequest.class
>>>>   5282 Thu Feb 26 14:30:36 CET 2015
>>>> kafka/javaapi/TopicMetadataRequest.class
>>>>   2135 Thu Feb 26 14:30:38 CET 2015
>>>> kafka/server/

Re: java.lang.NoClassDefFoundError: kafka/api/TopicMetadataRequest

2016-05-09 Thread Guillermo Ortiz
I was looking through the log carefully, looking for other errors. Anyway, the
complete exception is:

2016-05-09 13:20:25,646 [Driver] ERROR
org.apache.spark.deploy.yarn.ApplicationMaster - User class threw
exception: java.lang.NoClassDefFoundError:
kafka/api/TopicMetadataRequest
java.lang.NoClassDefFoundError: kafka/api/TopicMetadataRequest
at 
org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
at 
org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
at produban.spark.CentralLog$.createContext(CentralLog.scala:34)
at produban.spark.CentralLog$$anonfun$3.apply(CentralLog.scala:85)
at produban.spark.CentralLog$$anonfun$3.apply(CentralLog.scala:84)
at scala.Option.getOrElse(Option.scala:120)
at 
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:844)
at produban.spark.CentralLog$.main(CentralLog.scala:84)
at produban.spark.CentralLog.main(CentralLog.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)*Caused
by: java.lang.ClassNotFoundException: kafka.api.TopicMetadataRequest*
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)


2016-05-09 13:12 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:

> NoClassDefFoundError is different than saying that it could not be loaded 
> from the classpath.
>
> From my experience, there should be some other error before this error which
> would give you a better idea.
>
> You can also check whether another version of kafka is embedded in any of the 
> jars listed below.
>
> Cheers
>
>
> On Mon, May 9, 2016 at 4:00 AM, Guillermo Ortiz <konstt2...@gmail.com>
> wrote:
>
>> *jar tvf kafka_2.10-0.8.2.1.jar | grep TopicMetadataRequest *
>>   1757 Thu Feb 26 14:30:34 CET 2015
>> kafka/api/TopicMetadataRequest$$anonfun$1.class
>>   1712 Thu Feb 26 14:30:34 CET 2015
>> kafka/api/TopicMetadataRequest$$anonfun$readFrom$1.class
>>   1437 Thu Feb 26 14:30:34 CET 2015
>> kafka/api/TopicMetadataRequest$$anonfun$sizeInBytes$1.class
>>   1435 Thu Feb 26 14:30:34 CET 2015
>> kafka/api/TopicMetadataRequest$$anonfun$writeTo$1.class
>>   8028 Thu Feb 26 14:30:34 CET 2015 kafka/api/TopicMetadataRequest$.class
>>  10377 Thu Feb 26 14:30:34 CET 2015 kafka/api/TopicMetadataRequest.class
>>   5282 Thu Feb 26 14:30:36 CET 2015
>> kafka/javaapi/TopicMetadataRequest.class
>>   2135 Thu Feb 26 14:30:38 CET 2015
>> kafka/server/KafkaApis$$anonfun$handleTopicMetadataRequest$1.class
>>
>>
>> 2016-05-09 12:51 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>:
>>
>>> I'm trying to execute a job with Spark and Kafka and I'm getting this
>>> error.
>>>
>>>> I know that it's because the versions are not right, but I have been
>>>> checking the jars which I import (shown in the Spark UI under
>>>> spark.yarn.secondary.jars) and they are right, and the class exists inside
>>>> *kafka_2.10-0.8.2.1.jar*.
>>>
>>> 2016-05-05 15:51:53,617 [Driver] ERROR 
>>> org.apache.spark.deploy.yarn.ApplicationMaster - User class threw 
>>> exception: java.lang.NoClassDefFoundError: kafka/api/TopicMetadataRequest
>>> java.lang.NoClassDefFoundError: *kafka/api/TopicMetadataRequest*
>>> at 
>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
>>> at 
>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>> at 
>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
>>> 
>>>
>>>
>>> spark-submit  --conf spark.metrics.conf=metrics.properties --name
>>> "CentralLog" --master yarn-cluster --class spark.CentralLog --files
>>> /opt/centralLogs/conf/log4j.properties --jars
>>> /opt/centralLogs/lib/kafka_2.10-0.8.2.1.jar,$SPARK_CLASSPATH
>>> --executor-memory 1024m --num-executors 4 --executor-cores 2
>>> --driver-memory 1024m --files /opt/centralLogs/conf/metrics.properties
>>> /opt/centralLogs/libProject/paas-1.0-SNAPSHOT.jar
>>>
>>> Why doesn't Spark find the class?
>>>

Re: java.lang.NoClassDefFoundError: kafka/api/TopicMetadataRequest

2016-05-09 Thread Guillermo Ortiz
*jar tvf kafka_2.10-0.8.2.1.jar | grep TopicMetadataRequest *
  1757 Thu Feb 26 14:30:34 CET 2015
kafka/api/TopicMetadataRequest$$anonfun$1.class
  1712 Thu Feb 26 14:30:34 CET 2015
kafka/api/TopicMetadataRequest$$anonfun$readFrom$1.class
  1437 Thu Feb 26 14:30:34 CET 2015
kafka/api/TopicMetadataRequest$$anonfun$sizeInBytes$1.class
  1435 Thu Feb 26 14:30:34 CET 2015
kafka/api/TopicMetadataRequest$$anonfun$writeTo$1.class
  8028 Thu Feb 26 14:30:34 CET 2015 kafka/api/TopicMetadataRequest$.class
 10377 Thu Feb 26 14:30:34 CET 2015 kafka/api/TopicMetadataRequest.class
  5282 Thu Feb 26 14:30:36 CET 2015 kafka/javaapi/TopicMetadataRequest.class
  2135 Thu Feb 26 14:30:38 CET 2015
kafka/server/KafkaApis$$anonfun$handleTopicMetadataRequest$1.class


2016-05-09 12:51 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>:

> I'm trying to execute a job with Spark and Kafka and I'm getting this
> error.
>
> I know that it's because the versions are not right, but I have been
> checking the jars which I import (shown in the Spark UI under
> spark.yarn.secondary.jars) and they are right, and the class exists inside
> *kafka_2.10-0.8.2.1.jar*.
>
> 2016-05-05 15:51:53,617 [Driver] ERROR 
> org.apache.spark.deploy.yarn.ApplicationMaster - User class threw exception: 
> java.lang.NoClassDefFoundError: kafka/api/TopicMetadataRequest
> java.lang.NoClassDefFoundError: *kafka/api/TopicMetadataRequest*
>   at 
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
>   at 
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>   at 
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
>   
>
>
> spark-submit  --conf spark.metrics.conf=metrics.properties --name
> "CentralLog" --master yarn-cluster --class spark.CentralLog --files
> /opt/centralLogs/conf/log4j.properties --jars
> /opt/centralLogs/lib/kafka_2.10-0.8.2.1.jar,$SPARK_CLASSPATH
> --executor-memory 1024m --num-executors 4 --executor-cores 2
> --driver-memory 1024m --files /opt/centralLogs/conf/metrics.properties
> /opt/centralLogs/libProject/paas-1.0-SNAPSHOT.jar
>
> Why doesn't Spark find the class?
>
>
> activation-1.1.jar,akka-actor_2.10-2.3.11.jar,akka-remote_2.10-2.3.11.jar,akka-slf4j_2.10-2.3.11.jar,ant-1.6.5.jar,ant-1.9.1.jar,ant-launcher-1.9.1.jar,antlr-runtime-3.5.jar,aopalliance-1.0.jar,apache-log4j-extras-1.2.17.jar,arpack_combined_all-0.1.jar,asm-3.1.jar,asm-4.1.jar,asm-commons-3.1.jar,asm-commons-4.1.jar,asm-tree-3.1.jar,automaton-1.11-8.jar,avro-1.7.4.jar,avro-ipc-1.7.7.jar,avro-ipc-1.7.7-tests.jar,avro-mapred-1.7.7-hadoop2.jar,bonecp-0.8.0.RELEASE.jar,breeze_2.10-0.11.2.jar,breeze-macros_2.10-0.11.2.jar,calcite-avatica-1.2.0-incubating.jar,calcite-core-1.2.0-incubating.jar,calcite-linq4j-1.2.0-incubating.jar,carbonite-1.4.0.jar,cascading-core-2.6.3.jar,cascading-hadoop-2.6.3.jar,cascading-local-2.6.3.jar,chill_2.10-0.5.0.jar,chill-java-0.5.0.jar,clj-stacktrace-0.2.2.jar,clj-time-0.4.1.jar,clojure-1.5.1.jar,clout-1.0.1.jar,commons-cli-1.2.jar,commons-codec-1.9.jar,commons-collections-3.2.1.jar,commons-compiler-2.7.6.jar,commons-compress-1.4.1.jar,commons-configuration-1.6.jar,commons-dbcp-1.4.jar,commons-el-1.0.jar,commons-exec-1.1.jar,commons-fileupload-1.2.1.jar,commons-httpclient-3.1.jar,commons-io-2.3.jar,commons-lang-2.6.jar,commons-lang3-3.3.2.jar,commons-logging-1.1.3.jar,commons-math3-3.4.1.jar,commons-net-2.2.jar,commons-pool-1.5.4.jar,compojure-1.1.3.jar,compress-lzf-1.0.3.jar,concurrentlinkedhashmap-lru-1.2.jar,config-1.2.1.jar,core-1.1.2.jar,core-3.1.1.jar,core.incubator-0.1.0.jar,curator-client-2.6.0.jar,curator-framework-2.6.0.jar,curator-recipes-2.4.0.jar,datanucleus-api-jdo-3.2.6.jar,datanucleus-core-3.2.10.jar,datanucleus-rdbms-3.2.9.jar,derby-10.10.2.0.jar,disruptor-2.10.4.jar,dom4j-1.6.1.jar,eigenbase-properties-1.1.5.jar,elasticsearch-hadoop-2.2.1.jar,geronimo-annotation_1.0_spec-1.1.1.jar,geronimo-jaspic_1.0_spec-1.0.jar,geronimo-jta_1.1_spec-1.1.1.jar,groovy-all-2.1.6.jar,gson-2.2.4.jar,guava-14.0.1.jar,guice-3.0.jar,guice-servlet-3.0.jar,hadoop-annotations-2.2.0.jar,hadoop-auth-2.2.0.jar,hadoop-client-2.6.0.jar,hadoop-common-2.6.0.jar,hadoop-hdfs-2.2.0.jar,hadoop-mapreduce-client-app-2.6.0.jar,hadoop-mapreduce-client-common-2.6.0.jar,hadoop-mapreduce-client-core-2.6.0.jar,hadoop-mapreduce-client-jobclient-2.6.0.jar,hadoop-mapreduce-client-shuffle-2.6.0.jar,hadoop-yarn-api-2.6.0.jar,hadoop-yarn-client-2.6.0.jar,hadoop-yarn-common-2.6.0.jar,hadoop-yarn-server-applicationhistoryservice-2.6.0.jar,hadoop-yarn-server-common-2.6.0.jar,hadoop-yarn-server-resourcemanager-2.6.0.jar,hadoop-yarn-server-web-proxy-2.6.0.jar,hiccup-0.3.6.jar,hive-ant-1.2.1.jar,hive-common-1.2.1.jar,hive-exec-1.2.1.jar,hive-exec-1.2.1.spark.jar,hive-metastore-1.2.1.jar,hive-metastore-1.2.1.spark.jar,hive-serde-1.2.1.jar,hive-service-1.2.1.jar,hive-shims

Re: Error Kafka/Spark. Ran out of messages before reaching ending offset

2016-05-09 Thread Guillermo Ortiz
I reinstalled Kafka and it works. I work with virtual machines, and someone
changed the host of one of the Kafka brokers without telling anybody.



2016-05-06 16:11 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> Yeah, so that means the driver talked to kafka and kafka told it the
> highest available offset was 2723431.  Then when the executor tried to
> consume messages, it stopped getting messages before reaching that
> offset.  That almost certainly means something's wrong with Kafka,
> have you looked at your kafka logs?  I doubt it's anything to do with
> elasticsearch.
>
> On Fri, May 6, 2016 at 4:22 AM, Guillermo Ortiz <konstt2...@gmail.com>
> wrote:
> > This is the complete error.
> >
> > 2016-05-06 11:18:05,424 [task-result-getter-0] INFO
> > org.apache.spark.scheduler.TaskSetManager - Finished task 5.0 in stage
> > 13.0 (TID 60) in 11692 ms on xx (6/8)
> > 2016-05-06 11:18:08,978 [task-result-getter-1] WARN
> > org.apache.spark.scheduler.TaskSetManager - Lost task 7.0 in stage
> > 13.0 (TID 62, xxx): java.lang.AssertionError: assertion
> > failed: Ran out of messages before reaching ending offset 2723431 for
> > topic kafka-global-paas partition 2 start 2705506. This should not
> > happen, and indicates that messages may have been lost
> > at scala.Predef$.assert(Predef.scala:179)
> > at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
> > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> > at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> > at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
> > at
> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
> > at
> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
> > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> > at org.apache.spark.scheduler.Task.run(Task.scala:88)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > 2016-05-06 11:18:08,982 [sparkDriver-akka.actor.default-dispatcher-18]
> > INFO  org.apache.spark.scheduler.TaskSetManager - Starting task 7.1 in
> > stage 13.0 (TID 63, , partition 7,RACK_LOCAL, 2052 bytes)
> > 2016-05-06 11:18:10,013 [JobGenerator] INFO
> > org.apache.spark.streaming.scheduler.JobScheduler - Added jobs for
> > time 146252629 ms
> > 2016-05-06 11:18:10,015 [JobGenerator] INFO
> > org.apache.spark.streaming.scheduler.JobGenerator - Checkpointing
> > graph for time 146252629 ms
> >
> > 2016-05-06 11:11 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>:
> >> I think that it's a Kafka error, but I'm starting to think it could be
> >> something about Elasticsearch, since I have seen more people with the
> >> same error using Elasticsearch. I have no idea.
> >>
> >> 2016-05-06 11:05 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>:
> >>> I'm trying to read data from Spark and index to ES with its library
> >>> (es-hadoop 2.2.1 version).
> >>> It was working fine for a while, but now this error has started to
> >>> happen.
> >>> I have deleted the checkpoint and even the Kafka topic, and restarted all
> >>> the machines with Kafka and ZooKeeper, but it didn't fix it.
> >>>
> >>> User class threw exception: org.apache.spark.SparkException: Job
> >>> aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most
> >>> recent failure: Lost task 4.3 in stage 1.0 (TID 12, x):
> >>> java.lang.AssertionError: assertion failed: Ran out of messages before
> >>> reaching ending offset 1226116 for topic kafka-global-paas partition 7
> >>> start 1212156. This should not happen, and indicates that messages may
> >>> have been lost
> >>> at scala.Predef$.assert(Predef.scala:179)
> >>> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
> >>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> >>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>
> >>> I read some threads with this error but it didn't help me.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


java.lang.NoClassDefFoundError: kafka/api/TopicMetadataRequest

2016-05-09 Thread Guillermo Ortiz
I'm trying to execute a job with Spark and Kafka and I'm getting this error.

I know that it's because the versions are not right, but I have been
checking the jars which I import (shown in the Spark UI under
spark.yarn.secondary.jars) and they are right, and the class exists inside
*kafka_2.10-0.8.2.1.jar*.

2016-05-05 15:51:53,617 [Driver] ERROR
org.apache.spark.deploy.yarn.ApplicationMaster - User class threw
exception: java.lang.NoClassDefFoundError:
kafka/api/TopicMetadataRequest
java.lang.NoClassDefFoundError: *kafka/api/TopicMetadataRequest*
at 
org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
at 
org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)



spark-submit  --conf spark.metrics.conf=metrics.properties --name
"CentralLog" --master yarn-cluster --class spark.CentralLog --files
/opt/centralLogs/conf/log4j.properties --jars
/opt/centralLogs/lib/kafka_2.10-0.8.2.1.jar,$SPARK_CLASSPATH
--executor-memory 1024m --num-executors 4 --executor-cores 2
--driver-memory 1024m --files /opt/centralLogs/conf/metrics.properties
/opt/centralLogs/libProject/paas-1.0-SNAPSHOT.jar

Why doesn't Spark find the class?
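
One way to narrow this kind of failure down (a sketch of my own, not from this
thread): check at runtime whether the class is visible from the driver and from
the executors. createDirectStream runs on the driver, so a driver-side
ClassNotFoundException usually means the Kafka jar is not on the driver
classpath in yarn-cluster mode even if --jars ships it to the executors;
building the Kafka dependency into the application jar or setting
spark.driver.extraClassPath are possible fixes.

val className = "kafka.api.TopicMetadataRequest"

// Check from the driver (where createDirectStream resolves the class).
val driverSeesIt =
  try { Class.forName(className); true } catch { case _: ClassNotFoundException => false }
println("driver sees it: " + driverSeesIt)

// Check from the executors.
val executorsSeeIt = sc.parallelize(1 to 4, 4).map { _ =>
  try { Class.forName(className); true } catch { case _: ClassNotFoundException => false }
}.collect()
println("executors see it: " + executorsSeeIt.mkString(","))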


Re: Error Kafka/Spark. Ran out of messages before reaching ending offset

2016-05-06 Thread Guillermo Ortiz
This is the complete error.

2016-05-06 11:18:05,424 [task-result-getter-0] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 5.0 in stage
13.0 (TID 60) in 11692 ms on xx (6/8)
2016-05-06 11:18:08,978 [task-result-getter-1] WARN
org.apache.spark.scheduler.TaskSetManager - Lost task 7.0 in stage
13.0 (TID 62, xxx): java.lang.AssertionError: assertion
failed: Ran out of messages before reaching ending offset 2723431 for
topic kafka-global-paas partition 2 start 2705506. This should not
happen, and indicates that messages may have been lost
at scala.Predef$.assert(Predef.scala:179)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
at 
org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at 
org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

2016-05-06 11:18:08,982 [sparkDriver-akka.actor.default-dispatcher-18]
INFO  org.apache.spark.scheduler.TaskSetManager - Starting task 7.1 in
stage 13.0 (TID 63, , partition 7,RACK_LOCAL, 2052 bytes)
2016-05-06 11:18:10,013 [JobGenerator] INFO
org.apache.spark.streaming.scheduler.JobScheduler - Added jobs for
time 146252629 ms
2016-05-06 11:18:10,015 [JobGenerator] INFO
org.apache.spark.streaming.scheduler.JobGenerator - Checkpointing
graph for time 146252629 ms

2016-05-06 11:11 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>:
> I think that it's a Kafka error, but I'm starting to think it could be
> something about Elasticsearch, since I have seen more people with the
> same error using Elasticsearch. I have no idea.
>
> 2016-05-06 11:05 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>:
>> I'm trying to read data from Spark and index to ES with its library
>> (es-hadoop 2.2.1 version).
>> It was working fine for a while, but now this error has started to
>> happen.
>> I have deleted the checkpoint and even the Kafka topic, and restarted all
>> the machines with Kafka and ZooKeeper, but it didn't fix it.
>>
>> User class threw exception: org.apache.spark.SparkException: Job
>> aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most
>> recent failure: Lost task 4.3 in stage 1.0 (TID 12, x):
>> java.lang.AssertionError: assertion failed: Ran out of messages before
>> reaching ending offset 1226116 for topic kafka-global-paas partition 7
>> start 1212156. This should not happen, and indicates that messages may
>> have been lost
>> at scala.Predef$.assert(Predef.scala:179)
>> at 
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>
>> I read some threads with this error but it didn't help me.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Error Kafka/Spark. Ran out of messages before reaching ending offset

2016-05-06 Thread Guillermo Ortiz
I think that it's a Kafka error, but I'm starting to think it could be
something about Elasticsearch, since I have seen more people with the
same error using Elasticsearch. I have no idea.

2016-05-06 11:05 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>:
> I'm trying to read data from Spark and index to ES with its library
> (es-hadoop 2.2.1 version).
> It was working fine for a while, but now this error has started to happen.
> I have deleted the checkpoint and even the Kafka topic, and restarted all
> the machines with Kafka and ZooKeeper, but it didn't fix it.
>
> User class threw exception: org.apache.spark.SparkException: Job
> aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most
> recent failure: Lost task 4.3 in stage 1.0 (TID 12, x):
> java.lang.AssertionError: assertion failed: Ran out of messages before
> reaching ending offset 1226116 for topic kafka-global-paas partition 7
> start 1212156. This should not happen, and indicates that messages may
> have been lost
> at scala.Predef$.assert(Predef.scala:179)
> at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> I read some threads with this error but it didn't help me.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Error Kafka/Spark. Ran out of messages before reaching ending offset

2016-05-06 Thread Guillermo Ortiz
I'm trying to read data with Spark and index it to ES with its library
(es-hadoop version 2.2.1).
It was working fine for a while, but now this error has started to happen.
I have deleted the checkpoint and even the Kafka topic, and restarted all
the machines with Kafka and ZooKeeper, but it didn't fix it.

User class threw exception: org.apache.spark.SparkException: Job
aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most
recent failure: Lost task 4.3 in stage 1.0 (TID 12, x):
java.lang.AssertionError: assertion failed: Ran out of messages before
reaching ending offset 1226116 for topic kafka-global-paas partition 7
start 1212156. This should not happen, and indicates that messages may
have been lost
at scala.Predef$.assert(Predef.scala:179)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

I read some threads about this error, but they didn't help me.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Configuring log4j Spark

2016-03-30 Thread Guillermo Ortiz
I changed the position of --files and it works.

(IT DOESN'T WORK)
spark-submit  --conf spark.metrics.conf=metrics.properties --name
"myProject" --master yarn-cluster --class myCompany.spark.MyClass *--files
/opt/myProject/conf/log4j.properties* --jars $SPARK_CLASSPATH
--executor-memory 1024m --num-executors 5  --executor-cores 1
--driver-memory 1024m  /opt/myProject/myJar.jar

(IT WORKS)
spark-submit  --conf spark.metrics.conf=metrics.properties --name
"myProject" --master yarn-cluster --class myCompany.spark.MyClass  --jars
$SPARK_CLASSPATH --executor-memory 1024m --num-executors 5
 --executor-cores 1 --driver-memory 1024m *--files
/opt/myProject/conf/log4j.properties*  /opt/myProject/myJar.jar

I don't think I made any other changes.



2016-03-30 15:42 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>:

> I'm trying to configure log4j in Spark.
>
> spark-submit  --conf spark.metrics.conf=metrics.properties --name
> "myProject" --master yarn-cluster --class myCompany.spark.MyClass *--files
> /opt/myProject/conf/log4j.properties* --jars $SPARK_CLASSPATH
> --executor-memory 1024m --num-executors 5  --executor-cores 1
> --driver-memory 1024m  /opt/myProject/myJar.jar
>
> I have this log4j.properties
> log4j.rootCategory=DEBUG, RollingAppender, myConsoleAppender
> #log4j.logger.mycompany.spark=DEBUG
> log4j.category.myCompany.spark=DEBUG
> spark.log.dir=/opt/myProject/log
> spark.log.file=spark.log
>
> log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
> log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.myConsoleAppender.Target=System.out
> log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c
> - %m%n
>
> log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender
> log4j.appender.RollingAppender.MaxFileSize=50MB
> log4j.appender.RollingAppender.MaxBackupIndex=5
> log4j.appender.RollingAppender.layout.ConversionPattern=%d{dd MMM 
> HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n
> log4j.appender.RollingAppender.File=${spark.log.dir}/${spark.log.file}
> log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M -
> %m%n
>
> With this I see the driver log at DEBUG level, but the executors at INFO
> level. Why can't I see the executor logs at DEBUG level?
> I'm using Spark 1.5.0
>
>
>
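
For reference, a variant that is often used on Spark 1.x to get the same
log4j.properties applied on the executors as well (a sketch reusing the paths
from this thread, not the poster's exact final command): ship the file with
--files and point both the driver and the executors at it through the extra
Java options.

spark-submit --conf spark.metrics.conf=metrics.properties --name "myProject" \
  --master yarn-cluster --class myCompany.spark.MyClass \
  --files /opt/myProject/conf/log4j.properties \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --jars $SPARK_CLASSPATH --executor-memory 1024m --num-executors 5 \
  --executor-cores 1 --driver-memory 1024m /opt/myProject/myJar.jar

The relative file name works because --files places the file in each
container's working directory on YARN.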


Configuring log4j Spark

2016-03-30 Thread Guillermo Ortiz
I'm trying to configure log4j in Spark.

spark-submit  --conf spark.metrics.conf=metrics.properties --name
"myProject" --master yarn-cluster --class myCompany.spark.MyClass *--files
/opt/myProject/conf/log4j.properties* --jars $SPARK_CLASSPATH
--executor-memory 1024m --num-executors 5  --executor-cores 1
--driver-memory 1024m  /opt/myProject/myJar.jar

I have this log4j.properties
log4j.rootCategory=DEBUG, RollingAppender, myConsoleAppender
#log4j.logger.mycompany.spark=DEBUG
log4j.category.myCompany.spark=DEBUG
spark.log.dir=/opt/myProject/log
spark.log.file=spark.log

log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.myConsoleAppender.Target=System.out
log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c -
%m%n

log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingAppender.MaxFileSize=50MB
log4j.appender.RollingAppender.MaxBackupIndex=5
log4j.appender.RollingAppender.layout.ConversionPattern=%d{dd MMM 
HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n
log4j.appender.RollingAppender.File=${spark.log.dir}/${spark.log.file}
log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M - %m%n

With this I see the driver log at DEBUG level, but the executors at INFO
level. Why can't I see the executor logs at DEBUG level?
I'm using Spark 1.5.0


Checkpoints in Spark

2016-03-30 Thread Guillermo Ortiz
I'm curious about what kind of things are saved in the checkpoints.

I just changed the number of executors when I execute Spark, and the change
didn't take effect until I removed the checkpoint. I guess that if I'm using
log4j.properties and I want to change it, I have to remove the checkpoint as
well.

When you need to change your code and don't want to lose any data, is there
any easy way to make that change in the code?
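
For what it's worth, the checkpoint directory stores the serialized DStream
graph together with most of the SparkConf, which is why changes such as the
executor count or recompiled application code generally only take effect after
the checkpoint is wiped. A minimal sketch of the usual getOrCreate pattern (the
path, app name, and socket source below are made up for illustration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/myApp/checkpoint"    // hypothetical path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("myApp")      // hypothetical app name
  val ssc = new StreamingContext(conf, Seconds(10))
  val lines = ssc.socketTextStream("localhost", 9999) // placeholder source
  lines.count().print()
  ssc.checkpoint(checkpointDir)
  ssc
}

// Recovers the graph from the checkpoint if one exists, otherwise builds it fresh.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()

For not losing data across code changes with the Kafka direct approach, a
common alternative is to store the offsets yourself (ZooKeeper, HBase, a
database) and pass fromOffsets to createDirectStream instead of relying on the
checkpoint.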


Problem with union of DirectStream

2016-03-10 Thread Guillermo Ortiz
I have a DirectStream and process data from Kafka,

 val directKafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams1, topics1.toSet)
 directKafkaStream.foreachRDD { rdd =>
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

When I add a second DirectStream and do a union of both, it doesn't work. I
thought it was the same type, but I got a ClassCastException.


 val directKafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams1, topics1.toSet)
val directKafkaStream2 = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams2, topics2.toSet)
val kafkaStream = directKafkaStream.union(directKafkaStream2)

kafkaStream.foreachRDD { rdd =>
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-->Exception

Exception in thread "main" java.lang.ClassCastException:
org.apache.spark.rdd.UnionRDD cannot be cast to
org.apache.spark.streaming.kafka.HasOffsetRanges
at
com.produban.metrics.MetricsSpark$$anonfun$main$1.apply(MetricsSpark.scala:72)

I guessed that rdd.union(rdd2) would give the same type of RDD.
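
A workaround that is often suggested for this (a sketch, assuming the two
streams are built as above): capture the offset ranges with transform on each
direct stream before the union, since the unioned RDD is a UnionRDD and no
longer implements HasOffsetRanges.

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// transform is evaluated on the driver for every batch, so plain vars are enough here
var offsets1 = Array.empty[OffsetRange]
var offsets2 = Array.empty[OffsetRange]

val tagged1 = directKafkaStream.transform { rdd =>
  offsets1 = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}
val tagged2 = directKafkaStream2.transform { rdd =>
  offsets2 = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}

tagged1.union(tagged2).foreachRDD { rdd =>
  val offsets = offsets1 ++ offsets2   // the offset ranges of this batch
  println("batch covers " + offsets.mkString(", "))
  // process rdd here
}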


Re: Get all vertexes with outDegree equals to 0 with GraphX

2016-02-27 Thread Guillermo Ortiz
Thank you, I have to think about what the code does, because I am a bit of a
noob in Scala and it's hard for me to understand.

2016-02-27 3:53 GMT+01:00 Mohammed Guller <moham...@glassbeam.com>:

> Here is another solution (minGraph is the graph from your code. I assume
> that is your original graph):
>
>
>
> val graphWithNoOutEdges = minGraph.filter(
>
>   graph => graph.outerJoinVertices(graph.outDegrees) {(vId, vData,
> outDegreesOpt) => outDegreesOpt.getOrElse(0)},
>
>   vpred = (vId: VertexId, vOutDegrees: Int) => vOutDegrees == 0
>
> )
>
>
>
> val verticesWithNoOutEdges = graphWithNoOutEdges.vertices
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
>
>
> *From:* Guillermo Ortiz [mailto:konstt2...@gmail.com]
> *Sent:* Friday, February 26, 2016 5:46 AM
> *To:* Robin East
> *Cc:* user
> *Subject:* Re: Get all vertexes with outDegree equals to 0 with GraphX
>
>
>
> Yes, I am not really happy with that "collect".
>
> I was looking at using the subgraph method and other options and didn't
> figure out anything easy or direct.
>
>
>
> I'm going to try your idea.
>
>
>
> 2016-02-26 14:16 GMT+01:00 Robin East <robin.e...@xense.co.uk>:
>
> Whilst I can think of other ways to do it I don’t think they would be
> conceptually or syntactically any simpler. GraphX doesn’t have the concept
> of built-in vertex properties which would make this simpler - a vertex in
> GraphX is a Vertex ID (Long) and a bunch of custom attributes that you
> assign. This means you have to find a way of ‘pushing’ the vertex degree
> into the graph so you can do comparisons (cf a join in relational
> databases) or as you have done create a list and filter against that (cf
> filtering against a sub-query in relational database).
>
>
>
> One thing I would point out is that you probably want to avoid
> finalVerexes.collect() for a large-scale system - this will pull all the
> vertices into the driver and then push them out to the executors again as
> part of the filter operation. A better strategy for large graphs would be:
>
>
>
> 1. build a graph based on the existing graph where the vertex attribute is
> the vertex degree - the GraphX documentation shows how to do this
>
> 2. filter this “degrees” graph to just give you 0 degree vertices
>
> 3 use graph.mask passing in the 0-degree graph to get the original graph
> with just 0 degree vertices
>
>
>
> Just one variation on several possibilities, the key point is that
> everything is just a graph transformation until you call an action on the
> resulting graph
>
>
> -------
>
> Robin East
>
> *Spark GraphX in Action *Michael Malak and Robin East
>
> Manning Publications Co.
>
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
>
>
>
>
> On 26 Feb 2016, at 11:59, Guillermo Ortiz <konstt2...@gmail.com> wrote:
>
>
>
> I'm new to GraphX. I need to get the vertices without out edges.
>
> I guess that it's pretty easy, but I did it in a pretty complicated and
> inefficient way
>
>
>
> val vertices: RDD[(VertexId, (List[String], List[String]))] =
>   sc.parallelize(Array((1L, (List("a"), List[String]())),
> (2L, (List("b"), List[String]())),
> (3L, (List("c"), List[String]())),
> (4L, (List("d"), List[String]())),
> (5L, (List("e"), List[String]())),
> (6L, (List("f"), List[String]()
>
> // Create an RDD for edges
> val relationships: RDD[Edge[Boolean]] =
>   sc.parallelize(Array(Edge(1L, 2L, true), Edge(2L, 3L, true),
> Edge(3L, 4L, true), Edge(5L, 2L, true)))
>
> val out = minGraph.outDegrees.map(vertex => vertex._1)
>
> val finalVertexes = minGraph.vertices.keys.subtract(out)
>
> //It must be something better than this way..
> val nodes = finalVertexes.collect()
> val result = minGraph.vertices.filter(v => nodes.contains(v._1))
>
>
>
> What's the good way to do this operation? It seems that it should be pretty 
> easy.
>
>
>
>
>


Re: Get all vertexes with outDegree equals to 0 with GraphX

2016-02-26 Thread Guillermo Ortiz
Yes, I am not really happy with that "collect".
I was looking at using the subgraph method and other options and didn't
figure out anything easy or direct.

I'm going to try your idea.

2016-02-26 14:16 GMT+01:00 Robin East <robin.e...@xense.co.uk>:

> Whilst I can think of other ways to do it I don’t think they would be
> conceptually or syntactically any simpler. GraphX doesn’t have the concept
> of built-in vertex properties which would make this simpler - a vertex in
> GraphX is a Vertex ID (Long) and a bunch of custom attributes that you
> assign. This means you have to find a way of ‘pushing’ the vertex degree
> into the graph so you can do comparisons (cf a join in relational
> databases) or as you have done create a list and filter against that (cf
> filtering against a sub-query in relational database).
>
> One thing I would point out is that you probably want to avoid
> finalVerexes.collect() for a large-scale system - this will pull all the
> vertices into the driver and then push them out to the executors again as
> part of the filter operation. A better strategy for large graphs would be:
>
> 1. build a graph based on the existing graph where the vertex attribute is
> the vertex degree - the GraphX documentation shows how to do this
> 2. filter this “degrees” graph to just give you 0 degree vertices
> 3 use graph.mask passing in the 0-degree graph to get the original graph
> with just 0 degree vertices
>
> Just one variation on several possibilities, the key point is that
> everything is just a graph transformation until you call an action on the
> resulting graph
>
> ---
> Robin East
> *Spark GraphX in Action* Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
> On 26 Feb 2016, at 11:59, Guillermo Ortiz <konstt2...@gmail.com> wrote:
>
> I'm new to GraphX. I need to get the vertices without out edges.
> I guess that it's pretty easy, but I did it in a pretty complicated and
> inefficient way
>
> val vertices: RDD[(VertexId, (List[String], List[String]))] =
>   sc.parallelize(Array((1L, (List("a"), List[String]())),
> (2L, (List("b"), List[String]())),
> (3L, (List("c"), List[String]())),
> (4L, (List("d"), List[String]())),
> (5L, (List("e"), List[String]())),
> (6L, (List("f"), List[String]()
>
> // Create an RDD for edges
> val relationships: RDD[Edge[Boolean]] =
>   sc.parallelize(Array(Edge(1L, 2L, true), Edge(2L, 3L, true), Edge(3L, 4L, 
> true), Edge(5L, 2L, true)))
>
> val out = minGraph.outDegrees.map(vertex => vertex._1)
>
> val finalVertexes = minGraph.vertices.keys.subtract(out)
>
> //It must be something better than this way..
> val nodes = finalVertexes.collect()
> val result = minGraph.vertices.filter(v => nodes.contains(v._1))
>
>
> What's the good way to do this operation? It seems that it should be pretty 
> easy.
>
>
>


Get all vertexes with outDegree equals to 0 with GraphX

2016-02-26 Thread Guillermo Ortiz
I'm new to GraphX. I need to get the vertices without out edges.
I guess that it's pretty easy, but I did it in a pretty complicated and
inefficient way.

val vertices: RDD[(VertexId, (List[String], List[String]))] =
  sc.parallelize(Array((1L, (List("a"), List[String]())),
(2L, (List("b"), List[String]())),
(3L, (List("c"), List[String]())),
(4L, (List("d"), List[String]())),
(5L, (List("e"), List[String]())),
(6L, (List("f"), List[String]()

// Create an RDD for edges
val relationships: RDD[Edge[Boolean]] =
  sc.parallelize(Array(Edge(1L, 2L, true), Edge(2L, 3L, true),
Edge(3L, 4L, true), Edge(5L, 2L, true)))

val out = minGraph.outDegrees.map(vertex => vertex._1)

val finalVertexes = minGraph.vertices.keys.subtract(out)

//It must be something better than this way..
val nodes = finalVertexes.collect()
val result = minGraph.vertices.filter(v => nodes.contains(v._1))


What's a good way to do this operation? It seems like it should be
pretty easy.
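
One way to avoid the collect (a sketch of the degree-join idea discussed in the
replies, assuming minGraph is the graph built from these vertices and
relationships): push each vertex's out-degree into the vertex attribute with
outerJoinVertices, then filter the vertices directly; everything stays
distributed until an action is called.

val withOutDegree = minGraph.outerJoinVertices(minGraph.outDegrees) {
  (vid, attr, degOpt) => (attr, degOpt.getOrElse(0))
}

// Vertices that never appear in outDegrees keep a degree of 0.
val noOutEdges = withOutDegree.vertices
  .filter { case (_, (_, deg)) => deg == 0 }
  .mapValues { case (attr, _) => attr }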


Re: How could I do this algorithm in Spark?

2016-02-25 Thread Guillermo Ortiz
I'm going to try to do it with Pregel. If there are other ideas... great!

What do you mean by P time? I think it's O(number of vertices * N).

2016-02-25 16:17 GMT+01:00 Darren Govoni <dar...@ontrenet.com>:

> This might be hard to do. One generalization of this problem is
> https://en.m.wikipedia.org/wiki/Longest_path_problem
>
> Given a node (e.g. A), find longest path. All interior relations are
> transitive and can be inferred.
>
> But finding a distributed spark way of doing it in P time would be
> interesting.
>
>
> Sent from my Verizon Wireless 4G LTE smartphone
>
>
> ---- Original message 
> From: Guillermo Ortiz <konstt2...@gmail.com>
> Date: 02/24/2016 5:26 PM (GMT-05:00)
> To: user <user@spark.apache.org>
> Subject: How could I do this algorithm in Spark?
>
> I want to do some algorithm in Spark.. I know how to do it in a single
> machine where all data are together, but I don't know a good way to do it
> in Spark.
>
> If someone has an idea..
> I have some data like this
> a , b
> x , y
> b , c
> y , y
> c , d
>
> I want something like:
> a , d
> b , d
> c , d
> x , y
> y , y
>
> I need to know that a->b->c->d, so a->d, b->d and c->d.
> I don't want the code, just an idea how I could deal with it.
>
> Any idea?
>


Re: How could I do this algorithm in Spark?

2016-02-25 Thread Guillermo Ortiz
Thank you! I'm trying to do it with Pregel; it's hard because I have never
used GraphX or Pregel before.

2016-02-25 14:00 GMT+01:00 Sabarish Sasidharan <sabarish@gmail.com>:

> Like Robin said, pls explore Pregel. You could do it without Pregel but it
> might be laborious. I have a simple outline below. You will need more
> iterations if the number of levels is higher.
>
> a-b
> b-c
> c-d
> b-e
> e-f
> f-c
>
> flatmaptopair
>
> a -> (a-b)
> b -> (a-b)
> b -> (b-c)
> c -> (b-c)
> c -> (c-d)
> d -> (c-d)
> b -> (b-e)
> e -> (b-e)
> e -> (e-f)
> f -> (e-f)
> f -> (f-c)
> c -> (f-c)
>
> aggregatebykey
>
> a -> (a-b)
> b -> (a-b, b-c, b-e)
> c -> (b-c, c-d, f-c)
> d -> (c-d)
> e -> (b-e, e-f)
> f -> (e-f, f-c)
>
> filter to remove keys with less than 2 values
>
> b -> (a-b, b-c, b-e)
> c -> (b-c, c-d, f-c)
> e -> (b-e, e-f)
> f -> (e-f, f-c)
>
> flatmap
>
> a-b-c
> a-b-e
> b-c-d
> b-e-f
> e-f-c
>
> flatmaptopair followed by aggregatebykey
>
> (a-b) -> (a-b-c, a-b-e)
> (b-c) -> (a-b-c, b-c-d)
> (c-d) -> (b-c-d)
> (b-e) -> (b-e-f)
> (e-f) -> (b-e-f, e-f-c)
> (f-c) -> (e-f-c)
>
> filter out keys with less than 2 values
>
> (b-c) -> (a-b-c, b-c-d)
> (e-f) -> (b-e-f, e-f-c)
>
> mapvalues
>
> a-b-c-d
> b-e-f-c
>
> flatmap
>
> a,d
> b,d
> c,d
> b,c
> e,c
> f,c
>
>
> On Thu, Feb 25, 2016 at 6:19 PM, Guillermo Ortiz <konstt2...@gmail.com>
> wrote:
>
>> I'm taking a look at Pregel. It seems like a good way to do it. The only
>> downside I see is that it's not a really complex graph with a lot of
>> edges between the vertices; they are more like a lot of small, isolated
>> graphs
>>
>> 2016-02-25 12:32 GMT+01:00 Robin East <robin.e...@xense.co.uk>:
>>
>>> The structures you are describing look like edges of a graph and you
>>> want to follow the graph to a terminal vertex and then propagate that value
>>> back up the path. On this assumption it would be simple to create the
>>> structures as graphs in GraphX and use Pregel for the algorithm
>>> implementation.
>>>
>>> ---
>>> Robin East
>>> *Spark GraphX in Action* Michael Malak and Robin East
>>> Manning Publications Co.
>>> http://www.manning.com/books/spark-graphx-in-action
>>>
>>>
>>>
>>>
>>>
>>> On 25 Feb 2016, at 10:52, Guillermo Ortiz <konstt2...@gmail.com> wrote:
>>>
>>> Oh, the letters were just an example, it could be:
>>> a , t
>>> b, o
>>> t, k
>>> k, c
>>>
>>> So.. a -> t -> k -> c and the result is: a,c; t,c; k,c and b,o
>>> I don't know if you were thinking of sortBy because in the other example
>>> the letters were consecutive.
>>>
>>>
>>> 2016-02-25 9:42 GMT+01:00 Guillermo Ortiz <konstt2...@gmail.com>:
>>>
>>>> I don't see that sorting the data helps.
>>>> The answer has to be all the associations. In this case the answer has
>>>> to be:
>>>> a , b --> it was a error in the question, sorry.
>>>> b , d
>>>> c , d
>>>> x , y
>>>> y , y
>>>>
>>>> I feel like all the data which is associate should be in the same
>>>> executor.
>>>> On this case if I order the inputs.
>>>> a , b
>>>> x , y
>>>> b , c
>>>> y , y
>>>> c , d
>>>> --> to
>>>> a , b
>>>> b , c
>>>> c , d
>>>> x , y
>>>> y , y
>>>>
>>>> Now, a,b ; b,c; one partitions for example, "c,d" and "x,y" another one
>>>> and so on.
>>>> I could get the relation between "a,b,c", but not about "d" with
>>>> "a,b,c", am I wrong? I hope to be wrong!.
>>>>
>>>> It seems that it could be done with GraphX, but as you said, it seems a
>>>> little bit overhead.
>>>>
>>>>
>>>> 2016-02-25 5:43 GMT+01:00 James Barney <jamesbarne...@gmail.com>:
>>>>
>>>>> Guillermo,
>>>>> I think you're after an associative algorithm where A is ultimately
>>>>> associated with D, correct? Jakob would correct if that is a typo--a sort
>

Re: Number partitions after a join

2016-02-25 Thread Guillermo Ortiz
Good to know, thanks everybody!

2016-02-25 15:29 GMT+01:00 JOAQUIN GUANTER GONZALBEZ <
joaquin.guantergonzal...@telefonica.com>:

> Actually that only applies to Spark SQL. I believe that in plain RDD, the
> resulting join will have as many partitions as the RDD with the most
> partitions.
>
>
>
> Cheers,
>
> Ximo
>
>
>
> *De:* Guillermo Ortiz [mailto:konstt2...@gmail.com]
> *Enviado el:* jueves, 25 de febrero de 2016 15:19
> *Para:* Takeshi Yamamuro <linguin@gmail.com>
> *CC:* user <user@spark.apache.org>
> *Asunto:* Re: Number partitions after a join
>
>
>
> thank you, I didn't see that option.
>
>
>
> 2016-02-25 14:51 GMT+01:00 Takeshi Yamamuro <linguin@gmail.com>:
>
> Hi,
>
>
>
> The number depends on `spark.sql.shuffle.partitions`.
>
> See:
> http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
>
>
>
> On Thu, Feb 25, 2016 at 7:42 PM, Guillermo Ortiz <konstt2...@gmail.com>
> wrote:
>
> When you do a join in Spark, how many partitions does the result have? Is it
> a default number if you don't specify the number of partitions?
>
>
>
>
>
> --
>
> ---
> Takeshi Yamamuro
>
>
>
>


Re: Number partitions after a join

2016-02-25 Thread Guillermo Ortiz
thank you, I didn't see that option.

2016-02-25 14:51 GMT+01:00 Takeshi Yamamuro <linguin@gmail.com>:

> Hi,
>
> The number depends on `spark.sql.shuffle.partitions`.
> See:
> http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
>
> On Thu, Feb 25, 2016 at 7:42 PM, Guillermo Ortiz <konstt2...@gmail.com>
> wrote:
>
>> When you do a join in Spark, how many partitions does the result have? Is it
>> a default number if you don't specify the number of partitions?
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>
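
In short (a sketch, not from the thread; rdd1 and rdd2 stand for any pair
RDDs): DataFrame joins use spark.sql.shuffle.partitions, while plain RDD joins
take an explicit numPartitions or fall back to the default partitioner rule (a
parent's existing partitioner if there is one, otherwise
spark.default.parallelism or the largest parent's partition count).

// DataFrame / Spark SQL joins: governed by the shuffle partitions setting
sqlContext.setConf("spark.sql.shuffle.partitions", "48")

// Plain RDD joins: pass the number explicitly, or inherit it from the parents
val joined = rdd1.join(rdd2, 48)      // exactly 48 partitions
val joinedDefault = rdd1.join(rdd2)   // default partitioner rule applies
println(joinedDefault.partitions.length)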


Re: How could I do this algorithm in Spark?

2016-02-25 Thread Guillermo Ortiz
I'm taking a look at Pregel. It seems like a good way to do it. The only
downside I see is that it's not a really complex graph with a lot of
edges between the vertices; they are more like a lot of small, isolated
graphs

2016-02-25 12:32 GMT+01:00 Robin East <robin.e...@xense.co.uk>:

> The structures you are describing look like edges of a graph and you want
> to follow the graph to a terminal vertex and then propagate that value back
> up the path. On this assumption it would be simple to create the structures
> as graphs in GraphX and use Pregel for the algorithm implementation.
>
> ---
> Robin East
> *Spark GraphX in Action* Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
> On 25 Feb 2016, at 10:52, Guillermo Ortiz <konstt2...@gmail.com> wrote:
>
> Oh, the letters were just an example, it could be:
> a , t
> b, o
> t, k
> k, c
>
> So.. a -> t -> k -> c and the result is: a,c; t,c; k,c and b,o
> I don't know if you were thinking of sortBy because in the other example
> the letters were consecutive.
>
>
> 2016-02-25 9:42 GMT+01:00 Guillermo Ortiz <konstt2...@gmail.com>:
>
>> I don't see that sorting the data helps.
>> The answer has to be all the associations. In this case the answer has to
>> be:
>> a , b --> it was a error in the question, sorry.
>> b , d
>> c , d
>> x , y
>> y , y
>>
>> I feel like all the data which is associate should be in the same
>> executor.
>> On this case if I order the inputs.
>> a , b
>> x , y
>> b , c
>> y , y
>> c , d
>> --> to
>> a , b
>> b , c
>> c , d
>> x , y
>> y , y
>>
>> Now, a,b ; b,c; one partitions for example, "c,d" and "x,y" another one
>> and so on.
>> I could get the relation between "a,b,c", but not about "d" with "a,b,c",
>> am I wrong? I hope to be wrong!.
>>
>> It seems that it could be done with GraphX, but as you said, it seems a
>> little bit overhead.
>>
>>
>> 2016-02-25 5:43 GMT+01:00 James Barney <jamesbarne...@gmail.com>:
>>
>>> Guillermo,
>>> I think you're after an associative algorithm where A is ultimately
>>> associated with D, correct? Jakob would correct if that is a typo--a sort
>>> would be all that is necessary in that case.
>>>
>>> I believe you're looking for something else though, if I understand
>>> correctly.
>>>
>>> This seems like a similar algorithm to PageRank, no?
>>> https://github.com/amplab/graphx/blob/master/python/examples/pagerank.py
>>> Except return the "neighbor" itself, not the necessarily the rank of the
>>> page.
>>>
>>> If you wanted to, use Scala and Graphx for this problem. Might be a bit
>>> of overhead though: Construct a node for each member of each tuple with an
>>> edge between. Then traverse the graph for all sets of nodes that are
>>> connected. That result set would quickly explode in size, but you could
>>> restrict results to a minimum N connections. I'm not super familiar with
>>> Graphx myself, however. My intuition is saying 'graph problem' though.
>>>
>>> Thoughts?
>>>
>>>
>>> On Wed, Feb 24, 2016 at 6:43 PM, Jakob Odersky <ja...@odersky.com>
>>> wrote:
>>>
>>>> Hi Guillermo,
>>>> assuming that the first "a,b" is a typo and you actually meant "a,d",
>>>> this is a sorting problem.
>>>>
>>>> You could easily model your data as an RDD or tuples (or as a
>>>> dataframe/set) and use the sortBy (or orderBy for dataframe/sets)
>>>> methods.
>>>>
>>>> best,
>>>> --Jakob
>>>>
>>>> On Wed, Feb 24, 2016 at 2:26 PM, Guillermo Ortiz <konstt2...@gmail.com>
>>>> wrote:
>>>> > I want to do some algorithm in Spark.. I know how to do it in a single
>>>> > machine where all data are together, but I don't know a good way to
>>>> do it in
>>>> > Spark.
>>>> >
>>>> > If someone has an idea..
>>>> > I have some data like this
>>>> > a , b
>>>> > x , y
>>>> > b , c
>>>> > y , y
>>>> > c , d
>>>> >
>>>> > I want something like:
>>>> > a , d
>>>> > b , d
>>>> > c , d
>>>> > x , y
>>>> > y , y
>>>> >
>>>> > I need to know that a->b->c->d, so a->d, b->d and c->d.
>>>> > I don't want the code, just an idea how I could deal with it.
>>>> >
>>>> > Any idea?
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>
>


Re: How could I do this algorithm in Spark?

2016-02-25 Thread Guillermo Ortiz
Oh, the letters were just an example, it could be:
a , t
b, o
t, k
k, c

So.. a -> t -> k -> c and the result is: a,c; t,c; k,c and b,o
I don't know if you were thinking of sortBy because in the other example the
letters were consecutive.


2016-02-25 9:42 GMT+01:00 Guillermo Ortiz <konstt2...@gmail.com>:

> I don't see that sorting the data helps.
> The answer has to be all the associations. In this case the answer has to
> be:
> a , b --> it was a error in the question, sorry.
> b , d
> c , d
> x , y
> y , y
>
> I feel like all the data which is associate should be in the same executor.
> On this case if I order the inputs.
> a , b
> x , y
> b , c
> y , y
> c , d
> --> to
> a , b
> b , c
> c , d
> x , y
> y , y
>
> Now, a,b ; b,c; one partitions for example, "c,d" and "x,y" another one
> and so on.
> I could get the relation between "a,b,c", but not about "d" with "a,b,c",
> am I wrong? I hope to be wrong!.
>
> It seems that it could be done with GraphX, but as you said, it seems a
> little bit overhead.
>
>
> 2016-02-25 5:43 GMT+01:00 James Barney <jamesbarne...@gmail.com>:
>
>> Guillermo,
>> I think you're after an associative algorithm where A is ultimately
>> associated with D, correct? Jakob would correct if that is a typo--a sort
>> would be all that is necessary in that case.
>>
>> I believe you're looking for something else though, if I understand
>> correctly.
>>
>> This seems like a similar algorithm to PageRank, no?
>> https://github.com/amplab/graphx/blob/master/python/examples/pagerank.py
>> Except return the "neighbor" itself, not the necessarily the rank of the
>> page.
>>
>> If you wanted to, use Scala and Graphx for this problem. Might be a bit
>> of overhead though: Construct a node for each member of each tuple with an
>> edge between. Then traverse the graph for all sets of nodes that are
>> connected. That result set would quickly explode in size, but you could
>> restrict results to a minimum N connections. I'm not super familiar with
>> Graphx myself, however. My intuition is saying 'graph problem' though.
>>
>> Thoughts?
>>
>>
>> On Wed, Feb 24, 2016 at 6:43 PM, Jakob Odersky <ja...@odersky.com> wrote:
>>
>>> Hi Guillermo,
>>> assuming that the first "a,b" is a typo and you actually meant "a,d",
>>> this is a sorting problem.
>>>
>>> You could easily model your data as an RDD or tuples (or as a
>>> dataframe/set) and use the sortBy (or orderBy for dataframe/sets)
>>> methods.
>>>
>>> best,
>>> --Jakob
>>>
>>> On Wed, Feb 24, 2016 at 2:26 PM, Guillermo Ortiz <konstt2...@gmail.com>
>>> wrote:
>>> > I want to do some algorithm in Spark.. I know how to do it in a single
>>> > machine where all data are together, but I don't know a good way to do
>>> it in
>>> > Spark.
>>> >
>>> > If someone has an idea..
>>> > I have some data like this
>>> > a , b
>>> > x , y
>>> > b , c
>>> > y , y
>>> > c , d
>>> >
>>> > I want something like:
>>> > a , d
>>> > b , d
>>> > c , d
>>> > x , y
>>> > y , y
>>> >
>>> > I need to know that a->b->c->d, so a->d, b->d and c->d.
>>> > I don't want the code, just an idea how I could deal with it.
>>> >
>>> > Any idea?
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Number partitions after a join

2016-02-25 Thread Guillermo Ortiz
When you do a join in Spark, how many partitions does the result have? Is it
a default number if you don't specify the number of partitions?


Re: How could I do this algorithm in Spark?

2016-02-25 Thread Guillermo Ortiz
I don't see how sorting the data helps.
The answer has to be all the associations. In this case the answer has to
be:
a , b --> it was an error in the question, sorry.
b , d
c , d
x , y
y , y

I feel like all the data which is associated should be in the same executor.
In this case, if I order the inputs:
a , b
x , y
b , c
y , y
c , d
--> to
a , b
b , c
c , d
x , y
y , y

Now, "a,b" and "b,c" in one partition, for example, "c,d" and "x,y" in
another one, and so on.
I could get the relation between "a,b,c", but not the relation of "d" with
"a,b,c", am I wrong? I hope I'm wrong!

It seems that it could be done with GraphX, but as you said, it seems like a
little bit of overhead.


2016-02-25 5:43 GMT+01:00 James Barney <jamesbarne...@gmail.com>:

> Guillermo,
> I think you're after an associative algorithm where A is ultimately
> associated with D, correct? Jakob would correct if that is a typo--a sort
> would be all that is necessary in that case.
>
> I believe you're looking for something else though, if I understand
> correctly.
>
> This seems like a similar algorithm to PageRank, no?
> https://github.com/amplab/graphx/blob/master/python/examples/pagerank.py
> Except return the "neighbor" itself, not the necessarily the rank of the
> page.
>
> If you wanted to, use Scala and Graphx for this problem. Might be a bit of
> overhead though: Construct a node for each member of each tuple with an
> edge between. Then traverse the graph for all sets of nodes that are
> connected. That result set would quickly explode in size, but you could
> restrict results to a minimum N connections. I'm not super familiar with
> Graphx myself, however. My intuition is saying 'graph problem' though.
>
> Thoughts?
>
>
> On Wed, Feb 24, 2016 at 6:43 PM, Jakob Odersky <ja...@odersky.com> wrote:
>
>> Hi Guillermo,
>> assuming that the first "a,b" is a typo and you actually meant "a,d",
>> this is a sorting problem.
>>
>> You could easily model your data as an RDD or tuples (or as a
>> dataframe/set) and use the sortBy (or orderBy for dataframe/sets)
>> methods.
>>
>> best,
>> --Jakob
>>
>> On Wed, Feb 24, 2016 at 2:26 PM, Guillermo Ortiz <konstt2...@gmail.com>
>> wrote:
>> > I want to do some algorithm in Spark.. I know how to do it in a single
>> > machine where all data are together, but I don't know a good way to do
>> it in
>> > Spark.
>> >
>> > If someone has an idea..
>> > I have some data like this
>> > a , b
>> > x , y
>> > b , c
>> > y , y
>> > c , d
>> >
>> > I want something like:
>> > a , d
>> > b , d
>> > c , d
>> > x , y
>> > y , y
>> >
>> > I need to know that a->b->c->d, so a->d, b->d and c->d.
>> > I don't want the code, just an idea how I could deal with it.
>> >
>> > Any idea?
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


How could I do this algorithm in Spark?

2016-02-24 Thread Guillermo Ortiz
I want to implement an algorithm in Spark. I know how to do it on a single
machine where all the data is together, but I don't know a good way to do it
in Spark.

If someone has an idea..
I have some data like this
a , b
x , y
b , c
y , y
c , d

I want something like:
a , d
b , d
c , d
x , y
y , y

I need to know that a->b->c->d, so a->d, b->d and c->d.
I don't want the code, just an idea how I could deal with it.

Any idea?
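
One possible shape for this without GraphX (purely my own sketch, assuming each
left-hand key appears only once and the chains contain no cycles other than
self-loops like "y,y"): repeatedly jump every pair one hop further along its
chain until nothing changes.

val pairs = sc.parallelize(Seq(("a", "b"), ("x", "y"), ("b", "c"), ("y", "y"), ("c", "d")))

var resolved = pairs.cache()
var done = false
while (!done) {
  // If a pair's destination is itself a source, jump to that source's destination.
  val jumped = resolved.map { case (src, dst) => (dst, src) }
    .leftOuterJoin(resolved)
    .map { case (dst, (src, next)) => (src, next.getOrElse(dst)) }
    .cache()
  done = jumped.subtract(resolved).isEmpty()   // fixed point: no pair changed
  resolved = jumped
}
resolved.collect()   // Array((a,d), (b,d), (c,d), (x,y), (y,y)) in some order

Each pass roughly doubles how far every key has been resolved, so the number of
iterations grows with the logarithm of the longest chain.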


Number of executors in Spark - Kafka

2016-01-21 Thread Guillermo Ortiz
I'm using Spark Streaming and Kafka with the direct approach. I have created a
topic with 6 partitions, so when I execute Spark each batch RDD has six
partitions. I understand that ideally there should be six executors, one to
process each partition. To do that, when I execute spark-submit (I use YARN) I
set the number of executors to six.
If I don't specify anything it just creates one executor. Looking for
information, I have read:

"The --num-executors command-line flag or spark.executor.instances
configuration
property control the number of executors requested. Starting in CDH
5.4/Spark 1.3, you will be able to avoid setting this property by turning
on dynamic allocation

with
thespark.dynamicAllocation.enabled property. Dynamic allocation enables a
Spark application to request executors when there is a backlog of pending
tasks and free up executors when idle."

I have this parameter enabled. I understand that if I don't set
--num-executors it should create six executors, or am I wrong?
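
For comparison, two hedged spark-submit shapes (sketches, not a verified
recipe). Note that dynamic allocation also needs the external shuffle service
on the NodeManagers, and it sizes the executor pool from the backlog of pending
tasks, not from the number of topic partitions, so it will not necessarily hold
six executors for a streaming job.

# Explicit sizing: one executor core per Kafka partition
spark-submit --master yarn-cluster --num-executors 6 --executor-cores 1 ... myJar.jar

# Dynamic allocation: let the executor count follow the task backlog
spark-submit --master yarn-cluster \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=6 \
  ... myJar.jar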


Re: Spark job stops after a while.

2016-01-21 Thread Guillermo Ortiz
I have confirmed I'm using Spark 1.5.0, except for this jar:
file:/opt/centralLogs/lib/spark-catalyst_2.10-1.5.1.jar.

I'm going to keep looking. Thank you!

2016-01-21 16:29 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:

> Maybe this is related (fixed in 1.5.3):
> SPARK-11195 Exception thrown on executor throws ClassNotFoundException on
> driver
>
> FYI
>
> On Thu, Jan 21, 2016 at 7:10 AM, Guillermo Ortiz <konstt2...@gmail.com>
> wrote:
>
>> I'm using CDH 5.5.1 with Spark 1.5.x (I think that it's 1.5.2).
>>
>> I know that the library is here:
>> cloud-user@ose10kafkaelk:/opt/centralLogs/lib$ jar tf
>> elasticsearch-hadoop-2.2.0-beta1.jar | grep
>>  EsHadoopIllegalArgumentException
>> org/elasticsearch/hadoop/EsHadoopIllegalArgumentException.class
>>
>> I have checked in the Spark UI with the process running:
>> http://10.129.96.55:39320/jars/elasticsearch-hadoop-2.2.0-beta1.jar (Added
>> By User)
>> and spark.jars from the Spark UI:
>>
>> .file:/opt/centralLogs/lib/elasticsearch-hadoop-2.2.0-beta1.jar,file:/opt/centralLogs/lib/geronimo-annotation_1.0_spec-1.1.1.jar,
>>
>> I think that in yarn-client mode, although it has the error, it doesn't stop
>> the execution, but I don't know why.
>>
>>
>>
>> 2016-01-21 15:55 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>>
>>> Looks like jar containing EsHadoopIllegalArgumentException class wasn't
>>> in the classpath.
>>> Can you double check ?
>>>
>>> Which Spark version are you using ?
>>>
>>> Cheers
>>>
>>> On Thu, Jan 21, 2016 at 6:50 AM, Guillermo Ortiz <konstt2...@gmail.com>
>>> wrote:
>>>
>>>> I'm running a Spark Streaming process and it stops after a while. It does
>>>> some processing and inserts the result into Elasticsearch with its library.
>>>> After a while the process fails.
>>>>
>>>> I have been checking the logs and I have seen this error
>>>> 2016-01-21 14:57:54,388 [sparkDriver-akka.actor.default-dispatcher-17]
>>>> INFO  org.apache.spark.storage.BlockManagerInfo - Added broadcast_2_piece0
>>>> in memory on ose11kafkaelk.novalocal:46913 (size: 6.0 KB, free: 530.3 MB)
>>>> 2016-01-21 14:57:54,646 [task-result-getter-1] INFO
>>>>  org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 2.0
>>>> (TID 7) in 397 ms on ose12kafkaelk.novalocal (1/6)
>>>> 2016-01-21 14:57:54,647 [task-result-getter-2] INFO
>>>>  org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 2.0
>>>> (TID 10) in 395 ms on ose12kafkaelk.novalocal (2/6)
>>>> 2016-01-21 14:57:54,731 [task-result-getter-3] INFO
>>>>  org.apache.spark.scheduler.TaskSetManager - Finished task 5.0 in stage 2.0
>>>> (TID 9) in 481 ms on ose11kafkaelk.novalocal (3/6)
>>>> 2016-01-21 14:57:54,844 [task-result-getter-1] INFO
>>>>  org.apache.spark.scheduler.TaskSetManager - Finished task 4.0 in stage 2.0
>>>> (TID 8) in 595 ms on ose10kafkaelk.novalocal (4/6)
>>>> 2016-01-21 14:57:54,850 [task-result-getter-0] WARN
>>>>  org.apache.spark.ThrowableSerializationWrapper - Task exception could not
>>>> be deserialized
>>>> java.lang.ClassNotFoundException:
>>>> org.elasticsearch.hadoop.EsHadoopIllegalArgumentException
>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>>
>>>> I don't know why I'm getting this error because the class
>>>> org.elasticsearch.hadoop.EsHadoopIllegalArgumentException is in the library
>>>> of elasticSearch.
>>>>
>>>> After this error I get others error and finally Spark ends.
>>>> 2016-01-21 14:57:55,012 [JobScheduler] INFO
>>>>  org.apache.spark.streaming.scheduler.JobScheduler - Starting job streaming
>>>> job 145338464 ms.0 from job set of time 145338464 ms
>>>> 2016-01-21 14:57:55,012 [JobScheduler] ERROR
>>>> org.apache.spark.streaming.scheduler.JobScheduler - Error running job
>>>> streaming job 1453384635000 ms.0
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 1 in stage 2.0 failed 4 times, most recent failure: Lost task 1.3 in stage
>>>> 2.0 (TID 13, ose11kafkaelk.novalocal): UnknownReason
>>>> Driver stacktrace:
>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>

Spark job stops after a while.

2016-01-21 Thread Guillermo Ortiz
I'm running a Spark Streaming process and it stops after a while. It does some
processing and inserts the result into Elasticsearch with its library. After a
while the process fails.

I have been checking the logs and I have seen this error
2016-01-21 14:57:54,388 [sparkDriver-akka.actor.default-dispatcher-17] INFO
 org.apache.spark.storage.BlockManagerInfo - Added broadcast_2_piece0 in
memory on ose11kafkaelk.novalocal:46913 (size: 6.0 KB, free: 530.3 MB)
2016-01-21 14:57:54,646 [task-result-getter-1] INFO
 org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 2.0
(TID 7) in 397 ms on ose12kafkaelk.novalocal (1/6)
2016-01-21 14:57:54,647 [task-result-getter-2] INFO
 org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 2.0
(TID 10) in 395 ms on ose12kafkaelk.novalocal (2/6)
2016-01-21 14:57:54,731 [task-result-getter-3] INFO
 org.apache.spark.scheduler.TaskSetManager - Finished task 5.0 in stage 2.0
(TID 9) in 481 ms on ose11kafkaelk.novalocal (3/6)
2016-01-21 14:57:54,844 [task-result-getter-1] INFO
 org.apache.spark.scheduler.TaskSetManager - Finished task 4.0 in stage 2.0
(TID 8) in 595 ms on ose10kafkaelk.novalocal (4/6)
2016-01-21 14:57:54,850 [task-result-getter-0] WARN
 org.apache.spark.ThrowableSerializationWrapper - Task exception could not
be deserialized
java.lang.ClassNotFoundException:
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)

I don't know why I'm getting this error, because the class
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException is in the
Elasticsearch library.

After this error I get other errors and finally Spark dies.
2016-01-21 14:57:55,012 [JobScheduler] INFO
 org.apache.spark.streaming.scheduler.JobScheduler - Starting job streaming
job 145338464 ms.0 from job set of time 145338464 ms
2016-01-21 14:57:55,012 [JobScheduler] ERROR
org.apache.spark.streaming.scheduler.JobScheduler - Error running job
streaming job 1453384635000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
in stage 2.0 failed 4 times, most recent failure: Lost task 1.3 in stage
2.0 (TID 13, ose11kafkaelk.novalocal): UnknownReason
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1294)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1282)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1281)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1281)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1507)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1469)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:54)
at org.elasticsearch.spark.rdd.EsSpark$.saveJsonToEs(EsSpark.scala:90)
at
org.elasticsearch.spark.package$SparkJsonRDDFunctions.saveJsonToEs(package.scala:44)
at
produban.spark.CentralLog$$anonfun$createContext$1.apply(CentralLog.scala:56)
at
produban.spark.CentralLog$$anonfun$createContext$1.apply(CentralLog.scala:33)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at

Re: Spark job stops after a while.

2016-01-21 Thread Guillermo Ortiz
I'm using CDH 5.5.1 with Spark 1.5.x (I think that it's 1.5.2).

I know that the library is here:
cloud-user@ose10kafkaelk:/opt/centralLogs/lib$ jar tf
elasticsearch-hadoop-2.2.0-beta1.jar | grep
 EsHadoopIllegalArgumentException
org/elasticsearch/hadoop/EsHadoopIllegalArgumentException.class

I have checked in the Spark UI with the process running:
http://10.129.96.55:39320/jars/elasticsearch-hadoop-2.2.0-beta1.jar Added By User
And spark.jars from the Spark UI:
.file:/opt/centralLogs/lib/elasticsearch-hadoop-2.2.0-beta1.jar,file:/opt/centralLogs/lib/geronimo-annotation_1.0_spec-1.1.1.jar,

I think that in yarn-client, although it has the error, it doesn't stop the
execution, but I don't know why.



2016-01-21 15:55 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:

> Looks like jar containing EsHadoopIllegalArgumentException class wasn't
> in the classpath.
> Can you double check ?
>
> Which Spark version are you using ?
>
> Cheers
>
> On Thu, Jan 21, 2016 at 6:50 AM, Guillermo Ortiz <konstt2...@gmail.com>
> wrote:
>
>> I'm runing a Spark Streaming process and it stops in a while. It makes
>> some process an insert the result in ElasticSeach with its library. After a
>> while the process fail.
>>
>> I have been checking the logs and I have seen this error
>> 2016-01-21 14:57:54,388 [sparkDriver-akka.actor.default-dispatcher-17]
>> INFO  org.apache.spark.storage.BlockManagerInfo - Added broadcast_2_piece0
>> in memory on ose11kafkaelk.novalocal:46913 (size: 6.0 KB, free: 530.3 MB)
>> 2016-01-21 14:57:54,646 [task-result-getter-1] INFO
>>  org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 2.0
>> (TID 7) in 397 ms on ose12kafkaelk.novalocal (1/6)
>> 2016-01-21 14:57:54,647 [task-result-getter-2] INFO
>>  org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 2.0
>> (TID 10) in 395 ms on ose12kafkaelk.novalocal (2/6)
>> 2016-01-21 14:57:54,731 [task-result-getter-3] INFO
>>  org.apache.spark.scheduler.TaskSetManager - Finished task 5.0 in stage 2.0
>> (TID 9) in 481 ms on ose11kafkaelk.novalocal (3/6)
>> 2016-01-21 14:57:54,844 [task-result-getter-1] INFO
>>  org.apache.spark.scheduler.TaskSetManager - Finished task 4.0 in stage 2.0
>> (TID 8) in 595 ms on ose10kafkaelk.novalocal (4/6)
>> 2016-01-21 14:57:54,850 [task-result-getter-0] WARN
>>  org.apache.spark.ThrowableSerializationWrapper - Task exception could not
>> be deserialized
>> java.lang.ClassNotFoundException:
>> org.elasticsearch.hadoop.EsHadoopIllegalArgumentException
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> I don't know why I'm getting this error because the class
>> org.elasticsearch.hadoop.EsHadoopIllegalArgumentException is in the library
>> of elasticSearch.
>>
>> After this error I get others error and finally Spark ends.
>> 2016-01-21 14:57:55,012 [JobScheduler] INFO
>>  org.apache.spark.streaming.scheduler.JobScheduler - Starting job streaming
>> job 145338464 ms.0 from job set of time 145338464 ms
>> 2016-01-21 14:57:55,012 [JobScheduler] ERROR
>> org.apache.spark.streaming.scheduler.JobScheduler - Error running job
>> streaming job 1453384635000 ms.0
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
>> in stage 2.0 failed 4 times, most recent failure: Lost task 1.3 in stage
>> 2.0 (TID 13, ose11kafkaelk.novalocal): UnknownReason
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1294)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1282)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1281)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1281)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGSchedu

Re: Spark job stops after a while.

2016-01-21 Thread Guillermo Ortiz
I think it's that bug, because the error is the same. Thanks a lot.

2016-01-21 16:46 GMT+01:00 Guillermo Ortiz <konstt2...@gmail.com>:

> I'm using 1.5.0 of  Spark confirmed. Less this
> jar file:/opt/centralLogs/lib/spark-catalyst_2.10-1.5.1.jar.
>
> I'm going to keep looking for,, Thank you!.
>
> 2016-01-21 16:29 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>
>> Maybe this is related (fixed in 1.5.3):
>> SPARK-11195 Exception thrown on executor throws ClassNotFoundException on
>> driver
>>
>> FYI
>>
>> On Thu, Jan 21, 2016 at 7:10 AM, Guillermo Ortiz <konstt2...@gmail.com>
>> wrote:
>>
>>> I'm using CDH 5.5.1 with Spark 1.5.x (I think that it's 1.5.2).
>>>
>>> I know that the library is here:
>>> cloud-user@ose10kafkaelk:/opt/centralLogs/lib$ jar tf
>>> elasticsearch-hadoop-2.2.0-beta1.jar | grep
>>>  EsHadoopIllegalArgumentException
>>> org/elasticsearch/hadoop/EsHadoopIllegalArgumentException.class
>>>
>>> I have check in SparkUI with the process running
>>> http://10.129.96.55:39320/jars/elasticsearch-hadoop-2.2.0-beta1.jar Added
>>> By User
>>> And spark.jars from SparkUI.
>>>
>>> .file:/opt/centralLogs/lib/elasticsearch-hadoop-2.2.0-beta1.jar,file:/opt/centralLogs/lib/geronimo-annotation_1.0_spec-1.1.1.jar,
>>>
>>> I think that in yarn-client although it has the error it doesn't stop
>>> the execution, but I don't know why.
>>>
>>>
>>>
>>> 2016-01-21 15:55 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>>>
>>>> Looks like jar containing EsHadoopIllegalArgumentException class
>>>> wasn't in the classpath.
>>>> Can you double check ?
>>>>
>>>> Which Spark version are you using ?
>>>>
>>>> Cheers
>>>>
>>>> On Thu, Jan 21, 2016 at 6:50 AM, Guillermo Ortiz <konstt2...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm runing a Spark Streaming process and it stops in a while. It makes
>>>>> some process an insert the result in ElasticSeach with its library. After 
>>>>> a
>>>>> while the process fail.
>>>>>
>>>>> I have been checking the logs and I have seen this error
>>>>> 2016-01-21 14:57:54,388 [sparkDriver-akka.actor.default-dispatcher-17]
>>>>> INFO  org.apache.spark.storage.BlockManagerInfo - Added broadcast_2_piece0
>>>>> in memory on ose11kafkaelk.novalocal:46913 (size: 6.0 KB, free: 530.3 MB)
>>>>> 2016-01-21 14:57:54,646 [task-result-getter-1] INFO
>>>>>  org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 
>>>>> 2.0
>>>>> (TID 7) in 397 ms on ose12kafkaelk.novalocal (1/6)
>>>>> 2016-01-21 14:57:54,647 [task-result-getter-2] INFO
>>>>>  org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 
>>>>> 2.0
>>>>> (TID 10) in 395 ms on ose12kafkaelk.novalocal (2/6)
>>>>> 2016-01-21 14:57:54,731 [task-result-getter-3] INFO
>>>>>  org.apache.spark.scheduler.TaskSetManager - Finished task 5.0 in stage 
>>>>> 2.0
>>>>> (TID 9) in 481 ms on ose11kafkaelk.novalocal (3/6)
>>>>> 2016-01-21 14:57:54,844 [task-result-getter-1] INFO
>>>>>  org.apache.spark.scheduler.TaskSetManager - Finished task 4.0 in stage 
>>>>> 2.0
>>>>> (TID 8) in 595 ms on ose10kafkaelk.novalocal (4/6)
>>>>> 2016-01-21 14:57:54,850 [task-result-getter-0] WARN
>>>>>  org.apache.spark.ThrowableSerializationWrapper - Task exception could not
>>>>> be deserialized
>>>>> java.lang.ClassNotFoundException:
>>>>> org.elasticsearch.hadoop.EsHadoopIllegalArgumentException
>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>>>
>>>>> I don't know why I'm getting this error because the class
>>>>> org.elasticsearch.hadoop.EsHadoopIllegalArgumentException is in the 
>>>>> library
>>>>> of elasticSearch.
>>>>>
>>>>> After this error I get others error and finally Spark ends.
>>>>> 2016-01-21 14:57:55,012 [JobScheduler] INFO
>>>>>  org.apache.spark.streaming.scheduler.JobScheduler - Starting job 
>>>>> streami

Trying to index document in Solr with Spark and solr-spark library

2015-12-16 Thread Guillermo Ortiz
I'm trying to index documents into Solr from Spark with the solr-spark library.

I have created a project with Maven and I include all the dependencies when I
run Spark, but I get a ClassNotFoundException. I have checked that the class is
in one of the jars that I'm including (solr-solrj-4.10.3.jar).
I compiled the 4.x branch of this library because the latest one is for Solr 5.x.

The error seems pretty clear, but I have checked spark.jars in the Spark UI
and the jars appear.

2015-12-16 16:13:40,265 [task-result-getter-3] WARN
 org.apache.spark.ThrowableSerializationWrapper - Task exception could not
be deserialized
java.lang.*ClassNotFoundException:
org.apache.solr.common.cloud.ZooKeeperException*
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Re: Trying to index document in Solr with Spark and solr-spark library

2015-12-16 Thread Guillermo Ortiz
I have found a more specific error in an executor. The other error is
happening in the driver.

I have navigated around ZooKeeper and the collection is created. Anyway, it
seems more a problem with Solr than with Spark right now.

2015-12-16 16:31:43,923 [Executor task launch worker-1] INFO
org.apache.zookeeper.ZooKeeper - Session: 0x1519126c7d55b23 closed
2015-12-16 16:31:43,924 [Executor task launch worker-1] ERROR
org.apache.spark.executor.Executor - Exception in task 5.2 in stage
12.0 (TID 218)
org.apache.solr.common.cloud.ZooKeeperException:
at 
org.apache.solr.client.solrj.impl.CloudSolrServer.connect(CloudSolrServer.java:252)
at com.lucidworks.spark.SolrSupport.getSolrServer(SolrSupport.java:67)
at com.lucidworks.spark.SolrSupport$4.call(SolrSupport.java:162)
at com.lucidworks.spark.SolrSupport$4.call(SolrSupport.java:160)
at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
KeeperErrorCode = NoNode for /live_nodes
at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1468)
at 
org.apache.solr.common.cloud.SolrZkClient$7.execute(SolrZkClient.java:290)
at 
org.apache.solr.common.cloud.SolrZkClient$7.execute(SolrZkClient.java:287)
at 
org.apache.solr.common.cloud.ZkCmdExecutor.retryOperation(ZkCmdExecutor.java:74)
at 
org.apache.solr.common.cloud.SolrZkClient.getChildren(SolrZkClient.java:287)
at 
org.apache.solr.common.cloud.ZkStateReader.createClusterStateWatchersAndUpdate(ZkStateReader.java:334)
at 
org.apache.solr.client.solrj.impl.CloudSolrServer.connect(CloudSolrServer.java:243)
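
For what it's worth, a NoNodeException for /live_nodes at connect time usually points at the ZooKeeper connect string rather than at Spark: SolrCloud keeps /live_nodes under its chroot (commonly /solr), so a zkHost without that chroot looks at the ZooKeeper root and finds nothing there. A small, hypothetical check with solrj 4.x outside Spark, going through the same CloudSolrServer the stack trace shows; host names and the collection name are illustrative:

import org.apache.solr.client.solrj.impl.CloudSolrServer

// The chroot ("/solr" here) must match how SolrCloud was registered in ZooKeeper;
// without it, connect() fails with the same NoNode for /live_nodes as above.
val zkHost = "zk1:2181,zk2:2181,zk3:2181/solr"
val server = new CloudSolrServer(zkHost)
server.setDefaultCollection("my_collection")
server.connect()
println("connected, cluster state is readable")
server.shutdown()

If this works outside Spark with the chroot included, the same zkHost string is presumably what should be handed to the solr-spark helpers.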


2015-12-16 16:26 GMT+01:00 Guillermo Ortiz <konstt2...@gmail.com>:

> I'm trying to index document to Solr from Spark with the library solr-spark
>
> I have create a project with Maven and include all the dependencies when I
> execute spark but I get a ClassNotFoundException. I have check that the
> class is in one of the jar that I'm including ( solr-solrj-4.10.3.jar)
> I compiled the brach 4.X of this library because the last one it's to Solr
> 5.X
>
> Th error  seems pretty clear, but I have checked in the sparkUI spark.jars
> and the jars appear.
>
> 2015-12-16 16:13:40,265 [task-result-getter-3] WARN
>  org.apache.spark.ThrowableSerializationWrapper - Task exception could not
> be deserialized
> java.lang.*ClassNotFoundException:
> org.apache.solr.common.cloud.ZooKeeperException*
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:270)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
> at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcc

How to config the log in Spark

2015-12-07 Thread Guillermo Ortiz
I can't manage to activate the logging for my classes. I'm using CDH 5.4 with
Spark 1.3.0.

I have a class in Scala with some log.debug calls, and I created an object for
logging:

package example.spark
import org.apache.log4j.Logger
object Holder extends Serializable {
  @transient lazy val log = Logger.getLogger(getClass.getName)
}

And I use the logger inside a map function, which is executed on the
executors. I'm looking for the logs on the executors (YARN).


My log4j.properties is
log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c -
%m%n

log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingAppender.File=/opt/centralLog/log/spark.log
log4j.appender.RollingAppender.DatePattern='.'-MM-dd
log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M - %m%n

log4j.rootLogger=INFO, myConsoleAppender, RollingAppender
*log4j.logger.example.spark=DEBUG, RollingAppender, myConsoleAppender*

And I created a script to execute Spark:
#!/bin/bash

export HADOOP_CONF_DIR=/etc/hadoop/conf
export SPARK_CONF_DIR=/opt/centralLogs/conf
SPARK_CLASSPATH="file:/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/"
for lib in `ls /opt/centralLogs/lib/*.jar`
do
if [ -z "$SPARK_CLASSPATH" ]; then
SPARK_CLASSPATH=$lib
else
SPARK_CLASSPATH=$SPARK_CLASSPATH,$lib
fi
done

spark-submit --name "CentralLog" --master yarn-client --conf
"spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties"
--conf
"spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties"
--class example.spark.CentralLog --jars $SPARK_CLASSPATH,
file:/opt/centralLogs/conf/log4j.properties --executor-memory 2g
/opt/centralLogs/libProject/paas.jar X kafka-topic3 X,X,X

I added file:/opt/centralLogs/conf/log4j.properties, but it's not working; I
can't see the debug logs.


Re: Spark directStream with Kafka and process the lost messages.

2015-11-30 Thread Guillermo Ortiz
Then something is wrong in my code ;). Thanks.

2015-11-30 16:46 GMT+01:00 Cody Koeninger <c...@koeninger.org>:

> Starting from the checkpoint using getOrCreate should be sufficient if all
> you need is at-least-once semantics
>
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
>
> On Mon, Nov 30, 2015 at 9:38 AM, Guillermo Ortiz <konstt2...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have Spark and Kafka with directStream. I'm trying that if Spark dies
>> it could process all those messages when it starts.  The offsets are stored
>> in chekpoints but I don't know how I could say to Spark to start in that
>> point.
>> I saw that there's another createDirectStream method with a fromOffsets
>> parameter but, how could I access to the offsets?
>>
>> val ssc = new StreamingContext(sparkConf, Seconds(5))
>> ssc.checkpoint(checkpoint)
>> val directKafkaStream = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder, StringDecoder](ssc, kafkaBrokers, topic)
>>
>>
>


Spark directStream with Kafka and process the lost messages.

2015-11-30 Thread Guillermo Ortiz
Hello,

I have Spark and Kafka with directStream. I want Spark, if it dies, to process
all those messages when it starts again. The offsets are stored in checkpoints,
but I don't know how to tell Spark to start from that point.
I saw that there's another createDirectStream method with a fromOffsets
parameter, but how could I get access to the offsets?

val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint(checkpoint)
val directKafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaBrokers, topic)
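
For reference, a minimal sketch of how the checkpoint-based recovery that Cody describes usually looks with the Spark 1.x direct API; the checkpoint path, broker list, topic set, and app name are illustrative and mirror the variable names used above:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val checkpoint = "hdfs:///user/spark/checkpoints/myApp"           // illustrative path
val kafkaBrokers = Map("metadata.broker.list" -> "broker1:9092")  // illustrative brokers
val topic = Set("my-topic")                                       // illustrative topic

def createContext(): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("kafka-direct")      // illustrative name
  val ssc = new StreamingContext(sparkConf, Seconds(5))
  ssc.checkpoint(checkpoint)
  val directKafkaStream = KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder](ssc, kafkaBrokers, topic)
  // ... all transformations and output operations go here ...
  ssc
}

// On a clean start this builds a fresh context; after a crash it rebuilds the
// context from the checkpoint, including the Kafka offsets of the batches that
// were not finished, so those messages are processed again (at-least-once).
val ssc = StreamingContext.getOrCreate(checkpoint, createContext _)
ssc.start()
ssc.awaitTermination()

// To manage offsets by hand instead, each batch RDD can be cast to
// org.apache.spark.streaming.kafka.HasOffsetRanges, its offsetRanges stored
// externally, and later fed back through the createDirectStream overload that
// takes a fromOffsets map.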


Re: Problems with JobScheduler

2015-07-31 Thread Guillermo Ortiz
It doesn't make sense to me, because the other cluster processes all the data
in less than a second.
Anyway, I'm going to set that parameter.
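
For reference, the rate limit Cody and TD mention below is a single configuration property; a minimal sketch of setting it (the value and app name are illustrative and have to be tuned to the workload):

import org.apache.spark.SparkConf

// Caps how many messages each Kafka partition contributes per second of batch
// time to the direct stream; without it a batch reads everything available,
// which is why a backlogged topic can turn a 5-second batch into a one-minute job.
val conf = new SparkConf()
  .setAppName("metrics-streaming")                           // illustrative name
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // msgs/sec/partition

With a 5-second batch interval and one partition per topic, that would cap each batch at roughly 5,000 messages per topic.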

2015-07-31 0:36 GMT+02:00 Tathagata Das t...@databricks.com:

 Yes, and that is indeed the problem. It is trying to process all the data
 in Kafka, and therefore taking 60 seconds. You need to set the rate limits
 for that.

 On Thu, Jul 30, 2015 at 8:51 AM, Cody Koeninger c...@koeninger.org
 wrote:

 If you don't set it, there is no maximum rate, it will get everything
 from the end of the last batch to the maximum available offset

 On Thu, Jul 30, 2015 at 10:46 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 The difference is that one recives more data than the others two. I can
 pass thought parameters the topics, so, I could execute the code trying
 with one topic and figure out with one is the topic, although I guess that
 it's the topics which gets more data.

 Anyway it's pretty weird those delays in just one of the cluster even if
 the another one is not running.
 I have seen the parameter spark.streaming.kafka.maxRatePerPartition,
 I haven't set any value for this parameter, how does it work if this
 parameter doesn't have a value?

 2015-07-30 16:32 GMT+02:00 Cody Koeninger c...@koeninger.org:

 If the jobs are running on different topicpartitions, what's different
 about them?  Is one of them 120x the throughput of the other, for
 instance?  You should be able to eliminate cluster config as a difference
 by running the same topic partition on the different clusters and comparing
 the results.

 On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I have three topics with one partition each topic. So each jobs run
 about one topics.

 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org:

 Just so I'm clear, the difference in timing you're talking about is
 this:

 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s

 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s


 Are those jobs running on the same topicpartition?


 On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz 
 konstt2...@gmail.com wrote:

 I read about maxRatePerPartition parameter, I haven't set this
 parameter. Could it be the problem?? Although this wouldn't explain why 
 it
 doesn't work in one of the clusters.

 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 They just share the kafka, the rest of resources are independents.
 I tried to stop one cluster and execute just the cluster isn't working 
 but
 it happens the same.

 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same
 code in two cluster. I read from three topics in Kafka with 
 DirectStream so
 I have three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage
 24.0 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage
 24.0 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0
 in memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0
 in memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage
 24.0 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage
 24.0 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage
 24.0 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time
 143825958 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time
 1438259585000 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time
 143825959 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time
 1438259595000 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time
 143825960 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time
 1438259605000 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time
 143825961 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time
 1438259615000 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time
 143825962 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time
 1438259625000 ms*
 *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time
 143825963 ms*
 *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time
 1438259635000 ms*
 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage
 24.0 (TID 73) in 60373 ms on (3/3)
 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose
 tasks have all completed

Re: Problems with JobScheduler

2015-07-31 Thread Guillermo Ortiz
I detected the error. The final step is to index data into Elasticsearch, and
the Elasticsearch in one of the clusters is overwhelmed and doesn't work
correctly.
I pointed the cluster that wasn't working at another ES and no longer get any
delay.

Sorry, it wasn't related to Spark!




2015-07-31 9:15 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 It doesn't make sense to me. Because in the another cluster process all
 data in less than a second.
 Anyway, I'm going to set that parameter.

 2015-07-31 0:36 GMT+02:00 Tathagata Das t...@databricks.com:

 Yes, and that is indeed the problem. It is trying to process all the data
 in Kafka, and therefore taking 60 seconds. You need to set the rate limits
 for that.

 On Thu, Jul 30, 2015 at 8:51 AM, Cody Koeninger c...@koeninger.org
 wrote:

 If you don't set it, there is no maximum rate, it will get everything
 from the end of the last batch to the maximum available offset

 On Thu, Jul 30, 2015 at 10:46 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 The difference is that one recives more data than the others two. I can
 pass thought parameters the topics, so, I could execute the code trying
 with one topic and figure out with one is the topic, although I guess that
 it's the topics which gets more data.

 Anyway it's pretty weird those delays in just one of the cluster even
 if the another one is not running.
 I have seen the parameter spark.streaming.kafka.maxRatePerPartition,
 I haven't set any value for this parameter, how does it work if this
 parameter doesn't have a value?

 2015-07-30 16:32 GMT+02:00 Cody Koeninger c...@koeninger.org:

 If the jobs are running on different topicpartitions, what's different
 about them?  Is one of them 120x the throughput of the other, for
 instance?  You should be able to eliminate cluster config as a difference
 by running the same topic partition on the different clusters and 
 comparing
 the results.

 On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com
  wrote:

 I have three topics with one partition each topic. So each jobs run
 about one topics.

 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org:

 Just so I'm clear, the difference in timing you're talking about is
 this:

 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s

 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s


 Are those jobs running on the same topicpartition?


 On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz 
 konstt2...@gmail.com wrote:

 I read about maxRatePerPartition parameter, I haven't set this
 parameter. Could it be the problem?? Although this wouldn't explain 
 why it
 doesn't work in one of the clusters.

 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 They just share the kafka, the rest of resources are independents.
 I tried to stop one cluster and execute just the cluster isn't 
 working but
 it happens the same.

 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same
 code in two cluster. I read from three topics in Kafka with 
 DirectStream so
 I have three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage
 24.0 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage
 24.0 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added
 broadcast_24_piece0 in memory on xxx:44909 (size: 1802.0 B, 
 free:
 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added
 broadcast_24_piece0 in memory on x:43477 (size: 1802.0 B, 
 free:
 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage
 24.0 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage
 24.0 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage
 24.0 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time
 143825958 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time
 1438259585000 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time
 143825959 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time
 1438259595000 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time
 143825960 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time
 1438259605000 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time
 143825961 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time
 1438259615000 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time
 143825962 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time
 1438259625000 ms*
 *15/07/30

Problems with JobScheduler

2015-07-30 Thread Guillermo Ortiz
I have a problem with the JobScheduler. I have run the same code in two
clusters. I read from three topics in Kafka with DirectStream, so I have
three tasks.

I have checked YARN and there are no other jobs launched.

In the cluster where I have trouble I get these logs:

15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0 (TID
72, x, RACK_LOCAL, 14856 bytes)
15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0 (TID
73, xxx, RACK_LOCAL, 14852 bytes)
15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0 (TID
74, x, RACK_LOCAL, 14860 bytes)
15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0 (TID
72) in 208 ms on x (1/3)
15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0 (TID
74) in 49 ms on x (2/3)
*15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958 ms*
*15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000 ms*
*15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959 ms*
*15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000 ms*
*15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960 ms*
*15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000 ms*
*15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961 ms*
*15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000 ms*
*15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962 ms*
*15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000 ms*
*15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963 ms*
*15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000 ms*
15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0 (TID
73) in 60373 ms on (3/3)
15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks
have all completed, from pool
15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
MetricsSpark.scala:67) finished in 60.379 s
15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
MetricsSpark.scala:67, took 60.391761 s
15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
143825821 ms.0 from job set of time 143825821 ms
15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
143825821 ms (execution: 60.399 s)
15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
1438258215000 ms.0 from job set of time 1438258215000 ms

There is *always* a minute of delay in the third task; when I run the same
code in the other cluster there isn't this delay in the JobScheduler. I checked
the YARN configuration in both clusters and it seems the same.

The log from the cluster that is working well is:

15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks
15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0 (TID
279, xx, RACK_LOCAL, 14643 bytes)
15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0 (TID
280, x, RACK_LOCAL, 14639 bytes)
15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
memory on x:45132 (size: 1801.0 B, free: 530.3 MB)
15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage 93.0 (TID
281, xxx, RACK_LOCAL, 14647 bytes)
15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage 93.0 (TID
279) in 121 ms on  (1/3)
15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
memory on x:49886 (size: 1801.0 B, free: 530.3 MB)
15/07/30 14:37:35 INFO TaskSetManager: Finished task 2.0 in stage 93.0 (TID
281) in 261 ms on xx (2/3)
15/07/30 14:37:35 INFO TaskSetManager: Finished task 1.0 in stage 93.0 (TID
280) in 519 ms on x (3/3)
15/07/30 14:37:35 INFO DAGScheduler: Stage 93 (foreachRDD at
MetricsSpark.scala:67) finished in 0.522 s
15/07/30 14:37:35 INFO YarnScheduler: Removed TaskSet 93.0, whose tasks
have all completed, from pool
15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
MetricsSpark.scala:67, took 0.531323 s
15/07/30 14:37:35 INFO JobScheduler: Finished job streaming job
1438259855000 ms.0 from job set of time 1438259855000 ms
15/07/30 14:37:35 INFO JobScheduler: Total delay: 0.548 s for time
1438259855000 ms (execution: 0.540 s)
15/07/30 14:37:35 INFO KafkaRDD: Removing RDD 184 from persistence list

Any clue about where I could take a look? The number of CPUs in YARN is
enough. I'm running on YARN with the same options in both clusters (--master
yarn-server with 1g of memory).


Re: Problems with JobScheduler

2015-07-30 Thread Guillermo Ortiz
I read about the maxRatePerPartition parameter; I haven't set it. Could that
be the problem? Although this wouldn't explain why it doesn't work in only one
of the clusters.

2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 They just share the kafka, the rest of resources are independents. I tried
 to stop one cluster and execute just the cluster isn't working but it
 happens the same.

 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same code in
 two cluster. I read from three topics in Kafka with DirectStream so I have
 three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0
 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0
 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0
 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0
 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0
 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958
 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000
 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959
 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000
 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960
 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000
 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961
 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000
 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962
 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000
 ms*
 *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963
 ms*
 *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000
 ms*
 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0
 (TID 73) in 60373 ms on (3/3)
 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks
 have all completed, from pool
 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
 MetricsSpark.scala:67) finished in 60.379 s
 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s
 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
 143825821 ms.0 from job set of time 143825821 ms
 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
 143825821 ms (execution: 60.399 s)
 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
 1438258215000 ms.0 from job set of time 1438258215000 ms

 There are *always *a minute of delay in the third task, when I have
 executed same code in another cluster there isn't this delay in the
 JobScheduler. I checked the configuration in YARN in both clusters and it
 seems the same.

 The log in the cluster is working good is

 15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0
 (TID 279, xx, RACK_LOCAL, 14643 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0
 (TID 280, x, RACK_LOCAL, 14639 bytes)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:45132 (size: 1801.0 B, free: 530.3 MB)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage 93.0
 (TID 281, xxx, RACK_LOCAL, 14647 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage 93.0
 (TID 279) in 121 ms on  (1/3)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:49886 (size: 1801.0 B, free: 530.3 MB)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 2.0 in stage 93.0
 (TID 281) in 261 ms on xx (2/3)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 1.0 in stage 93.0
 (TID 280) in 519 ms on x (3/3)
 15/07/30 14:37:35 INFO DAGScheduler: Stage 93 (foreachRDD at
 MetricsSpark.scala:67) finished in 0.522 s
 15/07/30 14:37:35 INFO YarnScheduler: Removed TaskSet 93.0, whose tasks
 have all completed, from pool
 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s

Re: Problems with JobScheduler

2015-07-30 Thread Guillermo Ortiz
They just share the Kafka; the rest of the resources are independent. I tried
stopping one cluster and running only the cluster that isn't working, but the
same thing happens.

2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same code in
 two cluster. I read from three topics in Kafka with DirectStream so I have
 three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0
 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0
 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0
 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0
 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0
 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000 ms*
 *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963 ms*
 *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000 ms*
 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0
 (TID 73) in 60373 ms on (3/3)
 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks
 have all completed, from pool
 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
 MetricsSpark.scala:67) finished in 60.379 s
 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s
 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
 143825821 ms.0 from job set of time 143825821 ms
 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
 143825821 ms (execution: 60.399 s)
 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
 1438258215000 ms.0 from job set of time 1438258215000 ms

 There are *always *a minute of delay in the third task, when I have
 executed same code in another cluster there isn't this delay in the
 JobScheduler. I checked the configuration in YARN in both clusters and it
 seems the same.

 The log in the cluster is working good is

 15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0
 (TID 279, xx, RACK_LOCAL, 14643 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0
 (TID 280, x, RACK_LOCAL, 14639 bytes)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:45132 (size: 1801.0 B, free: 530.3 MB)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage 93.0
 (TID 281, xxx, RACK_LOCAL, 14647 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage 93.0
 (TID 279) in 121 ms on  (1/3)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:49886 (size: 1801.0 B, free: 530.3 MB)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 2.0 in stage 93.0
 (TID 281) in 261 ms on xx (2/3)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 1.0 in stage 93.0
 (TID 280) in 519 ms on x (3/3)
 15/07/30 14:37:35 INFO DAGScheduler: Stage 93 (foreachRDD at
 MetricsSpark.scala:67) finished in 0.522 s
 15/07/30 14:37:35 INFO YarnScheduler: Removed TaskSet 93.0, whose tasks
 have all completed, from pool
 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s
 15/07/30 14:37:35 INFO JobScheduler: Finished job streaming job
 1438259855000 ms.0 from job set of time 1438259855000 ms
 15/07/30 14:37:35 INFO JobScheduler: Total delay: 0.548 s for time
 1438259855000 ms (execution: 0.540 s)
 15/07/30 14:37:35 INFO

Re: Problems with JobScheduler

2015-07-30 Thread Guillermo Ortiz
I have three topics with one partition each, so each job reads from one
topic.

2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org:

 Just so I'm clear, the difference in timing you're talking about is this:

 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s

 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s


 Are those jobs running on the same topicpartition?


 On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I read about maxRatePerPartition parameter, I haven't set this
 parameter. Could it be the problem?? Although this wouldn't explain why it
 doesn't work in one of the clusters.

 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 They just share the kafka, the rest of resources are independents. I
 tried to stop one cluster and execute just the cluster isn't working but it
 happens the same.

 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same code in
 two cluster. I read from three topics in Kafka with DirectStream so I have
 three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0
 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0
 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0
 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0
 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0
 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958
 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000
 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959
 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000
 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960
 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000
 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961
 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000
 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962
 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000
 ms*
 *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963
 ms*
 *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000
 ms*
 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0
 (TID 73) in 60373 ms on (3/3)
 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks
 have all completed, from pool
 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
 MetricsSpark.scala:67) finished in 60.379 s
 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s
 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
 143825821 ms.0 from job set of time 143825821 ms
 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
 143825821 ms (execution: 60.399 s)
 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
 1438258215000 ms.0 from job set of time 1438258215000 ms

 There are *always *a minute of delay in the third task, when I have
 executed same code in another cluster there isn't this delay in the
 JobScheduler. I checked the configuration in YARN in both clusters and it
 seems the same.

 The log in the cluster is working good is

 15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0
 (TID 279, xx, RACK_LOCAL, 14643 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0
 (TID 280, x, RACK_LOCAL, 14639 bytes)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:45132 (size: 1801.0 B, free: 530.3 MB)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage 93.0
 (TID 281, xxx, RACK_LOCAL, 14647 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage 93.0
 (TID 279) in 121 ms on  (1/3)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:49886

Re: Problems with JobScheduler

2015-07-30 Thread Guillermo Ortiz
The difference is that one topic receives more data than the other two. I can
pass the topics as parameters, so I could execute the code with one topic at a
time and figure out which one it is, although I guess it's the topic that gets
more data.

Anyway, those delays in just one of the clusters are pretty weird, even when
the other one is not running.
I have seen the parameter spark.streaming.kafka.maxRatePerPartition; I haven't
set any value for it. How does the direct stream behave when this parameter
doesn't have a value?
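
(A hedged sketch, not from the original thread: as far as I know,
spark.streaming.kafka.maxRatePerPartition caps how many messages per second
each Kafka partition contributes to a batch; when it is left unset, the direct
stream reads everything available up to the latest offsets, so a backlogged
topic can make one task of the batch much slower than the others. The value
below is only an example.)

val conf = new SparkConf()
  .setAppName("metrics")
  // messages per second per Kafka partition; example value, tune per workload
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")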

2015-07-30 16:32 GMT+02:00 Cody Koeninger c...@koeninger.org:

 If the jobs are running on different topicpartitions, what's different
 about them?  Is one of them 120x the throughput of the other, for
 instance?  You should be able to eliminate cluster config as a difference
 by running the same topic partition on the different clusters and comparing
 the results.

 On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I have three topics with one partition each, so each task handles one
 topic.

 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org:

 Just so I'm clear, the difference in timing you're talking about is this:

 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s

 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s


 Are those jobs running on the same topicpartition?


 On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I read about maxRatePerPartition parameter, I haven't set this
 parameter. Could it be the problem?? Although this wouldn't explain why it
 doesn't work in one of the clusters.

 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 They just share the kafka, the rest of resources are independents. I
 tried to stop one cluster and execute just the cluster isn't working but 
 it
 happens the same.

 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same code
 in two cluster. I read from three topics in Kafka with DirectStream so I
 have three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage
 24.0 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage
 24.0 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage
 24.0 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage
 24.0 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage
 24.0 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time
 143825958 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time
 1438259585000 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time
 143825959 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time
 1438259595000 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time
 143825960 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time
 1438259605000 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time
 143825961 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time
 1438259615000 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time
 143825962 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time
 1438259625000 ms*
 *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time
 143825963 ms*
 *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time
 1438259635000 ms*
 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage
 24.0 (TID 73) in 60373 ms on (3/3)
 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose
 tasks have all completed, from pool
 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
 MetricsSpark.scala:67) finished in 60.379 s
 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s
 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
 143825821 ms.0 from job set of time 143825821 ms
 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
 143825821 ms (execution: 60.399 s)
 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
 1438258215000 ms.0 from job set of time 1438258215000 ms

 There are *always *a minute of delay in the third task, when I have
 executed same code

Error SparkStreaming after a while executing.

2015-07-30 Thread Guillermo Ortiz
I'm executing a job with Spark Streaming and I get this error every time after
the job has been running for a while (usually hours or days).

I have no idea why it's happening.

15/07/30 13:02:14 ERROR LiveListenerBus: Listener EventLoggingListener
threw an exception
java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at
org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
at
org.apache.spark.scheduler.EventLoggingListener.onJobEnd(EventLoggingListener.scala:169)
at
org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:36)
at
org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at
org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at
org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
at
org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
Caused by: java.io.IOException: Lease timeout of 0 seconds expired.
at
org.apache.hadoop.hdfs.DFSOutputStream.abort(DFSOutputStream.java:2192)
at
org.apache.hadoop.hdfs.DFSClient.closeAllFilesBeingWritten(DFSClient.java:935)
at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:889)
at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:417)
at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:442)
at
org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71)
at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:298)
at java.lang.Thread.run(Thread.java:745)
15/07/30 13:02:14 INFO SparkContext: Starting job: foreachRDD at
MetricsSpark.scala:67
15/07/30 13:02:14 INFO DAGScheduler: Got job 5050 (foreachRDD at
MetricsSpark.scala:67) with 3 output partitions (allowLocal=false)
15/07/30 13:02:14 INFO DAGScheduler: Final stage: Stage 5050(foreachRDD at
MetricsSpark.scala:67)

Sometimes this error happens and Spark doesn't stop working for good: it looks
like after this error Spark keeps working correctly for some iterations. But
most of the time it just keeps failing after this error, producing it over and
over.

The code just filters some records and indexes them in Elasticsearch after
adding some new fields.


Checkpoints in SparkStreaming

2015-07-28 Thread Guillermo Ortiz
I'm using Spark Streaming and I want to configure checkpointing to manage
fault tolerance.
I've been reading the documentation. Is it necessary to create and configure
the InputDStream inside the function passed to getOrCreate?

I checked the example in
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
and it looks like it does everything inside of that function. Should I put all
the logic of the application inside it? I don't think that's the way...

If I just create the context there, I get an error:
Exception in thread main org.apache.spark.SparkException:
org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e12a5a6 has not
been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
at
org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51)


I'm not that good with Scala; this is the code I have:

  def functionToCreateContext(): StreamingContext = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("app")
    val ssc = new StreamingContext(sparkConf, Seconds(5))   // new context

    ssc.checkpoint("/tmp/spark/metricsCheckpoint")   // set checkpoint directory
    ssc
  }

val ssc = StreamingContext.getOrCreate("/tmp/spark/metricsCheckpoint",
  functionToCreateContext _)
val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0))
val topics = args(1).split("\\,")
val directKafkaStream = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)

directKafkaStream.foreachRDD { rdd => ...
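
(A minimal sketch of the pattern the RecoverableNetworkWordCount example uses,
not the original code: the whole DStream graph, including the Kafka direct
stream and the foreachRDD logic, is built inside the factory function, and
getOrCreate only calls that function when no checkpoint exists yet; on restart
the graph is restored from the checkpoint instead, which, as far as I
understand, is why defining the stream outside the factory leads to the
"has not been initialized" error.)

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object MetricsApp {
  def main(args: Array[String]): Unit = {
    val checkpointDir = "/tmp/spark/metricsCheckpoint"

    // Factory: creates the context AND wires up the whole streaming logic.
    def createContext(): StreamingContext = {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("app")
      val ssc = new StreamingContext(sparkConf, Seconds(5))
      ssc.checkpoint(checkpointDir)

      val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0))
      val topics = args(1).split(",").toSet
      val stream = KafkaUtils.createDirectStream[String, String,
        StringDecoder, StringDecoder](ssc, kafkaParams, topics)

      stream.foreachRDD { rdd =>
        // application logic goes here
      }
      ssc
    }

    // Recover from the checkpoint if it exists, otherwise build a new context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}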


Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Guillermo Ortiz
Hi,

I'm executing a Spark Streaming job with Kafka. The code was working, but
today I tried to execute it again and I got an exception; I don't know what's
happening. Right now there are no job executions on YARN.
How can I fix it?

Exception in thread main org.apache.spark.SparkException: Yarn
application has already ended! It might have been killed or unable to
launch application master.
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
at
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
at org.apache.spark.SparkContext.init(SparkContext.scala:379)
at
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
at
org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:75)
at
com.produban.metrics.MetricsTransfInternationalSpark$.main(MetricsTransfInternationalSpark.scala:66)
at
com.produban.metrics.MetricsTransfInternationalSpark.main(MetricsTransfInternationalSpark.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
*15/06/27 09:27:09 ERROR Utils: Uncaught exception in thread delete Spark
local dirs*
java.lang.NullPointerException
at org.apache.spark.storage.DiskBlockManager.org
$apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at
org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)
Exception in thread delete Spark local dirs java.lang.NullPointerException
at org.apache.spark.storage.DiskBlockManager.org
$apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at
org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)


Re: Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Guillermo Ortiz
Well, SPARK_CLASSPATH is just a random name; the complete script is this:

export HADOOP_CONF_DIR=/etc/hadoop/conf
SPARK_CLASSPATH=file:/usr/metrics/conf/elasticSearch.properties,file:/usr/metrics/conf/redis.properties,/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/
for lib in `ls /usr/metrics/lib/*.jar`
do
if [ -z $SPARK_CLASSPATH ]; then
SPARK_CLASSPATH=$lib
else
SPARK_CLASSPATH=$SPARK_CLASSPATH,$lib
fi
done
spark-submit --name Metrics

I need to add all the jars, as you know; maybe SPARK_CLASSPATH was a bad name
for the variable.

The code doesn't have any stateful operation, so I guess it's okay that it
doesn't have a checkpoint. I have executed this code hundreds of times in the
Cloudera VM and never got this error.

2015-06-27 11:21 GMT+02:00 Tathagata Das t...@databricks.com:

 1. you need checkpointing mostly for recovering from driver failures, and
 in some cases also for some stateful operations.

 2. Could you try not using the SPARK_CLASSPATH environment variable.

 TD

 On Sat, Jun 27, 2015 at 1:00 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I don't have any checkpoint on my code. Really, I don't have to save any
 state. It's just a log processing of a PoC.
 I have been testing the code in a VM from Cloudera and I never got that
 error.. Not it's a real cluster.

 The command to execute Spark
 spark-submit --name PoC Logs --master yarn-client --class
 com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g
 /usr/metrics/ex/metrics-spark.jar $1 $2 $3

 val sparkConf = new SparkConf()
 val ssc = new StreamingContext(sparkConf, Seconds(5))
 val kafkaParams = Map[String, String](metadata.broker.list -
 args(0))
 val topics = args(1).split(\\,)
 val directKafkaStream = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)

 directKafkaStream.foreachRDD { rdd =
   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =
   .
}

 I understand that I just need a checkpoint if I need to recover the task
 it something goes wrong, right?


 2015-06-27 9:39 GMT+02:00 Tathagata Das t...@databricks.com:

 How are you trying to execute the code again? From checkpoints, or
 otherwise?
 Also cc'ed Hari who may have a better idea of YARN related issues.

 On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 Hi,

 I'm executing a SparkStreamig code with Kafka. IçThe code was working
 but today I tried to execute the code again and I got an exception, I dn't
 know what's it happening. right now , there are no jobs executions on YARN.
 How could it fix it?

 Exception in thread main org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.init(SparkContext.scala:379)
 at
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
 at
 org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:75)
 at
 com.produban.metrics.MetricsTransfInternationalSpark$.main(MetricsTransfInternationalSpark.scala:66)
 at
 com.produban.metrics.MetricsTransfInternationalSpark.main(MetricsTransfInternationalSpark.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 *15/06/27 09:27:09 ERROR Utils: Uncaught exception in thread delete
 Spark local dirs*
 java.lang.NullPointerException
 at org.apache.spark.storage.DiskBlockManager.org
 $apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
 at
 org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
 at
 org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139

Re: Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Guillermo Ortiz
I'm checking the logs in YARN and I found this error as well

Application application_1434976209271_15614 failed 2 times due to AM
Container for appattempt_1434976209271_15614_02 exited with exitCode:
255


Diagnostics: Exception from container-launch.
Container id: container_1434976209271_15614_02_01
Exit code: 255
Stack trace: ExitCodeException exitCode=255:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at
org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Shell output: Requested user hdfs is not whitelisted and has id 496,which
is below the minimum allowed 1000
Container exited with a non-zero exit code 255
Failing this attempt. Failing the application.

2015-06-27 11:25 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 Well SPARK_CLASSPATH it's just a random name, the complete script is this:

 export HADOOP_CONF_DIR=/etc/hadoop/conf

 SPARK_CLASSPATH=file:/usr/metrics/conf/elasticSearch.properties,file:/usr/metrics/conf/redis.properties,/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/
 for lib in `ls /usr/metrics/lib/*.jar`
 do
 if [ -z $SPARK_CLASSPATH ]; then
 SPARK_CLASSPATH=$lib
 else
 SPARK_CLASSPATH=$SPARK_CLASSPATH,$lib
 fi
 done
 spark-submit --name Metrics

 I need to add all the jars as you know,, maybe it was a bad name
 SPARK_CLASSPATH

 The code doesn't have any stateful operation, yo I guess that it¡s okay
 doesn't have checkpoint. I have executed hundres of times thiscode in VM
 from Cloudera and never got this error.

 2015-06-27 11:21 GMT+02:00 Tathagata Das t...@databricks.com:

 1. you need checkpointing mostly for recovering from driver failures, and
 in some cases also for some stateful operations.

 2. Could you try not using the SPARK_CLASSPATH environment variable.

 TD

 On Sat, Jun 27, 2015 at 1:00 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I don't have any checkpoint on my code. Really, I don't have to save any
 state. It's just a log processing of a PoC.
 I have been testing the code in a VM from Cloudera and I never got that
 error.. Not it's a real cluster.

 The command to execute Spark
 spark-submit --name PoC Logs --master yarn-client --class
 com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g
 /usr/metrics/ex/metrics-spark.jar $1 $2 $3

 val sparkConf = new SparkConf()
 val ssc = new StreamingContext(sparkConf, Seconds(5))
 val kafkaParams = Map[String, String](metadata.broker.list -
 args(0))
 val topics = args(1).split(\\,)
 val directKafkaStream = KafkaUtils.createDirectStream[String,
 String, StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)

 directKafkaStream.foreachRDD { rdd =
   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =
   .
}

 I understand that I just need a checkpoint if I need to recover the task
 it something goes wrong, right?


 2015-06-27 9:39 GMT+02:00 Tathagata Das t...@databricks.com:

 How are you trying to execute the code again? From checkpoints, or
 otherwise?
 Also cc'ed Hari who may have a better idea of YARN related issues.

 On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz konstt2...@gmail.com
  wrote:

 Hi,

 I'm executing a SparkStreamig code with Kafka. IçThe code was working
 but today I tried to execute the code again and I got an exception, I dn't
 know what's it happening. right now , there are no jobs executions on 
 YARN.
 How could it fix it?

 Exception in thread main org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.init(SparkContext.scala:379)
 at
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
 at
 org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:75

Re: Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Guillermo Ortiz
I don't have any checkpoint in my code. Really, I don't have to save any
state; it's just log processing for a PoC.
I have been testing the code in a Cloudera VM and I never got that error.
Now it's a real cluster.

The command to execute Spark:
spark-submit --name "PoC Logs" --master yarn-client --class
com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g
/usr/metrics/ex/metrics-spark.jar $1 $2 $3

val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0))
val topics = args(1).split("\\,")
val directKafkaStream = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)

directKafkaStream.foreachRDD { rdd =>
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =>
    ...
  }
}

I understand that I just need a checkpoint if I need to recover the task when
something goes wrong, right?


2015-06-27 9:39 GMT+02:00 Tathagata Das t...@databricks.com:

 How are you trying to execute the code again? From checkpoints, or
 otherwise?
 Also cc'ed Hari who may have a better idea of YARN related issues.

 On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 Hi,

 I'm executing a SparkStreamig code with Kafka. IçThe code was working but
 today I tried to execute the code again and I got an exception, I dn't know
 what's it happening. right now , there are no jobs executions on YARN.
 How could it fix it?

 Exception in thread main org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.init(SparkContext.scala:379)
 at
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
 at
 org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:75)
 at
 com.produban.metrics.MetricsTransfInternationalSpark$.main(MetricsTransfInternationalSpark.scala:66)
 at
 com.produban.metrics.MetricsTransfInternationalSpark.main(MetricsTransfInternationalSpark.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 *15/06/27 09:27:09 ERROR Utils: Uncaught exception in thread delete Spark
 local dirs*
 java.lang.NullPointerException
 at org.apache.spark.storage.DiskBlockManager.org
 $apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
 at
 org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
 at
 org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
 at
 org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
 at
 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
 at
 org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)
 Exception in thread delete Spark local dirs
 java.lang.NullPointerException
 at org.apache.spark.storage.DiskBlockManager.org
 $apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
 at
 org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
 at
 org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
 at
 org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
 at
 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
 at
 org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)







Re: Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Guillermo Ortiz
I changed the variable name and I got the same error.

2015-06-27 11:36 GMT+02:00 Tathagata Das t...@databricks.com:

 Well, though randomly chosen, SPARK_CLASSPATH is a recognized env variable
 that is picked up by spark-submit. That is what was used pre-Spark-1.0, but
 got deprecated after that. Mind renamign that variable and trying it out
 again? At least it will reduce one possible source of problem.

 TD

 On Sat, Jun 27, 2015 at 2:32 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I'm checking the logs in YARN and I found this error as well

 Application application_1434976209271_15614 failed 2 times due to AM
 Container for appattempt_1434976209271_15614_02 exited with exitCode:
 255


 Diagnostics: Exception from container-launch.
 Container id: container_1434976209271_15614_02_01
 Exit code: 255
 Stack trace: ExitCodeException exitCode=255:
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
 at org.apache.hadoop.util.Shell.run(Shell.java:455)
 at
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
 at
 org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
 at
 org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
 at
 org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Shell output: Requested user hdfs is not whitelisted and has id 496,which
 is below the minimum allowed 1000
 Container exited with a non-zero exit code 255
 Failing this attempt. Failing the application.

 2015-06-27 11:25 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 Well SPARK_CLASSPATH it's just a random name, the complete script is
 this:

 export HADOOP_CONF_DIR=/etc/hadoop/conf

 SPARK_CLASSPATH=file:/usr/metrics/conf/elasticSearch.properties,file:/usr/metrics/conf/redis.properties,/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/
 for lib in `ls /usr/metrics/lib/*.jar`
 do
 if [ -z $SPARK_CLASSPATH ]; then
 SPARK_CLASSPATH=$lib
 else
 SPARK_CLASSPATH=$SPARK_CLASSPATH,$lib
 fi
 done
 spark-submit --name Metrics

 I need to add all the jars as you know,, maybe it was a bad name
 SPARK_CLASSPATH

 The code doesn't have any stateful operation, yo I guess that it¡s okay
 doesn't have checkpoint. I have executed hundres of times thiscode in VM
 from Cloudera and never got this error.

 2015-06-27 11:21 GMT+02:00 Tathagata Das t...@databricks.com:

 1. you need checkpointing mostly for recovering from driver failures,
 and in some cases also for some stateful operations.

 2. Could you try not using the SPARK_CLASSPATH environment variable.

 TD

 On Sat, Jun 27, 2015 at 1:00 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I don't have any checkpoint on my code. Really, I don't have to save
 any state. It's just a log processing of a PoC.
 I have been testing the code in a VM from Cloudera and I never got
 that error.. Not it's a real cluster.

 The command to execute Spark
 spark-submit --name PoC Logs --master yarn-client --class
 com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g
 /usr/metrics/ex/metrics-spark.jar $1 $2 $3

 val sparkConf = new SparkConf()
 val ssc = new StreamingContext(sparkConf, Seconds(5))
 val kafkaParams = Map[String, String](metadata.broker.list -
 args(0))
 val topics = args(1).split(\\,)
 val directKafkaStream = KafkaUtils.createDirectStream[String,
 String, StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)

 directKafkaStream.foreachRDD { rdd =
   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =
   .
}

 I understand that I just need a checkpoint if I need to recover the
 task it something goes wrong, right?


 2015-06-27 9:39 GMT+02:00 Tathagata Das t...@databricks.com:

 How are you trying to execute the code again? From checkpoints, or
 otherwise?
 Also cc'ed Hari who may have a better idea of YARN related issues.

 On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz 
 konstt2...@gmail.com wrote:

 Hi,

 I'm executing a SparkStreamig code with Kafka. IçThe code was
 working but today I tried to execute the code again and I got an 
 exception,
 I dn't know what's it happening. right now , there are no jobs 
 executions
 on YARN.
 How could it fix it?

 Exception in thread main org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113

Trying to connect to many topics with several DirectConnect

2015-05-22 Thread Guillermo Ortiz
Hi,

I'm trying to connect to two Kafka topics from Spark with DirectStream, but I
get an error. I don't know if there's any limitation on doing it, because when
I just access one topic everything is fine.

*val ssc = new StreamingContext(sparkConf, Seconds(5))*
*val kafkaParams = Map[String, String]("metadata.broker.list" ->
"quickstart.cloudera:9092")*
*val setTopic1 = Set("topic1")*
*val setTopic2 = Set("topic2")*

*val stream1 = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)*
*val stream2 = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)*


The error that I get is:
* 15/05/22 13:12:40 INFO BlockManager: Removing RDD 314*
*15/05/22 13:12:40 ERROR OneForOneStrategy: *
*java.lang.NullPointerException*
* at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)*
* at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)*
* at
scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)*
* at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)*
* at
scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)*
* at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)*


Is there any limitation on doing this?
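
(A minimal sketch, not from the original thread: a single direct stream can
also subscribe to both topics at once, as in the "How to separate messages of
different topics" thread further down, which sidesteps having two input
streams in the same context.)

val topics = Set("topic1", "topic2")
val stream = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, topics)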


Re: Working with slides. How do I know how many times a RDD has been processed?

2015-05-19 Thread Guillermo Ortiz
I tried to insert a flag into the RDD so I could keep a counter in the last
position; when the counter reaches X, I could do something. But in each slide
the original RDD comes back, even though I modified it.

I wrote this code to check whether this is possible, but it doesn't work:

val rdd1WithFlag = rdd1.map { register =>
  var splitRegister = register._2.split("\\|")
  var newArray = new Array[String](splitRegister.length + 1)

  if (splitRegister.length == 2) {
    splitRegister.copyToArray(newArray)
    newArray(splitRegister.length) = "0"
  } else {
    splitRegister(splitRegister.length) = "1"
    splitRegister.copyToArray(newArray)
  }

  (splitRegister(1), newArray)
}

If I check the length of splitRegister, it is always 2 in each slide; it is
never three.



2015-05-18 15:36 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 Hi,

 I have two streaming RDD1 and RDD2 and want to cogroup them.
 Data don't come in the same time and sometimes they could come with some
 delay.
 When I get all data I want to insert in MongoDB.

 For example, imagine that I get:
 RDD1 -- T 0
 RDD2 --T 0.5
 I do cogroup between them but I couldn't store in Mongo yet because it
 could come more data in the next windows/slide.
 RDD2' --T 1.5
 Another RDD2' comes, I only want to save in Mongo once. So, I should only
 save it when I get all data. What I know it's how long I should wait as
 much.

 Ideally, I would like to save in MongoDB in the last slide for each RDD
 when I know that there is not possible to get more RDD2 to join with RDD1.
 Is it possible? how?

 Maybe there is other way to resolve this problem, any idea?





Working with slides. How do I know how many times a RDD has been processed?

2015-05-18 Thread Guillermo Ortiz
Hi,

I have two streaming RDDs, RDD1 and RDD2, and I want to cogroup them.
The data don't arrive at the same time and sometimes they can come with some
delay.
When I have all the data I want to insert it into MongoDB.

For example, imagine that I get:
RDD1 --> T 0
RDD2 --> T 0.5
I cogroup them, but I can't store the result in Mongo yet because more data
could come in the next window/slide.
RDD2' --> T 1.5
Another RDD2' comes, and I only want to save to Mongo once. So I should only
save when I have all the data. What I do know is how long I should wait at
most.

Ideally, I would like to save to MongoDB in the last slide for each RDD, when
I know that it is no longer possible to get more RDD2 to join with RDD1.
Is it possible? How?

Maybe there is another way to solve this problem; any idea?
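
(A hedged sketch, not a full solution and not from the original thread:
windowing both pair DStreams over the maximum expected delay before
cogrouping them means a late RDD2 can still meet its RDD1. Writing each key
only once is not solved here; an idempotent upsert in MongoDB is one way to
tolerate the overlapping windows. The stream names and durations are
assumptions.)

import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.StreamingContext._   // pair DStream ops in Spark 1.2

val windowLength = Minutes(2)   // longest delay we are willing to wait for
val slide = Seconds(30)

val r1Windowed = rdd1Stream.window(windowLength, slide)   // DStream[(String, String)]
val r2Windowed = rdd2Stream.window(windowLength, slide)

r1Windowed.cogroup(r2Windowed).foreachRDD { rdd =>
  rdd.foreach { case (key, (lefts, rights)) =>
    if (lefts.nonEmpty && rights.nonEmpty) {
      // both sides are present: upsert into MongoDB here so that repeated
      // windows do not create duplicate documents
    }
  }
}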


How to separate messages of different topics.

2015-05-05 Thread Guillermo Ortiz
I want to read from many topics in Kafka and know which topic each message is
coming from (topic1, topic2, and so on).

 val kafkaParams = Map[String, String]("metadata.broker.list" ->
"myKafka:9092")
 val topics = Set("EntryLog", "presOpManager")
 val directKafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topics)

 Is there some way to separate the messages per topic with just one
directStream, or should I create a different stream for each topic?
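
(A sketch of one common way to recover the topic per record with a single
direct stream, assuming the directKafkaStream above: each partition of the
underlying KafkaRDD maps 1:1 to a Kafka topic-partition, so the OffsetRange
with the same index carries the topic name. This is not from the original
thread.)

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directKafkaStream.transform { rdd =>
  // The cast only works on the RDD produced directly by the direct stream,
  // before any shuffle or repartition.
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, messages) =>
    val topic = ranges(i).topic
    messages.map { case (key, value) => (topic, value) }   // tag each message with its topic
  }
}.foreachRDD { tagged =>
  // tagged is an RDD[(topic, message)]; branch on the topic as needed
}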


Spark + Kakfa with directStream

2015-05-05 Thread Guillermo Ortiz
I'm trying to execute the Hello World example with Spark + Kafka (
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala)
with createDirectStream and I get this error:


java.lang.NoSuchMethodError:
kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

I have checked the jars, and the kafka-2.10-0.8.2.jar with the
MessageAndMetadata class is in the classpath. I even navigated to this class
through Eclipse and saw it with the right constructor parameters.

My pom.xml has these dependencies:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.3.1</version>
  <!-- <scope>provided</scope> -->
  <exclusions>
    <exclusion>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-mapper-asl</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-core-asl</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.1</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.3.1</version>
  <!-- <scope>provided</scope> -->
  <exclusions>
    <exclusion>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-mapper-asl</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-core-asl</artifactId>
    </exclusion>
  </exclusions>
</dependency>
</dependencies>

Does somebody know where the error is? Thanks.


Re: Spark + Kakfa with directStream

2015-05-05 Thread Guillermo Ortiz
Sorry, I had a duplicate Kafka dependency with an older version in another
pom.xml.

2015-05-05 14:46 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I'm tryting to execute the Hello World example with Spark + Kafka (
 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala)
 with createDirectStream and I get this error.


 java.lang.NoSuchMethodError:
 kafka.message.MessageAndMetadata.init(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
 at
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 I have checked the jars and it's the kafka-2.10-0.8.2.jar in the classpath
 with the MessageAndMetadata class. Even I navigated through eclipse and I
 came to this class with the right parameters.

 My pom.xml has these dependecies

 dependency
 groupIdorg.apache.spark/groupId
 artifactIdspark-streaming_2.10/artifactId
 version1.3.1/version
 !-- scopeprovided/scope --
 exclusions
 exclusion
 groupIdorg.codehaus.jackson/groupId
 artifactIdjackson-mapper-asl/artifactId
 /exclusion
 exclusion
 groupIdorg.codehaus.jackson/groupId
 artifactIdjackson-core-asl/artifactId
 /exclusion
 /exclusions
 /dependency


 dependency
 groupIdorg.apache.kafka/groupId
 artifactIdkafka_2.10/artifactId
 version0.8.2.1/version
 /dependency

 dependency
 groupIdorg.apache.spark/groupId
 artifactIdspark-streaming-kafka_2.10/artifactId
 version1.3.1/version
 !-- scopeprovided/scope --
 exclusions
 exclusion
 groupIdorg.codehaus.jackson/groupId
 artifactIdjackson-mapper-asl/artifactId
 /exclusion
 exclusion
 groupIdorg.codehaus.jackson/groupId
 artifactIdjackson-core-asl/artifactId
 /exclusion
 /exclusions
 /dependency
 /dependencies

 Does somebody know where it's the error?? Thanks.



SparkSQL, executing an OR

2015-03-03 Thread Guillermo Ortiz
I'm trying to execute a query with Spark.

(Example from the Spark Documentation)
val teenagers = people.where('age >= 10).where('age <= 19).select('name)

Is it possible to execute an OR with this syntax?
val teenagers = people.where('age >= 10 'or 'age <= 4).where('age <=
19).select('name)

I have tried different ways and I didn't get it.
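
(A minimal sketch, not part of the original message, assuming the Spark
1.2-era SchemaRDD DSL brought in by import sqlContext._: conditions are plain
expressions, so they combine with && and ||.)

val toddlersOrTeenagers =
  people.where('age <= 4 || ('age >= 10 && 'age <= 19)).select('name)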




Re: SparkSQL, executing an OR

2015-03-03 Thread Guillermo Ortiz
thanks, it works.

2015-03-03 13:32 GMT+01:00 Cheng, Hao hao.ch...@intel.com:
 Using where('age =10  'age =4) instead.

 -Original Message-
 From: Guillermo Ortiz [mailto:konstt2...@gmail.com]
 Sent: Tuesday, March 3, 2015 5:14 PM
 To: user
 Subject: SparkSQL, executing an OR

 I'm trying to execute a query with Spark.

 (Example from the Spark Documentation)
 val teenagers = people.where('age = 10).where('age = 19).select('name)

 Is it possible to execute an OR with this syntax?
 val teenagers = people.where('age = 10 'or 'age = 4).where('age =
 19).select('name)

 I have tried different ways and I didn't get it.






Combiners in Spark

2015-03-02 Thread Guillermo Ortiz
Which function is the equivalent of MapReduce combiners in Spark?
I guess it's combineByKey, but is combineByKey executed locally?
I understood that functions such as reduceByKey or foldByKey aren't executed
locally.

Reading the documentation, it looks like combineByKey is equivalent to
reduceByKey, except that with combineByKey you can specify an output type
different from the input type.
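
(A hedged sketch, not from the original message: combineByKey does combine
locally within each partition before the shuffle, much like a MapReduce
combiner, and as far as I know reduceByKey and foldByKey also do map-side
combining, since they are built on the same mechanism. The example below uses
a combined type (sum, count), different from the Double input, to compute
per-key averages.)

val scores = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

val averages = scores.combineByKey(
  (v: Double) => (v, 1),                                             // createCombiner
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),       // mergeValue (map side)
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners (after shuffle)
).mapValues { case (sum, count) => sum / count }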




CollectAsMap, Broadcasting.

2015-02-26 Thread Guillermo Ortiz
I have a question,

If I execute this code,

val users = sc.textFile("/tmp/users.log").map(x => x.split(",")).map(
  v => (v(0), v(1)))
val contacts = sc.textFile("/tmp/contacts.log").map(y =>
  y.split(",")).map( v => (v(0), v(1)))
val usersMap = contacts.collectAsMap()
contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()

When I execute collectAsMap, where is the data? In each executor? I guess that
each executor has the data it processed. The result is sent to the driver, but
I guess that each executor keeps its piece of processed data.

I guess it's more efficient than using a join in this case because there's no
shuffle, but if I save usersMap as a broadcast variable, wouldn't it be less
efficient because I'm sending data to executors that don't need it?
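
(A hedged sketch, not from the original message, of the two approaches being
compared, using users as the small side — the snippet above collects contacts,
which looks like it was meant to be users: a regular join shuffles both RDDs
by key, while broadcasting the small map and doing the lookup inside the
transformation avoids the shuffle.)

val joined = users.join(contacts)          // RDD[(key, (user, contact))], with a shuffle

val usersMap = users.collectAsMap()        // small side pulled to the driver
val usersBc = sc.broadcast(usersMap)
val joinedNoShuffle = contacts.flatMap { case (k, v) =>
  usersBc.value.get(k).map(u => (k, (u, v)))   // skip keys with no match, like an inner join
}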




Re: CollectAsMap, Broadcasting.

2015-02-26 Thread Guillermo Ortiz
So, in my example, when I execute:
val usersMap = contacts.collectAsMap() --> the map goes to the driver and
just lives there at the beginning.
contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect

When I execute usersMap(v._1), does the driver have to send each executor the
value it needs? I guess I'm missing something.
How does the data transfer between usersMap (which is just in the driver) and
the executors work?

In this case it looks better to use broadcasting, like:
val usersMap = contacts.collectAsMap()
val bc = sc.broadcast(usersMap)
contacts.map(v => (v._1, (bc.value(v._1), v._2))).collect()

2015-02-26 11:16 GMT+01:00 Sean Owen so...@cloudera.com:
 No, it exists only on the driver, not the executors. Executors don't
 retain partitions unless they are supposed to be persisted.

 Generally, broadcasting a small Map to accomplish a join 'manually' is
 more efficient than a join, but you are right that this is mostly
 because joins usually involve shuffles. If not, it's not as clear
 which way is best. I suppose that if the Map is large-ish, it's safer
 to not keep pulling it to the driver.

 On Thu, Feb 26, 2015 at 10:00 AM, Guillermo Ortiz konstt2...@gmail.com 
 wrote:
 I have a question,

 If I execute this code,

 val users = sc.textFile(/tmp/users.log).map(x = x.split(,)).map(
 v = (v(0), v(1)))
 val contacts = sc.textFile(/tmp/contacts.log).map(y =
 y.split(,)).map( v = (v(0), v(1)))
 val usersMap = contacts.collectAsMap()
 contacts.map(v = (v._1, (usersMap(v._1), v._2))).collect()

 When I execute collectAsMap, where is data? in each Executor?? I guess
 than each executor has data that it proccesed. The result is sent to
 the driver, but I guess that each executor keeps its piece of
 processed data.

 I guess that it's more efficient that to use a join in this case
 because there's not shuffle but If I save usersMap as a broadcast
 variable, wouldn't it be less efficient because I'm sending data to
 executors and don't need it?






Re: CollectAsMap, Broadcasting.

2015-02-26 Thread Guillermo Ortiz
Isn't contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()
executed in the executors? Why is it executed in the driver?
contacts is not a local object, right?


2015-02-26 11:27 GMT+01:00 Sean Owen so...@cloudera.com:
 No. That code is just Scala code executing on the driver. usersMap is
 a local object. This bit has nothing to do with Spark.

 Yes you would have to broadcast it to use it efficient in functions
 (not on the driver).

 On Thu, Feb 26, 2015 at 10:24 AM, Guillermo Ortiz konstt2...@gmail.com 
 wrote:
 So, on my example, when I execute:
 val usersMap = contacts.collectAsMap() -- Map goes to the driver and
 just lives there in the beginning.
 contacts.map(v = (v._1, (usersMap(v._1), v._2))).collect

 When I execute usersMap(v._1),
 Does driver has to send to the executorX the value which it needs? I
 guess I'm missing something.
 How does the data transfer among usersMap(just in the driver) and
 executors work?

 On this case it looks like better to use broadcasting like:
 val usersMap = contacts.collectAsMap()
 val bc = sc.broadcast(usersMap)
 contacts.map(v = (v._1, (bc.value(v._1), v._2))).collect()

 2015-02-26 11:16 GMT+01:00 Sean Owen so...@cloudera.com:
 No, it exists only on the driver, not the executors. Executors don't
 retain partitions unless they are supposed to be persisted.

 Generally, broadcasting a small Map to accomplish a join 'manually' is
 more efficient than a join, but you are right that this is mostly
 because joins usually involve shuffles. If not, it's not as clear
 which way is best. I suppose that if the Map is large-ish, it's safer
 to not keep pulling it to the driver.

 On Thu, Feb 26, 2015 at 10:00 AM, Guillermo Ortiz konstt2...@gmail.com 
 wrote:
 I have a question,

 If I execute this code,

 val users = sc.textFile(/tmp/users.log).map(x = x.split(,)).map(
 v = (v(0), v(1)))
 val contacts = sc.textFile(/tmp/contacts.log).map(y =
 y.split(,)).map( v = (v(0), v(1)))
 val usersMap = contacts.collectAsMap()
 contacts.map(v = (v._1, (usersMap(v._1), v._2))).collect()

 When I execute collectAsMap, where is data? in each Executor?? I guess
 than each executor has data that it proccesed. The result is sent to
 the driver, but I guess that each executor keeps its piece of
 processed data.

 I guess that it's more efficient that to use a join in this case
 because there's not shuffle but If I save usersMap as a broadcast
 variable, wouldn't it be less efficient because I'm sending data to
 executors and don't need it?






Re: CollectAsMap, Broadcasting.

2015-02-26 Thread Guillermo Ortiz
One last time, to be sure I got it right: the execution sequence here goes
like this?

val usersMap = contacts.collectAsMap()
# The contacts RDD is collected by the executors and sent to the driver; the
executors delete the RDD.
contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()
# The usersMap object is sent again to the executors to run the code, and with
the collect() the result is sent back to the driver.


2015-02-26 11:57 GMT+01:00 Sean Owen so...@cloudera.com:
 Yes, in that code, usersMap has been serialized to every executor.
 I thought you were referring to accessing the copy in the driver.

 On Thu, Feb 26, 2015 at 10:47 AM, Guillermo Ortiz konstt2...@gmail.com 
 wrote:
 Isn't it contacts.map(v = (v._1, (usersMap(v._1), v._2))).collect()
 executed in the executors?  why is it executed in the driver?
 contacts are not a local object, right?




Re: How Broadcast variable scale?.

2015-02-23 Thread Guillermo Ortiz
Thanks, I'll read that paper. We haven't tried with a cluster that big, but
supposedly we will in the future, and I was worried about it.
I'll post an update if we finally do, but it's not going to be tomorrow :)

2015-02-23 17:38 GMT+01:00 Mosharaf Chowdhury mosharafka...@gmail.com:
 Hi Guillermo,

 The current broadcast algorithm in Spark approximates the one described in
 the Section 5 of this paper.
 It is expected to scale sub-linearly; i.e., O(log N), where N is the number
 of machines in your cluster.
 We evaluated up to 100 machines, and it does follow O(log N) scaling.

 Have you tried it on your 300-machine cluster? I'm curious to know what
 happened.

 -Mosharaf

 On Mon, Feb 23, 2015 at 8:06 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I'm looking for about how scale broadcast variables in Spark and what
 algorithm uses.

 I have found
 http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf
 I don't know if they're talking about the current version (1.2.1)
 because the file was created in 2010.
 I took a look to the documentation and API and I read that there is an
 TorrentFactory for broadcast variable
  it's which it uses Spark right now? In the article they talk that
 Spark uses another one (Centralized HDFS Broadcast)

 How does it scale if I have a big cluster (about 300 nodes) the
 current algorithm?? is it linear? are there others options to choose
 others algorithms?







How Broadcast variable scale?.

2015-02-23 Thread Guillermo Ortiz
I'm looking for information about how broadcast variables scale in Spark and
what algorithm they use.

I have found
http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf
but I don't know if it's talking about the current version (1.2.1), because
the file was created in 2010.
I took a look at the documentation and API and I read that there is a
TorrentFactory for broadcast variables. Is that what Spark uses right now? In
the article they say that Spark uses another one (Centralized HDFS Broadcast).

How does the current algorithm scale if I have a big cluster (about 300
nodes)? Is it linear? Are there other options to choose other algorithms?
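
(A hedged sketch of the Spark 1.2-era settings, not from the original message:
the broadcast implementation is pluggable through spark.broadcast.factory;
TorrentBroadcastFactory is the default BitTorrent-like one and
HttpBroadcastFactory is the older centralized one. The block size value is
just an example.)

val conf = new SparkConf()
  .setAppName("broadcast-scaling-test")
  .set("spark.broadcast.factory",
    "org.apache.spark.broadcast.TorrentBroadcastFactory")
  .set("spark.broadcast.blockSize", "4096")   // chunk size in KB used by the torrent broadcast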




Re: Problems with GC and time to execute with different number of executors.

2015-02-06 Thread Guillermo Ortiz
This is an execution with 80 executors

Metric     Min       25th percentile   Median    75th percentile   Max
Duration   31s       44s               50s       1.1 min           2.6 min
GC Time    70ms      0.1s              0.3s      4s                53 s
Input      128.0MB   128.0MB           128.0MB   128.0MB           128.0MB

I executed as well with 40 executors:

Metric     Min       25th percentile   Median    75th percentile   Max
Duration   26s       28s               28s       30s               35s
GC Time    54ms      60ms              66ms      80ms              0.4 s
Input      128.0MB   128.0MB           128.0MB   128.0MB           128.0MB

I checked %iowait and %steal on a worker and they look fine in both
executions.
I understand that the value of yarn.nodemanager.resource.memory-mb is per
worker in the cluster and not the total value for YARN; it's configured at
196GB right now (I have 5 workers).
80 executors x 4GB = 320GB, so it shouldn't be a problem.


2015-02-06 10:03 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com:
 Yes, having many more cores than disks and all writing at the same time can
 definitely cause performance issues.  Though that wouldn't explain the high
 GC.  What percent of task time does the web UI report that tasks are
 spending in GC?

 On Fri, Feb 6, 2015 at 12:56 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 Yes, It's surpressing to me as well

 I tried to execute it with different configurations,

 sudo -u hdfs spark-submit  --master yarn-client --class
 com.mycompany.app.App --num-executors 40 --executor-memory 4g
 Example-1.0-SNAPSHOT.jar hdfs://ip:8020/tmp/sparkTest/ file22.bin
 parameters

 This is what I executed with different values in num-executors and
 executor-memory.
 What do you think there are too many executors for those HDDs? Could
 it be the reason because of each executor takes more time?

 2015-02-06 9:36 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com:
  That's definitely surprising to me that you would be hitting a lot of GC
  for
  this scenario.  Are you setting --executor-cores and --executor-memory?
  What are you setting them to?
 
  -Sandy
 
  On Thu, Feb 5, 2015 at 10:17 AM, Guillermo Ortiz konstt2...@gmail.com
  wrote:
 
  Any idea why if I use more containers I get a lot of stopped because
  GC?
 
  2015-02-05 8:59 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com:
   I'm not caching the data. with each iteration I mean,, each 128mb
   that a executor has to process.
  
   The code is pretty simple.
  
   final Conversor c = new Conversor(null, null, null,
   longFields,typeFields);
   SparkConf conf = new SparkConf().setAppName(Simple Application);
   JavaSparkContext sc = new JavaSparkContext(conf);
   JavaRDDbyte[] rdd = sc.binaryRecords(path, c.calculaLongBlock());
  
JavaRDDString rddString = rdd.map(new Functionbyte[], String() {
@Override
 public String call(byte[] arg0) throws Exception {
String result = c.parse(arg0).toString();
 return result;
   }
});
   rddString.saveAsTextFile(url + /output/ +
   System.currentTimeMillis()+
   /);
  
   The parse function just takes an array of bytes and applies some
   transformations like,,,
   [0..3] an integer, [4...20] an String, [21..27] another String and so
   on.
  
   It's just a test code, I'd like to understand what it's happeing.
  
   2015-02-04 18:57 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com:
   Hi Guillermo,
  
   What exactly do you mean by each iteration?  Are you caching data
   in
   memory?
  
   -Sandy
  
   On Wed, Feb 4, 2015 at 5:02 AM, Guillermo Ortiz
   konstt2...@gmail.com
   wrote:
  
   I execute a job in Spark where I'm processing a file of 80Gb in
   HDFS.
   I have 5 slaves:
   (32cores /256Gb / 7physical disks) x 5
  
   I have been trying many different configurations with YARN.
   yarn.nodemanager.resource.memory-mb 196Gb
   yarn.nodemanager.resource.cpu-vcores 24
  
I have tried to execute the job with different numbers of executors
and different amounts of memory (1-4 GB).
With 20 executors each iteration (128 MB) takes 25 s, and tasks never
spend a really long time waiting because of GC.

When I execute with around 60 executors, the processing time is about
45 s and some tasks take up to one minute because of GC.

I have no idea why GC kicks in so much when I run more executors
simultaneously.
The other question is why it takes more time to execute each block.
My theory is that it's because there are only 7 physical disks, and
5 processes writing is not the same as 20.

The code is pretty simple: it's just a map function which parses a
line and writes the output to HDFS. There are a lot of substrings
inside the function, which could cause GC.

Any theory about it?
  
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
  
 
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problems with GC and time to execute with different number of executors.

2015-02-06 Thread Guillermo Ortiz
Yes, it's surprising to me as well.

I tried to execute it with different configurations,

sudo -u hdfs spark-submit  --master yarn-client --class
com.mycompany.app.App --num-executors 40 --executor-memory 4g
Example-1.0-SNAPSHOT.jar hdfs://ip:8020/tmp/sparkTest/ file22.bin
parameters

This is what I executed, with different values for num-executors and
executor-memory.
Do you think there are too many executors for those HDDs? Could that
be the reason why each executor takes more time?

2015-02-06 9:36 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com:
 That's definitely surprising to me that you would be hitting a lot of GC for
 this scenario.  Are you setting --executor-cores and --executor-memory?
 What are you setting them to?

 -Sandy

 On Thu, Feb 5, 2015 at 10:17 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

Any idea why, when I use more containers, I get a lot of tasks stopped because of GC?

 2015-02-05 8:59 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com:
I'm not caching the data. By each iteration I mean each 128 MB block
that an executor has to process.
 
  The code is pretty simple.
 
final Conversor c = new Conversor(null, null, null, longFields, typeFields);
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<byte[]> rdd = sc.binaryRecords(path, c.calculaLongBlock());

JavaRDD<String> rddString = rdd.map(new Function<byte[], String>() {
    @Override
    public String call(byte[] arg0) throws Exception {
        String result = c.parse(arg0).toString();
        return result;
    }
});
rddString.saveAsTextFile(url + "/output/" + System.currentTimeMillis() + "/");
 
The parse function just takes an array of bytes and applies some
transformations, like: bytes [0..3] are an integer, [4..20] a String,
[21..27] another String, and so on.

It's just test code; I'd like to understand what's happening.
 
  2015-02-04 18:57 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com:
  Hi Guillermo,
 
  What exactly do you mean by each iteration?  Are you caching data in
  memory?
 
  -Sandy
 
  On Wed, Feb 4, 2015 at 5:02 AM, Guillermo Ortiz konstt2...@gmail.com
  wrote:
 
  I execute a job in Spark where I'm processing a file of 80Gb in HDFS.
  I have 5 slaves:
  (32cores /256Gb / 7physical disks) x 5
 
  I have been trying many different configurations with YARN.
  yarn.nodemanager.resource.memory-mb 196Gb
  yarn.nodemanager.resource.cpu-vcores 24
 
I have tried to execute the job with different numbers of executors
and different amounts of memory (1-4 GB).
With 20 executors each iteration (128 MB) takes 25 s, and tasks never
spend a really long time waiting because of GC.

When I execute with around 60 executors, the processing time is about
45 s and some tasks take up to one minute because of GC.

I have no idea why GC kicks in so much when I run more executors
simultaneously.
The other question is why it takes more time to execute each block.
My theory is that it's because there are only 7 physical disks, and
5 processes writing is not the same as 20.

The code is pretty simple: it's just a map function which parses a
line and writes the output to HDFS. There are a lot of substrings
inside the function, which could cause GC.

Any theory about it?
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problems with GC and time to execute with different number of executors.

2015-02-05 Thread Guillermo Ortiz
I'm not caching the data. By each iteration I mean each 128 MB block
that an executor has to process.

The code is pretty simple.

final Conversor c = new Conversor(null, null, null, longFields, typeFields);
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<byte[]> rdd = sc.binaryRecords(path, c.calculaLongBlock());

JavaRDD<String> rddString = rdd.map(new Function<byte[], String>() {
    @Override
    public String call(byte[] arg0) throws Exception {
        String result = c.parse(arg0).toString();
        return result;
    }
});
rddString.saveAsTextFile(url + "/output/" + System.currentTimeMillis() + "/");

The parse function just takes an array of bytes and applies some
transformations, like: bytes [0..3] are an integer, [4..20] a String,
[21..27] another String, and so on.

It's just test code; I'd like to understand what's happening.
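
One way to cut down the per-record garbage, assuming the fixed-width layout described above, is to decode each field straight from the byte array instead of chaining substring calls; a rough Scala sketch of the same idea (the offsets, lengths and charset are illustrative and not taken from the real Conversor):

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object FixedWidthParser {
  // Hypothetical layout: an int in bytes [0..3], text in [4..20], text in [21..27].
  def parseRecord(record: Array[Byte]): String = {
    val id = ByteBuffer.wrap(record, 0, 4).getInt
    // Each text field is decoded in place; the only allocations are the
    // field Strings themselves and the output line.
    val name = new String(record, 4, 17, StandardCharsets.US_ASCII).trim
    val code = new String(record, 21, 7, StandardCharsets.US_ASCII).trim
    new StringBuilder(48)
      .append(id).append(',')
      .append(name).append(',')
      .append(code)
      .toString
  }
}

An rdd.map(FixedWidthParser.parseRecord) over an RDD[Array[Byte]] would then play the role of the anonymous Function above.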

2015-02-04 18:57 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com:
 Hi Guillermo,

 What exactly do you mean by each iteration?  Are you caching data in
 memory?

 -Sandy

 On Wed, Feb 4, 2015 at 5:02 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I execute a job in Spark where I'm processing a file of 80Gb in HDFS.
 I have 5 slaves:
 (32cores /256Gb / 7physical disks) x 5

 I have been trying many different configurations with YARN.
 yarn.nodemanager.resource.memory-mb 196Gb
 yarn.nodemanager.resource.cpu-vcores 24

I have tried to execute the job with different numbers of executors
and different amounts of memory (1-4 GB).
With 20 executors each iteration (128 MB) takes 25 s, and tasks never
spend a really long time waiting because of GC.

When I execute with around 60 executors, the processing time is about
45 s and some tasks take up to one minute because of GC.

I have no idea why GC kicks in so much when I run more executors
simultaneously.
The other question is why it takes more time to execute each block.
My theory is that it's because there are only 7 physical disks, and
5 processes writing is not the same as 20.

The code is pretty simple: it's just a map function which parses a
line and writes the output to HDFS. There are a lot of substrings
inside the function, which could cause GC.

Any theory about it?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problems with GC and time to execute with different number of executors.

2015-02-05 Thread Guillermo Ortiz
Any idea why, when I use more containers, I get a lot of tasks stopped because of GC?

2015-02-05 8:59 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com:
I'm not caching the data. By each iteration I mean each 128 MB block
that an executor has to process.

 The code is pretty simple.

final Conversor c = new Conversor(null, null, null, longFields, typeFields);
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<byte[]> rdd = sc.binaryRecords(path, c.calculaLongBlock());

JavaRDD<String> rddString = rdd.map(new Function<byte[], String>() {
    @Override
    public String call(byte[] arg0) throws Exception {
        String result = c.parse(arg0).toString();
        return result;
    }
});
rddString.saveAsTextFile(url + "/output/" + System.currentTimeMillis() + "/");

The parse function just takes an array of bytes and applies some
transformations, like: bytes [0..3] are an integer, [4..20] a String,
[21..27] another String, and so on.

It's just test code; I'd like to understand what's happening.

 2015-02-04 18:57 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com:
 Hi Guillermo,

 What exactly do you mean by each iteration?  Are you caching data in
 memory?

 -Sandy

 On Wed, Feb 4, 2015 at 5:02 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I execute a job in Spark where I'm processing a file of 80Gb in HDFS.
 I have 5 slaves:
 (32cores /256Gb / 7physical disks) x 5

 I have been trying many different configurations with YARN.
 yarn.nodemanager.resource.memory-mb 196Gb
 yarn.nodemanager.resource.cpu-vcores 24

I have tried to execute the job with different numbers of executors
and different amounts of memory (1-4 GB).
With 20 executors each iteration (128 MB) takes 25 s, and tasks never
spend a really long time waiting because of GC.

When I execute with around 60 executors, the processing time is about
45 s and some tasks take up to one minute because of GC.

I have no idea why GC kicks in so much when I run more executors
simultaneously.
The other question is why it takes more time to execute each block.
My theory is that it's because there are only 7 physical disks, and
5 processes writing is not the same as 20.

The code is pretty simple: it's just a map function which parses a
line and writes the output to HDFS. There are a lot of substrings
inside the function, which could cause GC.

Any theory about it?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Problems with GC and time to execute with different number of executors.

2015-02-04 Thread Guillermo Ortiz
I execute a job in Spark where I'm processing a file of 80Gb in HDFS.
I have 5 slaves:
(32cores /256Gb / 7physical disks) x 5

I have been trying many different configurations with YARN.
yarn.nodemanager.resource.memory-mb 196Gb
yarn.nodemanager.resource.cpu-vcores 24

I have tried to execute the job with different numbers of executors
and different amounts of memory (1-4 GB).
With 20 executors each iteration (128 MB) takes 25 s, and tasks never
spend a really long time waiting because of GC.

When I execute with around 60 executors, the processing time is about
45 s and some tasks take up to one minute because of GC.

I have no idea why GC kicks in so much when I run more executors
simultaneously.
The other question is why it takes more time to execute each block.
My theory is that it's because there are only 7 physical disks, and
5 processes writing is not the same as 20.

The code is pretty simple: it's just a map function which parses a
line and writes the output to HDFS. There are a lot of substrings
inside the function, which could cause GC.

Any theory about it?
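
One practical way to confirm where the time goes, rather than theorize: enable GC logging on the executors through spark.executor.extraJavaOptions and compare the executor logs of a 20-executor run against a 60-executor run. A minimal Scala sketch; the JVM flags are the usual HotSpot GC-logging flags of that era and can be adjusted:

import org.apache.spark.{SparkConf, SparkContext}

object GcLoggingJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("gc-logging")
      // GC logs end up in each executor's stdout/stderr, collected by YARN.
      .set("spark.executor.extraJavaOptions",
           "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
    val sc = new SparkContext(conf)
    // ... the same map + saveAsTextFile job ...
    sc.stop()
  }
}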

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Define size partitions

2015-01-30 Thread Guillermo Ortiz
Hi,

I want to process some files; they are kind of big, dozens of
gigabytes each one. I get them as an array of bytes and there's a
structure inside of them.

I have a header which describes the structure. It could be like:
Number (8 bytes) Char (16 bytes) Number (4 bytes) Char (1 byte), ...
This structure appears N times in the file.

So I know the size of each block, since it's fixed. There's no
separator between one block and the next.

If I did this with MapReduce, I could implement a new RecordReader
and InputFormat to read each block, because I know its size, and I'd
fix the split size in the driver (block size x 1000, for example).
That way each mapper's split would contain only complete blocks, and
no piece of the last block would end up in the next split.

Spark works with RDDs and partitions. How could I resize each
partition to do that? Is it possible? I guess Spark doesn't use the
RecordReader and related classes for these tasks.
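
For what it's worth, Spark 1.2+ ships SparkContext.binaryRecords, which covers exactly this case: it reads a flat binary file as an RDD[Array[Byte]] in which every element is one fixed-length record, so no record is split across partitions. A short Scala sketch, with the record length derived from the header layout above (the HDFS path is only a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

object FixedSizeBlocks {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("fixed-size-blocks"))

    // Block layout from the header: Number(8) + Char(16) + Number(4) + Char(1) bytes.
    val recordLength = 8 + 16 + 4 + 1

    // Every element is one complete block; the underlying input format aligns
    // the splits to record boundaries.
    val blocks = sc.binaryRecords("hdfs:///tmp/data/big-file.bin", recordLength)
    println(blocks.count())

    sc.stop()
  }
}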

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Trying to execute Spark in Yarn

2015-01-08 Thread Guillermo Ortiz
I'm trying to execute Spark on a Hadoop cluster. I have created this
script to try it:

#!/bin/bash

export HADOOP_CONF_DIR=/etc/hadoop/conf
SPARK_CLASSPATH=
for lib in `ls /user/local/etc/lib/*.jar`
do
SPARK_CLASSPATH=$SPARK_CLASSPATH:$lib
done
/home/spark-1.1.1-bin-hadoop2.4/bin/spark-submit --name Streaming
--master yarn-cluster --class com.sparkstreaming.Executor --jars
$SPARK_CLASSPATH --executor-memory 10g
/user/local/etc/lib/my-spark-streaming-scala.jar

When I execute the script I get this error:

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Exception in thread main java.net.URISyntaxException: Expected
scheme name at index 0:
:/user/local/etc/lib/akka-actor_2.10-2.2.3-shaded-protobuf.jar:/user/local/etc/lib/akka-remote_2.10-..


-maths-1.2.2a.jar:/user/local/etc/lib/xmlenc-0.52.jar:/user/local/etc/lib/zkclient-0.3.jar:/user/local/etc/lib/zookeeper-3.4.5.jar
at java.net.URI$Parser.fail(URI.java:2829)
at java.net.URI$Parser.failExpecting(URI.java:2835)
at java.net.URI$Parser.parse(URI.java:3027)
at java.net.URI.<init>(URI.java:595)
at org.apache.spark.util.Utils$.resolveURI(Utils.scala:1396)
at 
org.apache.spark.util.Utils$$anonfun$resolveURIs$1.apply(Utils.scala:1419)
at 
org.apache.spark.util.Utils$$anonfun$resolveURIs$1.apply(Utils.scala:1419)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.util.Utils$.resolveURIs(Utils.scala:1419)
at 
org.apache.spark.deploy.SparkSubmitArguments.parse$1(SparkSubmitArguments.scala:308)
at 
org.apache.spark.deploy.SparkSubmitArguments.parseOpts(SparkSubmitArguments.scala:221)
at 
org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:65)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:70)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



Why do I get this error? I have no idea. Any clue?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Trying to execute Spark in Yarn

2015-01-08 Thread Guillermo Ortiz
thanks!

2015-01-08 12:59 GMT+01:00 Shixiong Zhu zsxw...@gmail.com:
`--jars` accepts a comma-separated list of jars. See the usage for `--jars`:

 --jars JARS Comma-separated list of local jars to include on the driver and
 executor classpaths.



 Best Regards,

 Shixiong Zhu

 2015-01-08 19:23 GMT+08:00 Guillermo Ortiz konstt2...@gmail.com:

I'm trying to execute Spark on a Hadoop cluster. I have created this
script to try it:

 #!/bin/bash

 export HADOOP_CONF_DIR=/etc/hadoop/conf
 SPARK_CLASSPATH=
 for lib in `ls /user/local/etc/lib/*.jar`
 do
 SPARK_CLASSPATH=$SPARK_CLASSPATH:$lib
 done
 /home/spark-1.1.1-bin-hadoop2.4/bin/spark-submit --name Streaming
 --master yarn-cluster --class com.sparkstreaming.Executor --jars
 $SPARK_CLASSPATH --executor-memory 10g
 /user/local/etc/lib/my-spark-streaming-scala.jar

 When I execute the script I get this error:

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 Exception in thread main java.net.URISyntaxException: Expected
 scheme name at index 0:

 :/user/local/etc/lib/akka-actor_2.10-2.2.3-shaded-protobuf.jar:/user/local/etc/lib/akka-remote_2.10-..
 
 

 -maths-1.2.2a.jar:/user/local/etc/lib/xmlenc-0.52.jar:/user/local/etc/lib/zkclient-0.3.jar:/user/local/etc/lib/zookeeper-3.4.5.jar
 at java.net.URI$Parser.fail(URI.java:2829)
 at java.net.URI$Parser.failExpecting(URI.java:2835)
 at java.net.URI$Parser.parse(URI.java:3027)
at java.net.URI.<init>(URI.java:595)
 at org.apache.spark.util.Utils$.resolveURI(Utils.scala:1396)
 at
 org.apache.spark.util.Utils$$anonfun$resolveURIs$1.apply(Utils.scala:1419)
 at
 org.apache.spark.util.Utils$$anonfun$resolveURIs$1.apply(Utils.scala:1419)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at org.apache.spark.util.Utils$.resolveURIs(Utils.scala:1419)
 at
 org.apache.spark.deploy.SparkSubmitArguments.parse$1(SparkSubmitArguments.scala:308)
 at
 org.apache.spark.deploy.SparkSubmitArguments.parseOpts(SparkSubmitArguments.scala:221)
 at
org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:65)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:70)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



 Why do I get this error? I have no idea. Any clue?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


