Re: Spark S3

2016-10-10 Thread Selvam Raman
I mentioned Parquet as the input format.
On Oct 10, 2016 11:06 PM, "ayan guha"  wrote:

> It really depends on the input format used.
> On 11 Oct 2016 08:46, "Selvam Raman"  wrote:
>
>> Hi,
>>
>> How does Spark read data from S3 and run tasks in parallel?
>>
>> Assume I have an S3 bucket of 35 GB (Parquet file).
>>
>> How will the SparkSession read the data and process it in parallel?
>> How does it split the S3 data and assign it to each executor task?
>>
>> Please share your thoughts.
>>
>> Note:
>> If we have an RDD, we can look at partitions.size or length to
>> check how many partitions a file has. But how is this accomplished in
>> terms of an S3 bucket?
>>
>> --
>> Selvam Raman
>> "Avoid bribery, stand tall"
>>
>
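The following is a rough sketch of how Spark 2.x plans reads for file sources such as Parquet, based on my understanding. It is an approximation in plain Java, not Spark's own code.

The S3 bucket itself does not decide the parallelism. Spark lists the files and cuts each splittable file into byte ranges of at most maxSplitBytes. It then packs those ranges into read partitions, with one task per partition.

maxSplitBytes = min(spark.sql.files.maxPartitionBytes, max(spark.sql.files.openCostInBytes, totalBytes / defaultParallelism)). The defaults are 128 MB and 4 MB respectively.

For Parquet, I believe each task then reads only the row groups assigned to its byte range, which this sketch ignores. On a real DataFrame, `df.rdd.getNumPartitions` reports the actual number of partitions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Approximation of Spark 2.x read-partition planning for splittable files (e.g. Parquet). */
final class FileSplitPlanner {
    static final long MB = 1024L * 1024L;

    /**
     * Returns the number of read tasks for files of the given sizes.
     * maxPartitionBytes  ~ spark.sql.files.maxPartitionBytes (128 MB by default)
     * openCostInBytes    ~ spark.sql.files.openCostInBytes (4 MB by default)
     * defaultParallelism ~ total executor cores
     */
    static int planPartitions(long[] fileSizes, long maxPartitionBytes,
                              long openCostInBytes, int defaultParallelism) {
        // Each file is charged an extra "open cost", so many tiny files are not packed too densely.
        long totalBytes = 0;
        for (long len : fileSizes) totalBytes += len + openCostInBytes;
        long bytesPerCore = totalBytes / defaultParallelism;
        long maxSplitBytes = Math.min(maxPartitionBytes, Math.max(openCostInBytes, bytesPerCore));

        // Cut each file into byte ranges of at most maxSplitBytes.
        List<Long> splits = new ArrayList<>();
        for (long len : fileSizes) {
            for (long offset = 0; offset < len; offset += maxSplitBytes) {
                splits.add(Math.min(maxSplitBytes, len - offset));
            }
        }

        // Largest ranges first, then pack greedily (next-fit decreasing).
        splits.sort(Collections.reverseOrder());
        int partitions = 0;
        long currentSize = 0;
        boolean partitionOpen = false;
        for (long split : splits) {
            if (partitionOpen && currentSize + split > maxSplitBytes) {
                partitions++;              // close the current partition
                currentSize = 0;
                partitionOpen = false;
            }
            currentSize += split + openCostInBytes;
            partitionOpen = true;
        }
        if (partitionOpen) partitions++;
        return partitions;
    }
}
```

For example, a single 35 GB file with the defaults and 200 cores gives 280 read tasks of 128 MB each. The executors then work through those tasks, roughly one per core at a time.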


GraphFrame BFS

2016-10-10 Thread cashinpj
Hello,

I have a set of data representing various network connections. Each vertex
is represented by a single id, while the edges have a source id,
destination id, and a relationship (peer to peer, customer to provider, or
provider to customer). I am trying to create a subgraph built around a
single source node, following one type of edge as far as possible.

For example:
1 2 p2p
2 3 p2p
2 3 c2p

Following the p2p edges would give:

1 2 p2p
2 3 p2p

I am pretty new to GraphX and GraphFrames, but was wondering if it is
possible to get this behavior using the GraphFrames bfs() function or would
it be better to modify the already existing Pregel implementation of bfs?

Thank you for your time.

Padraic



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphFrame-BFS-tp27876.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
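To pin down the behaviour being asked for, here is a plain-Java sketch. It does not use the GraphFrames or GraphX API, and the class and method names are hypothetical.

The approach keeps only the edges with the chosen relationship. It then runs an ordinary breadth-first search from the source vertex and collects every edge it traverses.

GraphFrames' `bfs` does accept an `edgeFilter` expression. As far as I know, though, it returns shortest paths between vertices matching `fromExpr` and `toExpr` rather than the whole reachable subgraph. That is why the logic is shown on its own here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Reachable subgraph from one source, following a single relationship type. */
final class RelationshipBfs {
    static final class Edge {
        final long src;
        final long dst;
        final String rel;
        Edge(long src, long dst, String rel) { this.src = src; this.dst = dst; this.rel = rel; }
        @Override public String toString() { return src + " " + dst + " " + rel; }
    }

    /** Returns every edge of type `rel` reachable from `source` using only edges of that type. */
    static List<Edge> subgraph(List<Edge> edges, long source, String rel) {
        // Adjacency list restricted to the chosen relationship.
        Map<Long, List<Edge>> out = new HashMap<>();
        for (Edge e : edges) {
            if (e.rel.equals(rel)) out.computeIfAbsent(e.src, k -> new ArrayList<>()).add(e);
        }
        List<Edge> result = new ArrayList<>();
        Set<Long> visited = new HashSet<>();
        Deque<Long> queue = new ArrayDeque<>();
        visited.add(source);
        queue.add(source);
        while (!queue.isEmpty()) {
            long v = queue.poll();
            for (Edge e : out.getOrDefault(v, Collections.<Edge>emptyList())) {
                result.add(e);                              // each vertex is expanded once, so each edge is added once
                if (visited.add(e.dst)) queue.add(e.dst);   // enqueue unseen destinations
            }
        }
        return result;
    }
}
```

On the example edges in the question, `subgraph(edges, 1, "p2p")` yields `1 2 p2p` and `2 3 p2p`. The `c2p` edge is never followed.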



Kryo on Zeppelin

2016-10-10 Thread Fei Hu
Hi All,

I am running some Spark Scala code in Zeppelin on CDH 5.5.1 (Spark version
1.5.0). I customized the Spark interpreter to use
org.apache.spark.serializer.KryoSerializer as spark.serializer, and in the
dependencies I added Kryo 3.0.3 as follows:
 com.esotericsoftware:kryo:3.0.3


When I wrote the Scala notebook and ran the program, I got the following
errors. But if I compiled this code into a jar and used spark-submit to run
it on the cluster, it worked well without errors.

WARN [2016-10-10 23:43:40,801] ({task-result-getter-1} Logging.scala[logWarning]:71) - Lost task 0.0 in stage 3.0 (TID 9, svr-A3-A-U20): java.io.EOFException
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:196)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


There were also some errors when I ran the Zeppelin Tutorial:

Caused by: java.io.IOException: java.lang.NullPointerException
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
    ... 3 more
Caused by: java.lang.NullPointerException
    at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:38)
    at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:23)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80)
    at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:142)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:80)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)

Does anyone know why this happened?

Thanks in advance,
Fei



Re: [Spark] RDDs are not persisting in memory

2016-10-10 Thread Chin Wei Low
Hi,

Your RDD is 5 GB; perhaps it is too large to fit into the executors' storage
memory. You can check the Executors tab in the Spark UI to see the memory
available for storage on each executor.

Regards,
Chin Wei

On Tue, Oct 11, 2016 at 6:14 AM, diplomatic Guru 
wrote:

> Hello team,
>
> Spark version: 1.6.0
>
> I'm trying to persist some data in memory so that I can reuse it. However,
> when I call rdd.cache() or rdd.persist(StorageLevel.MEMORY_ONLY()), it
> does not store the data, as I cannot see any RDD information in the Web UI
> (Storage tab).
>
> Therefore I tried rdd.persist(StorageLevel.MEMORY_AND_DISK()), for which
> it stored the data on disk only, as shown in the screenshot below:
>
> [image: Inline images 2]
>
> Do you know why the memory is not being used?
>
> Is there a configuration at the cluster level that stops jobs from storing
> data in memory altogether?
>
>
> Please let me know.
>
> Thanks
>
> Guru
>
>
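Chin Wei's point can be checked with back-of-the-envelope arithmetic. The sketch below assumes Spark 1.6's unified memory manager defaults: 300 MB reserved, and `spark.memory.fraction` = 0.75.

It treats that whole pool as available to cached blocks, so the result is an upper bound. The Executors tab shows the real figure, which is a little lower because the JVM's usable heap is below -Xmx.

A MEMORY_ONLY RDD is usually larger in memory (as deserialized objects) than on disk. Partitions that do not fit are simply not cached. The executor counts and heap sizes used below are hypothetical.

```java
/** Upper-bound storage capacity check, assuming Spark 1.6 unified memory manager defaults. */
final class StorageEstimate {
    static final long MB = 1024L * 1024L;
    static final long RESERVED_BYTES = 300 * MB;   // heap set aside before the fraction is applied

    /** Most memory one executor could use for cached blocks (storage may borrow all of it when execution is idle). */
    static long maxStorageBytes(long executorHeapBytes, double memoryFraction) {
        return (long) ((executorHeapBytes - RESERVED_BYTES) * memoryFraction);
    }

    /** True if an RDD of the given in-memory size could, at best, fit across all executors. */
    static boolean couldFit(long cachedRddBytes, int executors, long executorHeapBytes, double memoryFraction) {
        return cachedRddBytes <= executors * maxStorageBytes(executorHeapBytes, memoryFraction);
    }
}
```

For example, with 1 GB executor heaps each executor can cache at most about 543 MB. Four such executors could not hold a 5 GB RDD, but ten could, at least on paper.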


Re: JSON Arrays and Spark

2016-10-10 Thread Hyukjin Kwon
FYI, it supports either

[{...}, {...}, ...]

or

{...}

as input, with one such value per line.

On 11 Oct 2016 3:19 a.m., "Jean Georges Perrin"  wrote:

> Thanks Luciano - I think this is my issue :(
>
> On Oct 10, 2016, at 2:08 PM, Luciano Resende  wrote:
>
> Please take a look at
> http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
>
> Particularly the note about the required format:
>
> Note that the file that is offered as *a json file* is not a typical JSON
> file. Each line must contain a separate, self-contained valid JSON object.
> As a consequence, a regular multi-line JSON file will most often fail.
>
>
>
> On Mon, Oct 10, 2016 at 9:57 AM, Jean Georges Perrin  wrote:
>
>> Hi folks,
>>
>> I am trying to parse JSON arrays and it’s getting a little crazy (for me
>> at least)…
>>
>> 1)
>> If my JSON is:
>> {"vals":[100,500,600,700,800,200,900,300]}
>>
>> I get:
>> +--------------------+
>> |                vals|
>> +--------------------+
>> |[100, 500, 600, 7...|
>> +--------------------+
>>
>> root
>>  |-- vals: array (nullable = true)
>>  |    |-- element: long (containsNull = true)
>>
>> and I am :)
>>
>> 2)
>> If my JSON is:
>> [100,500,600,700,800,200,900,300]
>>
>> I get:
>> +--------------------+
>> |     _corrupt_record|
>> +--------------------+
>> |[100,500,600,700,...|
>> +--------------------+
>>
>> root
>>  |-- _corrupt_record: string (nullable = true)
>>
>> Both are legit JSON structures… Do you think that #2 is a bug?
>>
>> jg
>>
>>
>>
>>
>>
>>
>
>
> --
> Luciano Resende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>
>
>
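Following Hyukjin's and Luciano's notes, one workaround for case #2 is to rewrite each line that is a bare array of scalars into an object. That way it looks like case #1 before Spark sees it.

The helper below is hypothetical and purely string-based, with no JSON parser. It assumes one JSON value per line and a simple field name.

In Spark 2.0 the same function could be applied in a `map` over `sc.textFile(...)`. The result could then be passed to `spark.read.json(...)`, which accepts an `RDD[String]`.

```java
/** Turns a line holding a bare JSON array of scalars into an object, leaving other lines alone. */
final class JsonLinesFix {
    /** `field` is assumed to be a plain identifier (no quotes or escapes needed). */
    static String wrapIfBareArray(String line, String field) {
        String trimmed = line.trim();
        if (trimmed.startsWith("[")) {
            String inner = trimmed.substring(1).trim();
            // Arrays of objects ([{...}, ...]) are already accepted, so only wrap scalar arrays.
            if (!inner.startsWith("{")) {
                return "{\"" + field + "\":" + trimmed + "}";
            }
        }
        return line;
    }
}
```

With `field` = "vals", the line `[100,500,600,700,800,200,900,300]` becomes `{"vals":[100,500,600,700,800,200,900,300]}`, which matches the shape that produced the `vals` column in case #1.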


Re: What happens when an executor crashes?

2016-10-10 Thread Cody Koeninger
Repartition almost always involves a shuffle.

Let me see if I can explain the recovery stuff...

Say you start with two Kafka partitions, topic-0 and topic-1.

You shuffle those across 3 Spark partitions; we'll label them A, B and C.

Your job has written

fileA: results for A, offset ranges for topic-0 offsets 10-60 and
topic-1 offsets 23-66
fileB: results for B, offset ranges for topic-0 offsets 10-60 and
topic-1 offsets 23-66

It's in the middle of writing fileC when it dies.

On startup, you need to start the job from
topic-0 offsets 10-60 and topic-1 offsets 23-66
and do all the work in all the partitions (because of the shuffle, you
don't know which Kafka partition the data came from).

You just don't overwrite fileA and fileB, because they already have
correct data and offsets. You just write fileC.

Then once you've recovered, you go on about your job as normal, starting
at topic-0 offset 60 and topic-1 offset 66.

Clear as mud?
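The recovery walkthrough above can be written as a small plain-Java sketch. The class and method names are hypothetical, and nothing here is a Spark or Kafka API.

Each completed output file is modelled as the offset ranges stored alongside its results. Because the data was shuffled, every file carries the ranges for all Kafka partitions, so any one of them says which batch to replay.

The replay then only writes the Spark partitions that have no file yet. The sketch assumes at least one file of the interrupted batch exists, as in the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Recovery planning for one batch whose output was shuffled across Spark partitions. */
final class ShuffledRecovery {
    /** Offset range for one Kafka partition, e.g. topic-0 from 10 until 60. */
    static final class Range {
        final long from;
        final long until;
        Range(long from, long until) { this.from = from; this.until = until; }
    }

    /** `written` maps each completed Spark partition (A, B, ...) to the ranges stored with its results. */
    static Map<String, Range> replayRanges(Map<String, Map<String, Range>> written) {
        // Every file holds the ranges of ALL Kafka partitions, so any one of them will do.
        return written.values().iterator().next();
    }

    /** The replay recomputes every partition but only writes those without an existing file. */
    static List<String> partitionsToWrite(List<String> sparkPartitions,
                                          Map<String, Map<String, Range>> written) {
        List<String> todo = new ArrayList<>();
        for (String p : sparkPartitions) {
            if (!written.containsKey(p)) todo.add(p);
        }
        return todo;
    }

    /** After recovery, the next batch starts at each Kafka partition's "until" offset. */
    static Map<String, Long> nextStart(Map<String, Range> replayed) {
        Map<String, Long> next = new LinkedHashMap<>();
        for (Map.Entry<String, Range> e : replayed.entrySet()) next.put(e.getKey(), e.getValue().until);
        return next;
    }
}
```

With fileA and fileB present, the plan is as follows:

- Replay topic-0 10-60 and topic-1 23-66.
- Write only C.
- Resume at offsets 60 and 66.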






On Mon, Oct 10, 2016 at 5:36 PM, Samy Dindane  wrote:
>
>
> On 10/10/2016 8:14 PM, Cody Koeninger wrote:
>>
>> Glad it was helpful :)
>>
>> As far as executors, my expectation is that if you have multiple
>> executors running, and one of them crashes, the failed task will be
>> submitted on a different executor.  That is typically what I observe
>> in spark apps, if that's not what you're seeing I'd try to get help on
>> that specific issue.
>>
>> As far as Kafka offsets per partition go, you need to atomically write
>> your offsets and results in the same place. If that place is a
>> filesystem, you need to be using atomic operations (I try to stay away
>> from HDFS, but I believe renames are atomic, for instance).
>
> That's what I am trying to do.
>>
>> If you're doing that in normal code, i.e. with an iterator instead of an
>> RDD or DataFrame, you may have to do some of that work yourself.
>
> How would you do that without an iterator?
> As far as I've seen, to have granular control over partitions (and each
> partition's checkpoint), one needs to use `foreachPartition` or
> `mapPartitions`, thus dealing with an iterator instead of an RDD.
>>
>>
>> As far as using partitionBy, even after repartitioning you can still
>> use the general idea of writing results and offsets in the same place.
>
> I don't understand what you mean. What do you mean by "the same place"? Why
> would I want to do that?
>>
>> The major difference is that you need to write all offsets in each
>> partition (because things have been shuffled),
>
> Oh I didn't think of shuffling after partitioning.
> But does it matter if I use partitionBy()?
>
> I will make a code example once at the office so you can take a look.
>>
>> and need to be more
>> careful on startup after a failure.  On startup, you'd see which
>> partitions were incomplete, start the job from the offsets in the
>> incomplete partitions, do the work for all partitions,
>
> It's clear until here.
>>
>> but ignore the
>> writes when they got to the complete partitions.  I realize that's
>> kind of a complicated description, if it doesn't make sense ask, or I
>> may be able to put up some publicly visible code at some point in the
>> future.
>
> Yes I don't see what you mean. :)
>
> I really appreciate your help. Thanks a lot.
>
>>
>>
>> On Mon, Oct 10, 2016 at 12:12 PM, Samy Dindane  wrote:
>>>
>>> I just noticed that you're the author of the code I linked in my previous
>>> email. :) It's helpful.
>>>
>>> When using `foreachPartition` or `mapPartitions`, I noticed I can't ask
>>> Spark to write the data on the disk using `df.write()` but I need to use
>>> the
>>> iterator to do so, which means losing the ability of using partitionBy().
>>> Do you know a workaround? Or I'll be forced to partition data manually.
>>>
>>> I think I understand why the job crashes when a single executor does:
>>> `df.write().save()` writes all the partitions at the same time, which
>>> fails if one of them has died.
>>> Is that right?
>>>
>>> Thank you.
>>>
>>> Samy
>>>
>>>
>>> On 10/10/2016 04:58 PM, Samy Dindane wrote:


>>>> Hi Cody,
>>>>
>>>> I am writing a Spark job that reads records from a Kafka topic and writes
>>>> them to the file system.
>>>> This would be straightforward if it weren't for the custom checkpointing
>>>> logic I want to have; Spark's checkpointing doesn't suit us as it doesn't
>>>> permit code updates.
>>>>
>>>> The custom checkpointing would be similar to this:
>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala
>>>>
>>>> I am trying to understand how this would work if an executor crashes, so I
>>>> tried making one crash manually, but I noticed it kills the whole job
>>>> instead of creating another executor to resume the task.
>>>> Is that expected? Is there anything wrong with my approach?
>>>>
>>>> Thank you for your time.
>>>>
>>>>
>>>> On 10/10/2016 04:29 PM, Cody Koeninger wrote:
>
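Cody's advice to write offsets and results atomically in the same place could look like this on a local filesystem. The idea is to write everything to a temporary file, publish it with one atomic rename, and never overwrite a file that already exists.

This is a sketch using `java.nio.file.Files.move` with `ATOMIC_MOVE`. The `#offsets` header line is a hypothetical format.

On HDFS the counterpart would be a rename through the Hadoop `FileSystem` API. On S3 a rename is not atomic at all, so the pattern does not carry over there.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

/** Publishes one partition's results together with the batch's offsets via a single rename. */
final class AtomicCommit {
    /** Returns false (and writes nothing) if `target` was already committed by an earlier attempt. */
    static boolean commit(Path target, String offsets, List<String> results) throws IOException {
        if (Files.exists(target)) return false;          // complete partition: don't overwrite it
        List<String> lines = new ArrayList<>();
        lines.add("#offsets " + offsets);                 // hypothetical header holding the offset ranges
        lines.addAll(results);
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.write(tmp, lines, StandardCharsets.UTF_8);  // a crash here leaves only the .tmp file behind
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);  // readers see all of it or none of it
        return true;
    }
}
```

On startup, a partition counts as complete exactly when its target file exists. A leftover `.tmp` file is simply rewritten on the next attempt.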

Re: ClassCastException while running a simple wordCount

2016-10-10 Thread Jakob Odersky
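Just thought of another potential issue: you should use the "provided"
scope when depending on Spark, i.e. in your project's pom:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.0.1</version>
  <scope>provided</scope>
</dependency>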


On Mon, Oct 10, 2016 at 2:00 PM, Jakob Odersky  wrote:

> How do you submit the application? A version mismatch between the launcher,
> driver and workers could lead to the bug you're seeing. A common reason for
> a mismatch is if the SPARK_HOME environment variable is set. This will
> cause the spark-submit script to use the launcher determined by that
> environment variable, regardless of the directory from which it was called.
>
> On Mon, Oct 10, 2016 at 3:42 AM, kant kodali  wrote:
>
>> +1, I have the same problem. I have been trying hard to fix this.
>>
>>
>>
>> On Mon, Oct 10, 2016 3:23 AM, vaibhav thapliyal
>> vaibhav.thapliyal...@gmail.com wrote:
>>
>>> Hi,
>>> If I change the parameter inside setMaster() to "local", the
>>> program runs. Is there something wrong with the cluster installation?
>>>
>>> I used the spark-2.0.1-bin-hadoop2.7.tgz package to install on my
>>> cluster with default configuration.
>>>
>>> Thanks
>>> Vaibhav
>>>
>>> On 10 Oct 2016 12:49, "vaibhav thapliyal" wrote:
>>>
>>> Here is the code that I am using:
>>>
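>>> import java.util.Arrays;
>>> import java.util.Iterator;
>>>
>>> import org.apache.spark.SparkConf;
>>> import org.apache.spark.api.java.JavaPairRDD;
>>> import org.apache.spark.api.java.JavaRDD;
>>> import org.apache.spark.api.java.JavaSparkContext;
>>> import org.apache.spark.api.java.function.FlatMapFunction;
>>> import org.apache.spark.api.java.function.Function2;
>>> import org.apache.spark.api.java.function.PairFunction;
>>>
>>> import scala.Tuple2;
>>>
>>> public class SparkTest {
>>>
>>>     public static void main(String[] args) {
>>>         SparkConf conf = new SparkConf()
>>>                 .setMaster("spark://192.168.10.174:7077")
>>>                 .setAppName("TestSpark");
>>>         JavaSparkContext sc = new JavaSparkContext(conf);
>>>
>>>         JavaRDD<String> textFile = sc.textFile("sampleFile.txt");
>>>         // Split each line into words (in Spark 2.x, FlatMapFunction returns an Iterator)
>>>         JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
>>>             public Iterator<String> call(String s) {
>>>                 return Arrays.asList(s.split(" ")).iterator();
>>>             }
>>>         });
>>>         // Pair each word with a count of 1
>>>         JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
>>>             public Tuple2<String, Integer> call(String s) {
>>>                 return new Tuple2<>(s, 1);
>>>             }
>>>         });
>>>         // Sum the counts for each word
>>>         JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
>>>             public Integer call(Integer a, Integer b) {
>>>                 return a + b;
>>>             }
>>>         });
>>>         counts.saveAsTextFile("outputFile.txt");
>>>         sc.stop();
>>>     }
>>> }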
>>>
>>> The content of the input file:
>>> Hello Spark
>>> Hi Spark
>>> Spark is running
>>>
>>>
>>> I am using the Spark 2.0.1 dependency from Maven.
>>>
>>> Thanks
>>> Vaibhav
>>>
>>> On 10 October 2016 at 12:37, Sudhanshu Janghel <
>>> sudhanshu.jang...@cloudwick.com> wrote:
>>>
>>> Seems like a straightforward error: it's trying to cast something as a
>>> list which is not a list or cannot be cast. Are you using the standard
>>> example code? Can you send the input and code?
>>>
>>> On Oct 10, 2016 9:05 AM, "vaibhav thapliyal" <
>>> vaibhav.thapliyal...@gmail.com> wrote:
>>>
>>> Dear All,
>>>
>>> I am getting a ClassCastException when using the Java API to run
>>> the word count example from the docs.
>>>
>>> Here is the log that I got:
>>>
>>> 16/10/10 11:52:12 ERROR Executor: Exception in task 0.2 in stage 0.0 (TID 4)
>>> java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
>>>     at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
>>>     at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
>>>     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>>>  
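Jakob suggested that the launcher, driver and workers may be running different Spark builds. One quick diagnostic for that is to print where a class was loaded from.

The helper below is hypothetical and uses only the JDK's `ProtectionDomain` and `CodeSource`. You could call it with, say, `org.apache.spark.SparkContext.class` or `scala.collection.immutable.List.class`, once in the driver and once inside a task. That would show whether both sides load the same jars.

This is a debugging sketch, not a confirmed diagnosis of this particular ClassCastException.

```java
import java.security.CodeSource;
import java.security.ProtectionDomain;

/** Reports the jar or directory a class was loaded from. */
final class ClassOrigin {
    static String locationOf(Class<?> cls) {
        ProtectionDomain domain = cls.getProtectionDomain();
        CodeSource source = domain == null ? null : domain.getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "<bootstrap/JDK>";   // core JDK classes have no code source
        }
        return source.getLocation().toString();
    }
}
```

If the driver and the executors report different Spark or Scala jars, the "provided" scope and SPARK_HOME points above are the first things to check.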

Design consideration for a trading System

2016-10-10 Thread Mich Talebzadeh
Hi,

I have been working on a Lambda Architecture for trading systems.

I think I have completed the dry runs for testing the modules.

For the batch layer the criterion is a day's lag (one-day-old data). This is
acceptable for the users who come from a BI background using Tableau, but I
think we can do better.

*For the batch layer* the design is as follows:


   1. Trade prices are collected by Kafka every 2 seconds in batches of 20
   prices for each security (ticker); each CSV row has UUID, ticker,
   timestamp and price, comma separated
   2. In a minute we have 600 prices, in an hour 36K and in a day 864K prices
   3. Flume is used to put these "raw" prices on HDFS (source -> Kafka ->
   Flume -> HDFS)
   4. Every 15 minutes (24x7) a cron job runs that picks up these files,
   creates one large file comprising the prices from the small files and
   inserts it into an HBase table. This is done via ImportTsv and is pretty
   fast (under 2 minutes). Loading the individual files one by one with
   ImportTsv, which uses MapReduce, is terribly slow
   5. As an additional test we also have a cron job that picks up these
   individual raw files every 15 minutes and puts them into a Hive table (this
   will be decommissioned eventually)
   6. We have views built in Hive and Phoenix on top of the HBase tables.
   Phoenix works OK in Zeppelin (albeit without SQL as rich as Hive's).
   We can also use Spark on Hive tables in Zeppelin, and we can use Tableau
   with Hive and Spark and its data caching facilities (in Tableau Server)
   7. It is my understanding that the new release of Hive will have LLAP as
   its in-memory database.
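Step 4 of the batch layer combines the small Flume files into one large file before ImportTsv. It can be sketched with the JDK alone, under two assumptions: the files have been copied to a local directory, and they end in `.csv`. On HDFS, `hdfs dfs -getmerge` performs a similar concatenation into a local file.

The merged file should be written outside the input directory so it is not picked up again on the next run.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Concatenates many small price files into one larger file for a single bulk load. */
final class SmallFileMerger {
    /** Appends every non-empty line of the *.csv files in `inputDir` (in file-name order) to `target`; returns the row count. */
    static long merge(Path inputDir, Path target) throws IOException {
        List<Path> inputs = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(inputDir, "*.csv")) {
            for (Path p : files) inputs.add(p);
        }
        Collections.sort(inputs);                      // deterministic order, e.g. timestamped file names
        long rows = 0;
        try (BufferedWriter out = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            for (Path p : inputs) {
                for (String line : Files.readAllLines(p, StandardCharsets.UTF_8)) {
                    if (line.isEmpty()) continue;      // skip blank lines
                    out.write(line);
                    out.newLine();
                    rows++;
                }
            }
        }
        return rows;
    }
}
```

At the stated rate of 600 prices a minute, a 15-minute run would merge about 9,000 rows into one file. That single file is what makes the ImportTsv step cheap.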

So, in summary, the batch layer currently offers all data with a 15-minute lag.

In designing this layer I had to take into account the ease of use and
familiarity of our users with Tableau and Zeppelin (eventually). In
addition, certain power users can use Spark FP in Zeppelin as well.

The Tableau users tend to do analytics at macro level. In other words what
they want is speedy queries not necessarily on the latest data.

*For the speed layer*


   1. Kafka feeds data into Spark Streaming (SS)
   2. For each RDD, prices are analysed for every row
   3. Calculations are done on prices and if they satisfy the criteria, a
   message is sent to Alert (currently a simple beep). The row is also
   displayed for these particular prices to alert the trader
   4. The details of these trades are flushed to another HBase table in real
   time, impressively fast

Mon Oct 10 22:37:52 BST 2016, Price on S16 hit 99.25724
Mon Oct 10 22:37:56 BST 2016, Price on S05 hit 99.5992
So these are the basics of the speed layer.
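Steps 2 and 3 of the speed layer amount to a per-row check. Here is a minimal sketch, with three assumptions:

- Rows follow the CSV layout described for the Kafka feed (UUID, ticker, timestamp, price).
- The timestamp text is already in the form used in the sample alerts.
- A fixed threshold stands in for the real criteria, which the post does not spell out (hypothetical).

In Spark Streaming a function like this would be applied to each row of each RDD, e.g. inside `foreachRDD`. It reproduces the format of the sample alert lines above.

```java
import java.util.Optional;

/** Per-row speed-layer check: emit an alert line when a price reaches a threshold. */
final class PriceAlert {
    /** `csvRow` is "uuid,ticker,timestamp,price"; `threshold` is a hypothetical stand-in for the real criteria. */
    static Optional<String> check(String csvRow, double threshold) {
        String[] fields = csvRow.split(",");
        if (fields.length != 4) return Optional.empty();   // skip malformed rows
        String ticker = fields[1].trim();
        String timestamp = fields[2].trim();
        String priceText = fields[3].trim();
        double price;
        try {
            price = Double.parseDouble(priceText);
        } catch (NumberFormatException e) {
            return Optional.empty();                         // skip non-numeric prices
        }
        if (price < threshold) return Optional.empty();
        // Echo the original price text so the alert shows exactly what arrived.
        return Optional.of(timestamp + ", Price on " + ticker + " hit " + priceText);
    }
}
```

For instance, `check("id-1,S16,Mon Oct 10 22:37:52 BST 2016,99.25724", 99.0)` returns the first sample alert line shown above.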

The serving layer will have combined data from both batch and speed layers.
We are in the process of building a real time dashboard as well.

After some thought, I decided that extracting HBase data through Hive or
Phoenix SQL skins, together with Spark, provides what we need. I have not yet
managed to use Phoenix with Spark, and direct HBase access from Spark 2 is not
available; even if it were, an SQL skin is better for visualisation tools.

Sorry about this long monologue. I'd appreciate any feedback.

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: What happens when an executor crashes?

2016-10-10 Thread Samy Dindane



On 10/10/2016 8:14 PM, Cody Koeninger wrote:

Glad it was helpful :)

As far as executors, my expectation is that if you have multiple
executors running, and one of them crashes, the failed task will be
submitted on a different executor.  That is typically what I observe
in spark apps, if that's not what you're seeing I'd try to get help on
that specific issue.

As far as kafka offsets per-partition, you need to atomically write
your offsets and results in the same place. If that place is a
filesystem, you need to be using atomic operations (I try to stay away
from HDFS, but I believe renames are atomic, for instance).

That's what I am trying to do.

If you're doing that in normal code, i.e. with an iterator instead of an
RDD or DataFrame, you may have to do some of that work yourself.

How would you do that without an iterator?
As far as I've seen, to have granular control over partitions (and each 
partition's checkpoint), one needs to use `foreachPartition` or 
`mapPartitions`, thus dealing with an iterator instead of an RDD.


As far as using partitionBy, even after repartitioning you can still
use the general idea of writing results and offsets in the same place.
I don't understand what you mean. What do you mean by "the same place"? 
Why would I want to do that?

The major difference is that you need to write all offsets in each
partition (because things have been shuffled),

Oh I didn't think of shuffling after partitioning.
But does it matter if I use partitionBy()?

I will make a code example once at the office so you can take a look.

and need to be more
careful on startup after a failure.  On startup, you'd see which
partitions were incomplete, start the job from the offsets in the
incomplete partitions, do the work for all partitions,

It's clear until here.

but ignore the
writes when they got to the complete partitions.  I realize that's
kind of a complicated description, if it doesn't make sense ask, or I
may be able to put up some publicly visible code at some point in the
future.

Yes I don't see what you mean. :)

I really appreciate your help. Thanks a lot.



On Mon, Oct 10, 2016 at 12:12 PM, Samy Dindane  wrote:

I just noticed that you're the author of the code I linked in my previous
email. :) It's helpful.

When using `foreachPartition` or `mapPartitions`, I noticed I can't ask
Spark to write the data on the disk using `df.write()` but I need to use the
iterator to do so, which means losing the ability of using partitionBy().
Do you know a workaround? Or I'll be forced to partition data manually.

I think I understand why the job crashes when a single executor does:
`df.write().save()` writes all the partitions at the same time, which
fails if one of them has died.
Is that right?

Thank you.

Samy


On 10/10/2016 04:58 PM, Samy Dindane wrote:


Hi Cody,

I am writing a spark job that reads records from a Kafka topic and writes
them on the file system.
This would be straightforward if it weren't for the custom checkpointing
logic I want to have; Spark's checkpointing doesn't suit us as it doesn't
permit code updates.

The custom checkpointing would be similar to this:
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala
I am trying to understand how this would work if an executor crashes, so I
tried making one crash manually, but I noticed it kills the whole job
instead of creating another executor to resume the task.
Is that expected? Is there anything wrong with my approach?

Thank you for your time.


On 10/10/2016 04:29 PM, Cody Koeninger wrote:


What is it you're actually trying to accomplish?

On Mon, Oct 10, 2016 at 5:26 AM, Samy Dindane  wrote:


I managed to make a specific executor crash by using
TaskContext.get.partitionId and throwing an exception for a specific
executor.

The issue I have now is that the whole job stops when a single executor
crashes.
Do I need to explicitly tell Spark to start a new executor and keep the
other ones running?


On 10/10/2016 11:19 AM, Samy Dindane wrote:



Hi,

I am writing a streaming job that reads a Kafka topic.
As far as I understand, Spark does a 1:1 mapping between its executors and
Kafka partitions.

In order to correctly implement my checkpoint logic, I'd like to know what
exactly happens when an executor crashes.
Also, is it possible to kill an executor manually for testing purposes?

Thank you.

Samy




[Spark] RDDs are not persisting in memory

2016-10-10 Thread diplomatic Guru
Hello team,

Spark version: 1.6.0

I'm trying to persist some data in memory so that I can reuse it. However,
when I call rdd.cache() or rdd.persist(StorageLevel.MEMORY_ONLY()), it does
not store the data, as I cannot see any RDD information in the Web UI
(Storage tab).

Therefore I tried rdd.persist(StorageLevel.MEMORY_AND_DISK()), for which it
stored the data on disk only, as shown in the screenshot below:

[image: Inline images 2]

Do you know why the memory is not being used?

Is there a configuration at the cluster level that stops jobs from storing
data in memory altogether?


Please let me know.

Thanks

Guru


Re: Spark S3

2016-10-10 Thread ayan guha
It really depends on the input format used.
On 11 Oct 2016 08:46, "Selvam Raman"  wrote:

> Hi,
>
> How does Spark read data from S3 and run tasks in parallel?
>
> Assume I have an S3 bucket of 35 GB (Parquet file).
>
> How will the SparkSession read the data and process it in parallel? How
> does it split the S3 data and assign it to each executor task?
>
> Please share your thoughts.
>
> Note:
> If we have an RDD, we can look at partitions.size or length to
> check how many partitions a file has. But how is this accomplished in
> terms of an S3 bucket?
>
> --
> Selvam Raman
> "Shun bribery, stand tall"
>


Re: Spark Streaming Advice

2016-10-10 Thread Jörn Franke
Your files are too small; this has a significant impact on the NameNode.
Use HBase or maybe HAWQ to store small writes.

> On 10 Oct 2016, at 16:25, Kevin Mellott  wrote:
> 
> Whilst working on this application, I found a setting that drastically 
> improved the performance of my particular Spark Streaming application. I'm 
> sharing the details in hopes that it may help somebody in a similar situation.
> 
> As my program ingested information into HDFS (as parquet files), I noticed 
> that the time to process each batch was significantly greater than I 
> anticipated. Whether I was writing a single parquet file (around 8KB) or 
> around 10-15 files (8KB each), that step of the processing was taking around 
> 30 seconds. Once I set the configuration below, this operation reduced from 
> 30 seconds to around 1 second.
> 
> // ssc = instance of StreamingContext
> ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", 
> "false")
> 
> I've also verified that the parquet files being generated are usable by both 
> Hive and Impala.
> 
> Hope that helps!
> Kevin
> 
>> On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott  
>> wrote:
>> I'm attempting to implement a Spark Streaming application that will consume 
>> application log messages from a message broker and store the information in 
>> HDFS. During the data ingestion, we apply a custom schema to the logs, 
>> partition by application name and log date, and then save the information as 
>> parquet files.
>> 
>> All of this works great, except we end up having a large number of parquet 
>> files created. It's my understanding that Spark Streaming is unable to 
>> control the number of files that get generated in each partition; can 
>> anybody confirm that is true? 
>> 
>> Also, has anybody else run into a similar situation regarding data ingestion 
>> with Spark Streaming and do you have any tips to share? Our end goal is to 
>> store the information in a way that makes it efficient to query, using a 
>> tool like Hive or Impala.
>> 
>> Thanks,
>> Kevin
> 
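The small-file problem can be made concrete. With `partitionBy`, each write task produces at least one file for every partition value it holds, so one batch can yield roughly (number of tasks) × (partition values per task) files. The plain-Java model below uses hypothetical names and no Spark. It counts the files produced when records are spread round-robin across tasks and when they are routed to tasks by their partition key. The second layout is what a `repartition` on the partition columns before writing is assumed to achieve, for example `df.repartition(col("app"), col("date")).write.partitionBy("app", "date")` with hypothetical column names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

// Plain-Java model (hypothetical): how many files does a partitionBy write produce?
class SmallFilesModel {
    // Each task writes one file per distinct partition key it holds.
    static int countFiles(List<List<String>> tasks) {
        int files = 0;
        for (List<String> taskRecords : tasks) {
            files += new HashSet<>(taskRecords).size();
        }
        return files;
    }

    // Spread records round-robin over numTasks tasks, ignoring the partition key.
    static List<List<String>> roundRobin(List<String> keys, int numTasks) {
        List<List<String>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) tasks.add(new ArrayList<>());
        for (int i = 0; i < keys.size(); i++) tasks.get(i % numTasks).add(keys.get(i));
        return tasks;
    }

    // Send every record with the same key to the same task, like hash
    // partitioning on the partition columns before the write.
    static List<List<String>> byKey(List<String> keys, int numTasks) {
        List<List<String>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) tasks.add(new ArrayList<>());
        for (String k : keys) tasks.get(Math.floorMod(k.hashCode(), numTasks)).add(k);
        return tasks;
    }

    public static void main(String[] args) {
        // One hypothetical batch: 30 records over 3 "app/date" partition keys.
        String[] apps = {"appA/2016-10-10", "appB/2016-10-10", "appC/2016-10-10"};
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 30; i++) keys.add(apps[i % 3]);

        System.out.println(countFiles(roundRobin(keys, 10))); // 30: every task holds all 3 keys
        System.out.println(countFiles(byKey(keys, 10)));      // 3: one file per key
    }
}
```

Grouping by key limits each batch to one file per partition directory, but every batch still adds files. Periodic compaction, or a store built for small writes as suggested above, may still be needed.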


Spark S3

2016-10-10 Thread Selvam Raman
Hi,

How does Spark read data from S3 and run tasks in parallel?

Assume I have an S3 bucket of 35 GB (Parquet format).

How will the SparkSession read the data and process it in parallel? How does
it split the S3 data and assign it to each executor task?

Please share your thoughts.

Note:
if we have an RDD, we can look at partitions.size or length to check how
many partitions there are for a file. But how is this accomplished for an
S3 bucket?

-- 
Selvam Raman
"Shun bribery, stand tall"
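Here is a rough way to reason about this, assuming the data is read with the Spark 2.0/2.1 DataFrame reader (`spark.read.parquet`). The number of input partitions then depends on four things: file sizes, the cluster's default parallelism, `spark.sql.files.maxPartitionBytes` (default 128 MB) and `spark.sql.files.openCostInBytes` (default 4 MB). It does not depend on S3 itself. The plain-Java sketch below re-implements that split heuristic as understood here. `SplitEstimate` and the 35 × 1 GB file layout are hypothetical, and other Spark versions may compute splits differently. On a real DataFrame, `df.rdd.getNumPartitions` reports the actual count, much like `partitions.size` on an RDD.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of the Spark 2.0/2.1 file-source split heuristic, re-implemented in plain Java.
class SplitEstimate {
    static final long MB = 1024L * 1024L;

    // maxSplitBytes = min(maxPartitionBytes, max(openCostInBytes, totalBytes / parallelism)),
    // where every file is charged its length plus openCostInBytes.
    static long maxSplitBytes(long[] fileSizes, long maxPartitionBytes,
                              long openCostInBytes, int defaultParallelism) {
        long totalBytes = 0;
        for (long size : fileSizes) totalBytes += size + openCostInBytes;
        long bytesPerCore = totalBytes / defaultParallelism;
        return Math.min(maxPartitionBytes, Math.max(openCostInBytes, bytesPerCore));
    }

    // Cut each (splittable) file into chunks of at most maxSplit bytes, then pack the
    // chunks, largest first, into partitions that stay under maxSplit.
    static int estimatePartitions(long[] fileSizes, long maxPartitionBytes,
                                  long openCostInBytes, int defaultParallelism) {
        long maxSplit = maxSplitBytes(fileSizes, maxPartitionBytes,
                                      openCostInBytes, defaultParallelism);
        List<Long> chunks = new ArrayList<>();
        for (long size : fileSizes) {
            for (long offset = 0; offset < size; offset += maxSplit) {
                chunks.add(Math.min(maxSplit, size - offset));
            }
        }
        chunks.sort(Collections.reverseOrder());

        int partitions = 0;
        long currentSize = 0;
        boolean currentHasChunks = false;
        for (long chunk : chunks) {
            if (currentHasChunks && currentSize + chunk > maxSplit) {
                partitions++;          // close the current partition
                currentSize = 0;
                currentHasChunks = false;
            }
            currentSize += chunk + openCostInBytes; // each chunk also pays the open cost
            currentHasChunks = true;
        }
        if (currentHasChunks) partitions++;
        return partitions;
    }

    public static void main(String[] args) {
        // Hypothetical layout: 35 Parquet files of 1 GB each, 100 cores in the cluster.
        long[] files = new long[35];
        Arrays.fill(files, 1024 * MB);
        System.out.println(maxSplitBytes(files, 128 * MB, 4 * MB, 100) / MB); // 128
        System.out.println(estimatePartitions(files, 128 * MB, 4 * MB, 100)); // 280
    }
}
```

With these assumed numbers, roughly 35 GB / 128 MB ≈ 280 tasks would read the data in parallel, each handling one byte range of one file.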


Re: ClassCastException while running a simple wordCount

2016-10-10 Thread Jakob Odersky
How do you submit the application? A version mismatch between the launcher,
driver and workers could lead to the bug you're seeing. A common reason for
a mismatch is if the SPARK_HOME environment variable is set. This will
cause the spark-submit script to use the launcher determined by that
environment variable, regardless of the directory from which it was called.

On Mon, Oct 10, 2016 at 3:42 AM, kant kodali  wrote:

> +1 Wooho I have the same problem. I have been trying hard to fix this.
>
>
>
> On Mon, Oct 10, 2016 3:23 AM, vaibhav thapliyal
> vaibhav.thapliyal...@gmail.com wrote:
>
>> Hi,
>> If I change the parameter inside setMaster() to "local", the program
>> runs. Is there something wrong with the cluster installation?
>>
>> I used the spark-2.0.1-bin-hadoop2.7.tgz package to install on my cluster
>> with default configuration.
>>
>> Thanks
>> Vaibhav
>>
>> On 10 Oct 2016 12:49, "vaibhav thapliyal" 
>> wrote:
>>
>> Here is the code that I am using:
>>
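>> import java.util.Arrays;
>> import java.util.Iterator;
>>
>> import org.apache.spark.SparkConf;
>> import org.apache.spark.api.java.JavaPairRDD;
>> import org.apache.spark.api.java.JavaRDD;
>> import org.apache.spark.api.java.JavaSparkContext;
>> import org.apache.spark.api.java.function.FlatMapFunction;
>> import org.apache.spark.api.java.function.Function2;
>> import org.apache.spark.api.java.function.PairFunction;
>>
>> import scala.Tuple2;
>>
>> public class SparkTest {
>>
>>     public static void main(String[] args) {
>>         SparkConf conf = new SparkConf()
>>                 .setMaster("spark://192.168.10.174:7077")
>>                 .setAppName("TestSpark");
>>         JavaSparkContext sc = new JavaSparkContext(conf);
>>
>>         JavaRDD<String> textFile = sc.textFile("sampleFile.txt");
>>
>>         // Split each line into words (in Spark 2.x, call() returns an Iterator).
>>         JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
>>             public Iterator<String> call(String s) {
>>                 return Arrays.asList(s.split(" ")).iterator();
>>             }
>>         });
>>
>>         // Pair each word with a count of 1.
>>         JavaPairRDD<String, Integer> pairs = words.mapToPair(
>>                 new PairFunction<String, String, Integer>() {
>>             public Tuple2<String, Integer> call(String s) {
>>                 return new Tuple2<>(s, 1);
>>             }
>>         });
>>
>>         // Sum the counts for each word.
>>         JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
>>                 new Function2<Integer, Integer, Integer>() {
>>             public Integer call(Integer a, Integer b) {
>>                 return a + b;
>>             }
>>         });
>>
>>         // Writes a directory named outputFile.txt containing part files.
>>         counts.saveAsTextFile("outputFile.txt");
>>         sc.stop();
>>     }
>> }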
>>
>> The content of the input file:
>> Hello Spark
>> Hi Spark
>> Spark is running
>>
>>
>> I am using the spark 2.0.1 dependency from maven.
>>
>> Thanks
>> Vaibhav
>>
>> On 10 October 2016 at 12:37, Sudhanshu Janghel <
>> sudhanshu.jang...@cloudwick.com> wrote:
>>
>> Seems like a straightforward error: it's trying to cast something as a
>> list which is not a list or cannot be cast. Are you using the standard
>> example code? Can you send the input and code?
>>
>> On Oct 10, 2016 9:05 AM, "vaibhav thapliyal" <
>> vaibhav.thapliyal...@gmail.com> wrote:
>>
>> Dear All,
>>
>> I am getting a ClassCastException when using the Java API to run
>> the wordcount example from the docs.
>>
>> Here is the log that I got:
>>
>> 16/10/10 11:52:12 ERROR Executor: Exception in task 0.2 in stage 0.0 (TID 4)
>> java.lang.ClassCastException: cannot assign instance of 
>> scala.collection.immutable.List$SerializationProxy to field 
>> org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type 
>> scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
>>  at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
>>  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
>>  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
>>  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
>>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
>>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:86)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>> 16/10/10 11:52:12 ERROR Executor: 

Re: Spark Streaming Advice

2016-10-10 Thread Kevin Mellott
The batch interval was set to 30 seconds; however, after getting the
parquet files to save faster I lowered the interval to 10 seconds. The
number of log messages contained in each batch varied from just a few up to
around 3500, with the number of partitions ranging from 1 to around 15.

I will have to check out HBase as well; I've heard good things!

Thanks,
Kevin

On Mon, Oct 10, 2016 at 11:38 AM, Mich Talebzadeh  wrote:

> Hi Kevin,
>
> What is the streaming interval (batch interval) above?
>
> I do analytics on streaming trade data, but after manipulating the
> individual messages I store the selected ones in HBase. Very fast.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 10 October 2016 at 15:25, Kevin Mellott 
> wrote:
>
>> Whilst working on this application, I found a setting that drastically
>> improved the performance of my particular Spark Streaming application. I'm
>> sharing the details in hopes that it may help somebody in a similar
>> situation.
>>
>> As my program ingested information into HDFS (as parquet files), I
>> noticed that the time to process each batch was significantly greater than
>> I anticipated. Whether I was writing a single parquet file (around 8KB) or
>> around 10-15 files (8KB each), that step of the processing was taking
>> around 30 seconds. Once I set the configuration below, this operation
>> reduced from 30 seconds to around 1 second.
>>
>> // ssc = instance of StreamingContext
>> ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata",
>> "false")
>>
>> I've also verified that the parquet files being generated are usable by
>> both Hive and Impala.
>>
>> Hope that helps!
>> Kevin
>>
>> On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott 
>> wrote:
>>
>>> I'm attempting to implement a Spark Streaming application that will
>>> consume application log messages from a message broker and store the
>>> information in HDFS. During the data ingestion, we apply a custom schema to
>>> the logs, partition by application name and log date, and then save the
>>> information as parquet files.
>>>
>>> All of this works great, except we end up having a large number of
>>> parquet files created. It's my understanding that Spark Streaming is unable
>>> to control the number of files that get generated in each partition; can
>>> anybody confirm that is true?
>>>
>>> Also, has anybody else run into a similar situation regarding data
>>> ingestion with Spark Streaming and do you have any tips to share? Our end
>>> goal is to store the information in a way that makes it efficient to query,
>>> using a tool like Hive or Impala.
>>>
>>> Thanks,
>>> Kevin
>>>
>>
>>
>


Re: JSON Arrays and Spark

2016-10-10 Thread Jean Georges Perrin
Thanks Luciano - I think this is my issue :(

> On Oct 10, 2016, at 2:08 PM, Luciano Resende  wrote:
> 
> Please take a look at 
> http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets 
> 
> 
> Particularly the note at the required format :
> 
> Note that the file that is offered as a json file is not a typical JSON file. 
> Each line must contain a separate, self-contained valid JSON object. As a 
> consequence, a regular multi-line JSON file will most often fail.
> 
> 
> 
> On Mon, Oct 10, 2016 at 9:57 AM, Jean Georges Perrin  > wrote:
> Hi folks,
> 
> I am trying to parse JSON arrays and it’s getting a little crazy (for me at 
> least)…
> 
> 1)
> If my JSON is:
> {"vals":[100,500,600,700,800,200,900,300]}
> 
> I get:
> +--------------------+
> |                vals|
> +--------------------+
> |[100, 500, 600, 7...|
> +--------------------+
> 
> root
>  |-- vals: array (nullable = true)
>  |    |-- element: long (containsNull = true)
> 
> and I am :)
> 
> 2)
> If my JSON is:
> [100,500,600,700,800,200,900,300]
> 
> I get:
> ++
> | _corrupt_record|
> ++
> |[100,500,600,700,...|
> ++
> 
> root
>  |-- _corrupt_record: string (nullable = true)
> 
> Both are legit JSON structures… Do you think that #2 is a bug?
> 
> jg
> 
> 
> 
> 
> 
> 
> 
> 
> -- 
> Luciano Resende
> http://twitter.com/lresende1975 
> http://lresende.blogspot.com/ 


Re: Manually committing offset in Spark 2.0 with Kafka 0.10 and Java

2016-10-10 Thread Cody Koeninger
This should give you hints on the necessary cast:

http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html#tab_java_2

The main ugly thing there is that the Java RDD is wrapping the Scala
RDD, so you need to unwrap one layer via rdd.rdd().

If anyone wants to work on a PR to update the java examples in the
docs for the 0-10 version, I'm happy to help.

On Mon, Oct 10, 2016 at 10:34 AM, static-max  wrote:
> Hi,
>
> by following this article I managed to consume messages from Kafka 0.10 in
> Spark 2.0:
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
> However, the Java examples are missing and I would like to commit the offset
> myself after processing the RDD. Does anybody have a working example for me?
> "offsetRanges" seems to be a trait and not available after casting the RDD
> to "HasOffsetRanges"
>
> Thanks a lot!
>
> Scala example:
>
> val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)




Re: JSON Arrays and Spark

2016-10-10 Thread Jean Georges Perrin
Thanks!

I am ok with strict rules (despite being French), but even:
[{
"red": "#f00", 
"green": "#0f0"
},{
"red": "#f01", 
"green": "#0f1"
}]

is not going through…

Is there a way to see what it does not like?

the JSON parser has been pretty good to me until recently.


> On Oct 10, 2016, at 12:59 PM, Sudhanshu Janghel <> wrote:
> 
> In my experience, Spark can parse only certain types of JSON correctly,
> not all, and it has strict parsing rules, unlike Python.
> 
> 
> On Oct 10, 2016 6:57 PM, "Jean Georges Perrin"  > wrote:
> Hi folks,
> 
> I am trying to parse JSON arrays and it’s getting a little crazy (for me at 
> least)…
> 
> 1)
> If my JSON is:
> {"vals":[100,500,600,700,800,200,900,300]}
> 
> I get:
> +--------------------+
> |                vals|
> +--------------------+
> |[100, 500, 600, 7...|
> +--------------------+
> 
> root
>  |-- vals: array (nullable = true)
>  |    |-- element: long (containsNull = true)
> 
> and I am :)
> 
> 2)
> If my JSON is:
> [100,500,600,700,800,200,900,300]
> 
> I get:
> ++
> | _corrupt_record|
> ++
> |[100,500,600,700,...|
> ++
> 
> root
>  |-- _corrupt_record: string (nullable = true)
> 
> Both are legit JSON structures… Do you think that #2 is a bug?
> 
> jg
> 
> 
> 
> 
> 
> 
> Disclaimer: The information in this email is confidential and may be legally 
> privileged. Access to this email by anyone other than the intended addressee 
> is unauthorized. If you are not the intended recipient of this message, any 
> review, disclosure, copying, distribution, retention, or any action taken or 
> omitted to be taken in reliance on it is prohibited and may be unlawful.



Re: What happens when an executor crashes?

2016-10-10 Thread Cody Koeninger
Glad it was helpful :)

As far as executors, my expectation is that if you have multiple
executors running, and one of them crashes, the failed task will be
submitted on a different executor.  That is typically what I observe
in spark apps, if that's not what you're seeing I'd try to get help on
that specific issue.

As far as Kafka offsets per-partition, you need to atomically write
your offsets and results in the same place.  If that place is a
filesystem, you need to be using atomic operations (I try to stay away
from HDFS, but I believe renames are atomic, for instance).  If you're
doing that in normal code, i.e. with an iterator instead of an RDD or
DataFrame, you may have to do some of that work yourself.
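On a local filesystem, the advice to write results and offsets in the same place with an atomic operation can be illustrated with `java.nio.file.Files.move` and `StandardCopyOption.ATOMIC_MOVE`. This sketch rests on stated assumptions and is not the approach from the linked repository. The file naming and the `offset=` header line are hypothetical. HDFS and S3 have their own rename semantics, which this code does not model.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: store one partition's results together with the Kafka offset they cover,
// then publish both with a single atomic rename (local filesystem only).
class AtomicPartitionWriter {
    // Hypothetical layout: part-<kafkaPartition>.txt, first line "offset=<untilOffset>".
    static void commit(Path dir, int kafkaPartition, long untilOffset,
                       List<String> results) throws IOException {
        List<String> lines = new ArrayList<>();
        lines.add("offset=" + untilOffset);
        lines.addAll(results);

        Path tmp = dir.resolve("part-" + kafkaPartition + ".tmp");
        Path target = dir.resolve("part-" + kafkaPartition + ".txt");
        Files.write(tmp, lines, StandardCharsets.UTF_8);
        // ATOMIC_MOVE: readers see the old file or the complete new one, never a
        // half-written file. Whether an existing target is replaced is
        // platform-specific (it is on POSIX filesystems); filesystems without
        // atomic moves throw AtomicMoveNotSupportedException.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    // Offset to resume from on restart; 0 (assumed to mean "start from the
    // beginning") if this partition has never been committed.
    static long storedOffset(Path dir, int kafkaPartition) throws IOException {
        Path target = dir.resolve("part-" + kafkaPartition + ".txt");
        if (!Files.exists(target)) return 0L;
        String first = Files.readAllLines(target, StandardCharsets.UTF_8).get(0);
        return Long.parseLong(first.substring("offset=".length()));
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("exactly-once-demo");
        commit(dir, 0, 42L, Arrays.asList("a", "b"));
        commit(dir, 0, 57L, Arrays.asList("c"));
        System.out.println(storedOffset(dir, 0)); // 57
    }
}
```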

As far as using partitionBy, even after repartitioning you can still
use the general idea of writing results and offsets in the same place.
The major difference is that you need to write all offsets in each
partition (because things have been shuffled), and need to be more
careful on startup after a failure.  On startup, you'd see which
partitions were incomplete, start the job from the offsets in the
incomplete partitions, do the work for all partitions, but ignore the
writes when they got to the complete partitions.  I realize that's
kind of a complicated description; if it doesn't make sense, ask, or I
may be able to put up some publicly visible code at some point in the
future.
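The restart procedure described above can be modelled in a few lines of plain Java. The model assumes that every output partition stores the full map of Kafka offsets it has covered (Kafka partition → until-offset), as suggested for the shuffled case. On startup, each Kafka partition resumes from the lowest stored offset and the batch is recomputed. Writes are skipped for output partitions that already cover it. All class names, keys and offsets are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the restart procedure; names and data layout are hypothetical.
class RestartPlanner {
    // stored: output partition -> (Kafka partition -> offset covered by that output partition).
    // Resume each Kafka partition from the lowest stored offset, so the data missing
    // from incomplete output partitions is read again.
    static Map<Integer, Long> resumeOffsets(Map<String, Map<Integer, Long>> stored) {
        Map<Integer, Long> resume = new HashMap<>();
        for (Map<Integer, Long> offsets : stored.values()) {
            for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
                resume.merge(e.getKey(), e.getValue(), Long::min);
            }
        }
        return resume;
    }

    // While replaying a batch that ends at batchUntil, write to an output partition
    // only if it is behind on some Kafka partition; complete ones are skipped.
    static boolean shouldWrite(Map<Integer, Long> storedForOutput,
                               Map<Integer, Long> batchUntil) {
        for (Map.Entry<Integer, Long> e : batchUntil.entrySet()) {
            long done = storedForOutput.getOrDefault(e.getKey(), 0L);
            if (done < e.getValue()) return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // After a crash: "date=2016-10-09" finished the batch ending at offset 500 of
        // Kafka partition 0, while "date=2016-10-10" only reached offset 300.
        Map<String, Map<Integer, Long>> stored = new HashMap<>();
        stored.put("date=2016-10-09", Collections.singletonMap(0, 500L));
        stored.put("date=2016-10-10", Collections.singletonMap(0, 300L));

        System.out.println(resumeOffsets(stored)); // {0=300}
        Map<Integer, Long> batchUntil = Collections.singletonMap(0, 500L);
        System.out.println(shouldWrite(stored.get("date=2016-10-09"), batchUntil)); // false
        System.out.println(shouldWrite(stored.get("date=2016-10-10"), batchUntil)); // true
    }
}
```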


On Mon, Oct 10, 2016 at 12:12 PM, Samy Dindane  wrote:
> I just noticed that you're the author of the code I linked in my previous
> email. :) It's helpful.
>
> When using `foreachPartition` or `mapPartitions`, I noticed I can't ask
> Spark to write the data on the disk using `df.write()` but I need to use the
> iterator to do so, which means losing the ability of using partitionBy().
> Do you know a workaround? Or I'll be forced to partition data manually.
>
> I think I understand why the job crashes when a single executor does:
> `df.write().save()` writes all the partitions at the same time, which
> fails if one of them has died.
> Is that right?
>
> Thank you.
>
> Samy
>
>
> On 10/10/2016 04:58 PM, Samy Dindane wrote:
>>
>> Hi Cody,
>>
>> I am writing a spark job that reads records from a Kafka topic and writes
>> them on the file system.
>> This would be straightforward if it weren't for the custom checkpointing
>> logic I want to have; Spark's checkpointing doesn't suit us as it doesn't
>> permit code updates.
>>
>> The custom checkpointing would be similar to this:
>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala
>> I am trying to understand how this would work if an executor crashes, so I
>> tried making one crash manually, but I noticed it kills the whole job
>> instead of creating another executor to resume the task.
>> Is that expected? Is there anything wrong with my approach?
>>
>> Thank you for your time.
>>
>>
>> On 10/10/2016 04:29 PM, Cody Koeninger wrote:
>>>
>>> What is it you're actually trying to accomplish?
>>>
>>> On Mon, Oct 10, 2016 at 5:26 AM, Samy Dindane  wrote:

 I managed to make a specific executor crash by using
 TaskContext.get.partitionId and throwing an exception for a specific
 executor.

 The issue I have now is that the whole job stops when a single executor
 crashes.
 Do I need to explicitly tell Spark to start a new executor and keep the
 other ones running?


 On 10/10/2016 11:19 AM, Samy Dindane wrote:
>
>
> Hi,
>
> I am writing a streaming job that reads a Kafka topic.
> As far as I understand, Spark does a 1:1 mapping between its executors and
> Kafka partitions.
>
> In order to correctly implement my checkpoint logic, I'd like to know what
> exactly happens when an executor crashes.
> Also, is it possible to kill an executor manually for testing purposes?
>
> Thank you.
>
> Samy
>
>


>




Re: JSON Arrays and Spark

2016-10-10 Thread Luciano Resende
Please take a look at
http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets

In particular, see the note about the required format:

Note that the file that is offered as *a json file* is not a typical JSON
file. Each line must contain a separate, self-contained valid JSON object.
As a consequence, a regular multi-line JSON file will most often fail.
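Some inputs contain a single top-level JSON array, possibly spread across several lines, as in Jean's examples. One workaround is to convert such a file to JSON Lines before handing it to Spark. The dependency-free Java sketch below is an illustration, not a Spark API. It splits the top-level array into elements and drops whitespace outside string literals, so each element fits on one line. Non-object elements, such as the bare numbers in example 2, are wrapped in an object under a hypothetical `value` field. The sketch assumes the input is well-formed and does not validate it.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative converter: top-level JSON array -> one self-contained JSON object per line.
class JsonArrayToLines {
    static List<String> toJsonLines(String json) {
        String trimmed = json.trim();
        if (!trimmed.startsWith("[") || !trimmed.endsWith("]")) {
            throw new IllegalArgumentException("expected a top-level JSON array");
        }
        String body = trimmed.substring(1, trimmed.length() - 1);

        List<String> lines = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0;            // nesting depth of [ ] and { } inside the outer array
        boolean inString = false; // inside a "..." literal
        boolean escaped = false;  // previous character was a backslash inside a string
        for (char c : body.toCharArray()) {
            if (inString) {
                current.append(c); // keep string contents verbatim, including spaces
                if (escaped) escaped = false;
                else if (c == '\\') escaped = true;
                else if (c == '"') inString = false;
                continue;
            }
            if (Character.isWhitespace(c)) continue;  // drop layout whitespace and newlines
            if (c == ',' && depth == 0) {             // separator between top-level elements
                addElement(current, lines);
                continue;
            }
            if (c == '"') inString = true;
            else if (c == '[' || c == '{') depth++;
            else if (c == ']' || c == '}') depth--;
            current.append(c);
        }
        addElement(current, lines);
        return lines;
    }

    // Objects are emitted as-is; anything else is wrapped as {"value": ...}.
    private static void addElement(StringBuilder element, List<String> lines) {
        if (element.length() == 0) return; // e.g. an empty array
        String e = element.toString();
        lines.add(e.startsWith("{") ? e : "{\"value\":" + e + "}");
        element.setLength(0);
    }

    public static void main(String[] args) {
        toJsonLines("[100,500,600,700,800,200,900,300]").forEach(System.out::println);
        // {"value":100} ... {"value":300}, one per line
        String multiLine = "[{\n\"red\": \"#f00\",\n\"green\": \"#0f0\"\n},{\n\"red\": \"#f01\",\n\"green\": \"#0f1\"\n}]";
        toJsonLines(multiLine).forEach(System.out::println);
        // {"red":"#f00","green":"#0f0"}
        // {"red":"#f01","green":"#0f1"}
    }
}
```

After the returned lines are written to a file, reading that file with `spark.read.json(path)` should give one row per array element.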



On Mon, Oct 10, 2016 at 9:57 AM, Jean Georges Perrin  wrote:

> Hi folks,
>
> I am trying to parse JSON arrays and it’s getting a little crazy (for me
> at least)…
>
> 1)
> If my JSON is:
> {"vals":[100,500,600,700,800,200,900,300]}
>
> I get:
> +--------------------+
> |                vals|
> +--------------------+
> |[100, 500, 600, 7...|
> +--------------------+
>
> root
>  |-- vals: array (nullable = true)
>  |    |-- element: long (containsNull = true)
>
> and I am :)
>
> 2)
> If my JSON is:
> [100,500,600,700,800,200,900,300]
>
> I get:
> ++
> | _corrupt_record|
> ++
> |[100,500,600,700,...|
> ++
>
> root
>  |-- _corrupt_record: string (nullable = true)
>
> Both are legit JSON structures… Do you think that #2 is a bug?
>
> jg
>
>
>
>
>
>


-- 
Luciano Resende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


converting hBaseRDD to DataFrame

2016-10-10 Thread Mich Talebzadeh
Hi,

I am trying to do some operations on an HBase table that is being populated
by Spark Streaming.

Now this is just Spark on HBase, as opposed to Spark on Hive -> view on
HBase etc. I also have a Phoenix view on this HBase table.

This is the sample code:

scala> val tableName = "marketDataHbaseTest"
scala> val conf = HBaseConfiguration.create()
conf: org.apache.hadoop.conf.Configuration = Configuration:
core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml,
hbase-default.xml, hbase-site.xml
scala> conf.set(TableInputFormat.INPUT_TABLE, tableName)
scala> //create rdd
scala> val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
     |   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
     |   classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD:
org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable,
org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[4] at
newAPIHadoopRDD at :64
scala> hBaseRDD.count
res11: Long = 22272

Now that I have hBaseRDD, is there any way I can create a DataFrame on it? I
understand that it is not as simple as calling toDF on the RDD:

scala>  hBaseRDD.toDF
java.lang.AssertionError: assertion failed: no symbol could be loaded from
interface org.apache.hadoop.hbase.classification.InterfaceAudience$Public
in object InterfaceAudience with name Public and classloader
scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@7b44e98e

Thanks

Dr Mich Talebzadeh





Large variation in spark in Task Deserialization Time

2016-10-10 Thread Pulasthi Supun Wickramasinghe
Hi All,

I am seeing a huge variation in Spark Task Deserialization Time for my
collect and reduce operations. While most tasks complete within 100 ms, a few
take more than a couple of seconds, which slows the entire program down. I
have attached a screenshot of the web UI where you can see the variation.


As you can see, Task Deserialization Time has a max of 7 s and a 75th
percentile of 0.3 s.

Does anyone know what might cause these kinds of numbers? Any help
would be greatly appreciated.

Best Regards,
-- 
Pulasthi S. Wickramasinghe
Graduate Student  | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
cell: 224-386-9035


Error: PartitioningCollection requires all of its partitionings have the same numPartitions.

2016-10-10 Thread cuevasclemente
Hello,

I am having some interesting issues with a consistent error in spark that
occurs when I'm working with dataframes that are the result of some amounts
of joining and other transformations. 

PartitioningCollection requires all of its partitionings have the same
numPartitions.

It seems to happen after I join two DataFrames together which are fairly
reasonable on their own, but after joining them, the operations on the
joined dataframe can yield this error. I am really just trying to understand
why this error might be appearing or what the meaning behind it is as I
can't seem to find any documentation on it:

The following invocation results in the exception:

val resultDataframe = dataFrame1
.join(dataFrame2, 
$"first_column" === $"second_column").take(2)

but I can certainly call

dataFrame1.take(2)

and

dataFrame2.take(2)

I also tried repartitioning the DataFrames, using
Dataset.repartition(numPartitions) or Dataset.coalesce(numPartitions) on
dataFrame1 and dataFrame2 before joining, and on resultDataFrame after the
join, but nothing seemed to affect the error.

I cannot pin down the circumstances surrounding the error, nor easily make
it reproducible; this message is mostly asking why this error might
appear.

I posted essentially this issue on an external help site, Stack Overflow,
which I link here because there was a small amount of discussion I probably
can't reproduce here:
http://stackoverflow.com/questions/39780784/spark-2-0-0-error-partitioningcollection-requires-all-of-its-partitionings-have/39793449
(I hope it is not frowned upon to link to an external page on help
requests), and so far the issue seems to be confirmed by at least one other
user, but I was not able to find other mentions of it in this listserv or
elsewhere through some cursory googling.


Thanks for any help



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-PartitioningCollection-requires-all-of-its-partitionings-have-the-same-numPartitions-tp27875.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: What happens when an executor crashes?

2016-10-10 Thread Samy Dindane

I just noticed that you're the author of the code I linked in my previous 
email. :) It's helpful.

When using `foreachPartition` or `mapPartitions`, I noticed I can't ask Spark 
to write the data on the disk using `df.write()` but I need to use the iterator 
to do so, which means losing the ability of using partitionBy().
Do you know a workaround? Or I'll be forced to partition data manually.

I think I understand why the job crashes when a single executor does:
`df.write().save()` writes all the partitions at the same time, which fails
if one of them has died.
Is that right?

Thank you.

Samy

On 10/10/2016 04:58 PM, Samy Dindane wrote:

Hi Cody,

I am writing a spark job that reads records from a Kafka topic and writes them 
on the file system.
This would be straightforward if it weren't for the custom checkpointing logic 
I want to have; Spark's checkpointing doesn't suit us as it doesn't permit code 
updates.

The custom checkpointing would be similar to this: 
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala
I am trying to understand how this would work if an executor crashes, so I 
tried making one crash manually, but I noticed it kills the whole job instead 
of creating another executor to resume the task.
Is that expected? Is there anything wrong with my approach?

Thank you for your time.


On 10/10/2016 04:29 PM, Cody Koeninger wrote:

What is it you're actually trying to accomplish?

On Mon, Oct 10, 2016 at 5:26 AM, Samy Dindane  wrote:

I managed to make a specific executor crash by using
TaskContext.get.partitionId and throwing an exception for a specific
executor.

The issue I have now is that the whole job stops when a single executor
crashes.
Do I need to explicitly tell Spark to start a new executor and keep the
other ones running?


On 10/10/2016 11:19 AM, Samy Dindane wrote:


Hi,

I am writing a streaming job that reads a Kafka topic.
As far as I understand, Spark does a 1:1 mapping between its executors and
Kafka partitions.

In order to correctly implement my checkpoint logic, I'd like to know what
exactly happens when an executor crashes.
Also, is it possible to kill an executor manually for testing purposes?

Thank you.

Samy




Re: Inserting New Primary Keys

2016-10-10 Thread Benjamin Kim
Jean,

I see your point. For the incremental data, which is very small, I should make 
sure that the PARTITION BY in the OVER(PARTITION BY ...) is left out so that 
all the data will be in one partition when assigned a row number. The query 
below should avoid any problems.

“SELECT ROW_NUMBER() OVER() + b.id_max AS id, a.* FROM source a CROSS JOIN 
(SELECT COALESCE(MAX(id),0) AS id_max FROM tmp_destination) b”.

But initially, I’ll use the monotonicallyIncreasingId function when I first 
load the data.

Thanks,
Ben
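Ben's scheme adds a row number to the current maximum id, and a small plain-Java model can check it. According to the Spark documentation for `monotonically_increasing_id`, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The initial ids are therefore unique but not consecutive. `MAX(id) + ROW_NUMBER()` still produces fresh ids above all of them, provided only one writer appends at a time, which is the concern Jean raises. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of the key assignment; names are hypothetical.
class IncrementalKeys {
    // Mimics the documented layout of monotonically_increasing_id:
    // partition index in the upper 31 bits, record number in the lower 33 bits.
    static long monotonicId(int partitionIndex, long recordNumber) {
        return ((long) partitionIndex << 33) + recordNumber;
    }

    // Equivalent of ROW_NUMBER() OVER() + COALESCE(MAX(id), 0) for a batch of new rows.
    static List<Long> nextIds(List<Long> existingIds, int newRowCount) {
        long max = existingIds.isEmpty() ? 0L : Collections.max(existingIds);
        List<Long> ids = new ArrayList<>();
        for (int rowNumber = 1; rowNumber <= newRowCount; rowNumber++) { // ROW_NUMBER starts at 1
            ids.add(max + rowNumber);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Initial load: two partitions with two rows each.
        List<Long> existing = new ArrayList<>();
        for (int p = 0; p < 2; p++) {
            for (long r = 0; r < 2; r++) {
                existing.add(monotonicId(p, r));
            }
        }
        System.out.println(existing);             // [0, 1, 8589934592, 8589934593]
        System.out.println(nextIds(existing, 3)); // [8589934594, 8589934595, 8589934596]
    }
}
```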


> On Oct 10, 2016, at 8:36 AM, Jean Georges Perrin  wrote:
> 
> Is there only one process adding rows? Because this seems a little risky if
> you have multiple threads doing that…
> 
>> On Oct 8, 2016, at 1:43 PM, Benjamin Kim > > wrote:
>> 
>> Mich,
>> 
>> After much searching, I found and am trying to use “SELECT ROW_NUMBER() 
>> OVER() + b.id_max AS id, a.* FROM source a CROSS JOIN (SELECT 
>> COALESCE(MAX(id),0) AS id_max FROM tmp_destination) b”. I think this should 
>> do it.
>> 
>> Thanks,
>> Ben
>> 
>> 
>>> On Oct 8, 2016, at 9:48 AM, Mich Talebzadeh >> > wrote:
>>> 
>>> Can you get the max value from the current table and start from MAX(ID) +
>>> 1, assuming it is a numeric value (it should be)?
>>> 
>>> HTH
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>>  
>>> 
>>> On 8 October 2016 at 17:42, Benjamin Kim >> > wrote:
>>> I have a table with data already in it that has primary keys generated by 
>>> the function monotonicallyIncreasingId. Now, I want to insert more data 
>>> into it with primary keys that will auto-increment from where the existing 
>>> data left off. How would I do this? There is no argument I can pass into 
>>> the function monotonicallyIncreasingId to seed it.
>>> 
>>> Thanks,
>>> Ben
>>> 
>>> 
>>> 
>>> 
>>> 
>> 
> 



JSON Arrays and Spark

2016-10-10 Thread Jean Georges Perrin
Hi folks,

I am trying to parse JSON arrays and it’s getting a little crazy (for me at 
least)…

1)
If my JSON is:
{"vals":[100,500,600,700,800,200,900,300]}

I get:
+--------------------+
|                vals|
+--------------------+
|[100, 500, 600, 7...|
+--------------------+

root
 |-- vals: array (nullable = true)
 |    |-- element: long (containsNull = true)

and I am :)

2)
If my JSON is:
[100,500,600,700,800,200,900,300]

I get:
++
| _corrupt_record|
++
|[100,500,600,700,...|
++

root
 |-- _corrupt_record: string (nullable = true)

Both are legit JSON structures… Do you think that #2 is a bug?

jg







Re: Map with state keys serialization

2016-10-10 Thread Shixiong(Ryan) Zhu
That's enough. Did you see any error?

On Mon, Oct 10, 2016 at 5:08 AM, Joey Echeverria  wrote:

> Hi Ryan!
>
> Do you know where I need to configure Kryo for this? I already have
> spark.serializer=org.apache.spark.serializer.KryoSerializer in my
> SparkConf and I registered the class. Is there a different
> configuration setting for the state map keys?
>
> Thanks!
>
> -Joey
>
> On Sun, Oct 9, 2016 at 10:58 PM, Shixiong(Ryan) Zhu
>  wrote:
> > You can use Kryo. It also implements KryoSerializable, which is supported
> > by Kryo.
> >
> > On Fri, Oct 7, 2016 at 11:39 AM, Joey Echeverria  wrote:
> >>
> >> Looking at the source code for StateMap[1], which is used by
> >> JavaPairDStream#mapWithState(), it looks like state keys are
> >> serialized using an ObjectOutputStream. I couldn't find a reference to
> >> this restriction in the documentation. Did I miss that?
> >>
> >> Unless I'm mistaken, I'm guessing there isn't a way to use Kryo for
> >> this serialization?
> >>
> >> Thanks!
> >>
> >> -Joey
> >>
> >> [1]
> >> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala#L251
> >>
> >>
> >
>
>
>
> --
> -Joey
>


Re: Spark Streaming Advice

2016-10-10 Thread Mich Talebzadeh
Hi Kevin,

What is the streaming interval (batch interval) above?

I do analytics on streaming trade data, but after manipulating individual
messages I store the selected ones in HBase. Very fast.

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 10 October 2016 at 15:25, Kevin Mellott wrote:

> Whilst working on this application, I found a setting that drastically
> improved the performance of my particular Spark Streaming application. I'm
> sharing the details in hopes that it may help somebody in a similar
> situation.
>
> As my program ingested information into HDFS (as parquet files), I noticed
> that the time to process each batch was significantly greater than I
> anticipated. Whether I was writing a single parquet file (around 8KB) or
> around 10-15 files (8KB each), that step of the processing was taking
> around 30 seconds. Once I set the configuration below, that step dropped
> from about 30 seconds to about 1 second.
>
> // ssc = instance of StreamingContext
> ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata",
> "false")
>
> I've also verified that the parquet files being generated are usable by
> both Hive and Impala.
>
> Hope that helps!
> Kevin
>
> On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott wrote:
>
>> I'm attempting to implement a Spark Streaming application that will
>> consume application log messages from a message broker and store the
>> information in HDFS. During the data ingestion, we apply a custom schema to
>> the logs, partition by application name and log date, and then save the
>> information as parquet files.
>>
>> All of this works great, except we end up having a large number of
>> parquet files created. It's my understanding that Spark Streaming is unable
>> to control the number of files that get generated in each partition; can
>> anybody confirm that is true?
>>
>> Also, has anybody else run into a similar situation regarding data
>> ingestion with Spark Streaming and do you have any tips to share? Our end
>> goal is to store the information in a way that makes it efficient to query,
>> using a tool like Hive or Impala.
>>
>> Thanks,
>> Kevin
>>
>
>


Re: Logistic Regression Standardization in ML

2016-10-10 Thread Yanbo Liang
AFAIK, we can guarantee that, with or without standardization, the models
always converge to the same solution if there is no regularization. You can
refer to the test cases at:

https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala#L551


https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala#L588

Thanks
Yanbo

On Mon, Oct 10, 2016 at 7:27 AM, Sean Owen  wrote:

> (BTW I think it means "when no standardization is applied", which is how
> you interpreted it, yes.) I think it just means that if feature i is
> divided by s_i, then its coefficients in the resulting model will end up
> larger by a factor of s_i. They have to be divided by s_i to put them back
> on the same scale as the unnormalized inputs. I don't think that in general
> it will result in exactly the same model, because part of the point of
> standardizing is to improve convergence. You could propose a rewording of
> the two occurrences of this paragraph if you like.
>
> On Mon, Oct 10, 2016 at 3:15 PM Cesar  wrote:
>
>>
>> I have a question regarding how the default standardization in the ML
>> version of the Logistic Regression (Spark 1.6) works.
>>
>> Specifically, about the following comment in the Spark code:
>>
>> /**
>>  * Whether to standardize the training features before fitting the model.
>>  * The coefficients of models will be always returned on the original scale,
>>  * so it will be transparent for users. Note that with/without standardization,
>>  * the models should be always converged to the same solution when no regularization
>>  * is applied. In R's GLMNET package, the default behavior is true as well.
>>  * Default is true.
>>  *
>>  * @group setParam
>>  */
>>
>>
>> Specifically, I am having trouble understanding why the solution should
>> converge to the same weight values with and without standardization.
>>
>>
>>
>> Thanks !
>> --
>> Cesar Flores
>>
>
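Sean's point can be made concrete with a short derivation. It assumes, as his reply describes, that standardization only divides each feature j by a positive constant s_j, that labels are y_i in {-1, +1}, that the intercept is left out, and that the unregularized minimizer exists and is unique (for example, the data are not linearly separable).

```latex
\begin{aligned}
\tilde{x}_{ij} &= \frac{x_{ij}}{s_j}, \qquad
\tilde{w}^{\top}\tilde{x}_i = \sum_j \tilde{w}_j\,\frac{x_{ij}}{s_j} = w^{\top}x_i
\quad\text{with } w_j = \frac{\tilde{w}_j}{s_j},\\
\tilde{L}(\tilde{w}) &= \sum_i \log\!\left(1 + e^{-y_i\,\tilde{w}^{\top}\tilde{x}_i}\right)
= L\!\left(D^{-1}\tilde{w}\right), \qquad D = \operatorname{diag}(s_1,\dots,s_p),\\
\tilde{w}^{*} &= \arg\min_{\tilde{w}} \tilde{L}(\tilde{w}) = D\,w^{*}, \qquad w^{*} = \arg\min_{w} L(w),\\
\tilde{L}(\tilde{w}) + \lambda\lVert\tilde{w}\rVert_2^2 &= L(w) + \lambda\sum_j s_j^2\,w_j^2 .
\end{aligned}
```

The third line is why dividing the standardized coefficients by s_j recovers the unstandardized solution when lambda = 0. The last line shows that a penalty on the standardized coefficients weights each original coefficient by s_j^2, so with regularization the two settings solve different problems and generally give different models. Since the solver is iterative and stops at a finite tolerance, in practice the two unregularized fits should agree only up to that tolerance.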


Re: Problems with new experimental Kafka Consumer for 0.10

2016-10-10 Thread Matthias Niehoff
Yes, even without committing the offsets, the consumer rebalances.
The job consumes three streams and processes them. When it consumes only one
stream, it runs fine. But when it consumes three streams, even without joining
them (just deserializing the payload and triggering an output action), it fails.

I will prepare a code sample.
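For reference while the sample is prepared, here is a sketch of per-stream consumer settings that follows the advice in the exception text quoted below (raise session.timeout.ms, cap max.poll.records). The group ids, broker address and numeric values are placeholders, not tuned recommendations. Giving each of the three streams its own group.id is an assumption worth checking, since consumers that share a group id split partitions among themselves and trigger rebalances as they join.

```scala
object KafkaConsumerSettings {
  // Sketch of consumer settings for ONE direct stream; values are illustrative.
  // Pass a distinct groupId per stream so the streams do not rebalance each other.
  def kafkaParams(groupId: String, bootstrapServers: String): Map[String, Object] =
    Map[String, Object](
      "bootstrap.servers" -> bootstrapServers,
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> groupId,
      // commit manually (e.g. commitAsync) instead of auto-committing
      "enable.auto.commit" -> (false: java.lang.Boolean),
      // per the CommitFailedException text, the gap between poll() calls must stay below this
      "session.timeout.ms" -> (60000: java.lang.Integer),
      // Kafka docs suggest no more than about a third of session.timeout.ms
      "heartbeat.interval.ms" -> (20000: java.lang.Integer),
      // caps how many records a single poll() returns
      "max.poll.records" -> (1000: java.lang.Integer)
    )
}
```

Each map could then be passed as ConsumerStrategies.Subscribe[String, String](topics, params) to KafkaUtils.createDirectStream, one call per stream.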

2016-10-07 3:35 GMT+02:00 Cody Koeninger :

> OK, so at this point, even without involving commitAsync, you're
> seeing consumer rebalances after a particular batch takes longer than
> the session timeout?
>
> Do you have a minimal code example you can share?
>
> On Tue, Oct 4, 2016 at 2:18 AM, Matthias Niehoff wrote:
> > Hi,
> > sorry for the late reply; there was a public holiday in Germany.
> >
> > Yes, it's using a unique group id that no other job or consumer group is
> > using. I have increased the session.timeout to 1 minute and set the
> > max.poll.rate to 1000. The processing takes ~1 second.
> >
> > 2016-09-29 4:41 GMT+02:00 Cody Koeninger :
> >>
> >> Well, I'd start at the first thing suggested by the error, namely that
> >> the group has rebalanced.
> >>
> >> Is that stream using a unique group id?
> >>
> >> On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff wrote:
> >> > Hi,
> >> >
> >> > the stacktrace:
> >> >
> >> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> >> > completed since the group has already rebalanced and assigned the
> >> > partitions to another member. This means that the time between subsequent
> >> > calls to poll() was longer than the configured session.timeout.ms, which
> >> > typically implies that the poll loop is spending too much time message
> >> > processing. You can address this either by increasing the session timeout
> >> > or by reducing the maximum size of batches returned in poll() with
> >> > max.poll.records.
> >> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
> >> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
> >> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> >> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> >> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
> >> > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
> >> > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> >> > at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
> >> > at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> > at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> >> > at

Re: Inserting New Primary Keys

2016-10-10 Thread Jean Georges Perrin
Is there only one process adding rows? This seems a little risky if you
have multiple threads doing that…

> On Oct 8, 2016, at 1:43 PM, Benjamin Kim  wrote:
> 
> Mich,
> 
> After much searching, I found and am trying to use “SELECT ROW_NUMBER() 
> OVER() + b.id_max AS id, a.* FROM source a CROSS JOIN (SELECT 
> COALESCE(MAX(id),0) AS id_max FROM tmp_destination) b”. I think this should 
> do it.
> 
> Thanks,
> Ben
> 
> 
>> On Oct 8, 2016, at 9:48 AM, Mich Talebzadeh wrote:
>> 
>> Can you get the max value from the current table and start from MAX(ID) + 1,
>> assuming it is a numeric value (it should be)?
>> 
>> HTH
>> 
>>  
>> 
>> On 8 October 2016 at 17:42, Benjamin Kim wrote:
>> I have a table with data already in it that has primary keys generated by 
>> the function monotonicallyIncreasingId. Now, I want to insert more data into 
>> it with primary keys that will auto-increment from where the existing data 
>> left off. How would I do this? There is no argument I can pass into the 
>> function monotonicallyIncreasingId to seed it.
>> 
>> Thanks,
>> Ben
>> 
>> 
>> 
>> 
>> 
> 
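The idea behind Ben's query, ROW_NUMBER() OVER() + COALESCE(MAX(id), 0), can be illustrated on plain Scala collections. This is a minimal sketch that assumes a single writer (the risk Jean Georges raises above); SeededIds and assignIds are hypothetical names.

```scala
object SeededIds {
  // Hypothetical helper mirroring the SQL in the thread:
  //   ROW_NUMBER() OVER () + COALESCE(MAX(id), 0)
  // New rows get idMax + 1, idMax + 2, ... in their given order.
  // Assumes a single writer: two concurrent writers could read the same idMax.
  def assignIds[A](existingIds: Seq[Long], newRows: Seq[A]): Seq[(Long, A)] = {
    val idMax = if (existingIds.isEmpty) 0L else existingIds.max // COALESCE(MAX(id), 0)
    newRows.zipWithIndex.map { case (row, i) => (idMax + i + 1, row) } // ROW_NUMBER() starts at 1
  }
}
```

On a Spark RDD, rdd.zipWithIndex() yields a comparable 0-based Long index across partitions, so idMax + index + 1 could be applied there in the same way.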



Manually committing offset in Spark 2.0 with Kafka 0.10 and Java

2016-10-10 Thread static-max
Hi,

By following this article, I managed to consume messages from Kafka 0.10 in
Spark 2.0:
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

However, the Java examples are missing, and I would like to commit the
offset myself after processing the RDD. Does anybody have a working example
for me? "offsetRanges" seems to be a trait and not available after casting
the RDD to "HasOffsetRanges".

Thanks a lot!

Scala example:

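import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
// inside stream.foreachRDD { rdd => ... }: read offsets from the RDD the stream hands you, process it, then commit
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)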


Re: spark using two different versions of netty?

2016-10-10 Thread Paweł Szulc
Yeah, I should be more precise. Those are two direct dependencies.

On Mon, Oct 10, 2016 at 1:15 PM, Sean Owen  wrote:

> Usually this sort of thing happens because the two versions are in
> different namespaces in different major versions and both are needed. That
> is true of Netty: http://netty.io/wiki/new-and-noteworthy-in-4.0.html
> However, I see that Spark declares a direct dependency on both, when it
> does not use 3.x directly (and should not). The exception is in the Flume
> module, but that could be handled more narrowly. I will look into fixing
> this if applicable.
>
> On Mon, Oct 10, 2016 at 11:56 AM Paweł Szulc  wrote:
>
>> Hi,
>>
>> Quick question: why is Spark using two different versions of Netty?
>>
>>    - io.netty:netty-all:4.0.29.Final:jar
>>    - io.netty:netty:3.8.0.Final:jar
>>
>>
>> --
>> Regards,
>> Paul Szulc
>>
>> twitter: @rabbitonweb
>> blog: www.rabbitonweb.com
>>
>


-- 
Regards,
Paul Szulc

twitter: @rabbitonweb
blog: www.rabbitonweb.com


Re: What happens when an executor crashes?

2016-10-10 Thread Cody Koeninger
What is it you're actually trying to accomplish?

On Mon, Oct 10, 2016 at 5:26 AM, Samy Dindane  wrote:
> I managed to make a specific executor crash by using
> TaskContext.get.partitionId and throwing an exception for a specific
> executor.
>
> The issue I have now is that the whole job stops when a single executor
> crashes.
> Do I need to explicitly tell Spark to start a new executor and keep the
> other ones running?
>
>
> On 10/10/2016 11:19 AM, Samy Dindane wrote:
>>
>> Hi,
>>
>> I am writing a streaming job that reads a Kafka topic.
>> As far as I understand, Spark does a 1:1 mapping between its executors and
>> Kafka partitions.
>>
>> In order to correctly implement my checkpoint logic, I'd like to know what
>> exactly happens when an executor crashes.
>> Also, is it possible to kill an executor manually for testing purposes?
>>
>> Thank you.
>>
>> Samy
>>
>>
>
>




Re: Spark Streaming Advice

2016-10-10 Thread Kevin Mellott
Whilst working on this application, I found a setting that drastically
improved the performance of my particular Spark Streaming application. I'm
sharing the details in hopes that it may help somebody in a similar
situation.

As my program ingested information into HDFS (as parquet files), I noticed
that the time to process each batch was significantly greater than I
anticipated. Whether I was writing a single parquet file (around 8KB) or
around 10-15 files (8KB each), that step of the processing was taking
around 30 seconds. Once I set the configuration below, that step dropped
from about 30 seconds to about 1 second.

// ssc = instance of StreamingContext
ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata",
"false")

I've also verified that the parquet files being generated are usable by
both Hive and Impala.

Hope that helps!
Kevin

On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott wrote:

> I'm attempting to implement a Spark Streaming application that will
> consume application log messages from a message broker and store the
> information in HDFS. During the data ingestion, we apply a custom schema to
> the logs, partition by application name and log date, and then save the
> information as parquet files.
>
> All of this works great, except we end up having a large number of parquet
> files created. It's my understanding that Spark Streaming is unable to
> control the number of files that get generated in each partition; can
> anybody confirm that is true?
>
> Also, has anybody else run into a similar situation regarding data
> ingestion with Spark Streaming and do you have any tips to share? Our end
> goal is to store the information in a way that makes it efficient to query,
> using a tool like Hive or Impala.
>
> Thanks,
> Kevin
>
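On the question of file counts: as far as I know, within one batch the number of files written per output directory roughly follows the number of partitions of the DataFrame being written, so reducing partitions before the write (e.g. with coalesce) is the usual lever. Below is a minimal sketch for choosing that number from an estimated batch size; the helper name, the size estimate and the target file size are all assumptions. It only limits files per batch; each batch still adds new files, so a periodic compaction job may still be needed.

```scala
object OutputFileSizing {
  // Hypothetical helper: choose how many files to write for one batch from an
  // estimated batch size and a target size per file (ceiling division), at least 1.
  def numOutputFiles(estimatedBytes: Long, targetFileBytes: Long): Int = {
    require(estimatedBytes >= 0, "estimatedBytes must be non-negative")
    require(targetFileBytes > 0, "targetFileBytes must be positive")
    val files = (estimatedBytes + targetFileBytes - 1) / targetFileBytes
    math.max(1L, files).toInt
  }
}
```

A possible use per batch would be df.coalesce(OutputFileSizing.numOutputFiles(estimate, 128L * 1024 * 1024)).write.mode("append").partitionBy("appName", "logDate").parquet(path), where estimate, the column names and path are placeholders.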


Logistic Regression Standardization in ML

2016-10-10 Thread Cesar
I have a question regarding how the default standardization in the ML
version of the Logistic Regression (Spark 1.6) works.

Specifically, about the following comment in the Spark code:

/**
 * Whether to standardize the training features before fitting the model.
 * The coefficients of models will be always returned on the original scale,
 * so it will be transparent for users. Note that with/without standardization,
 * the models should be always converged to the same solution when no regularization
 * is applied. In R's GLMNET package, the default behavior is true as well.
 * Default is true.
 *
 * @group setParam
 */


Specifically, I am having trouble understanding why the solution should
converge to the same weight values with and without standardization.



Thanks !
-- 
Cesar Flores