[pyspark] java.lang.NoSuchMethodError: net.jpountz.util.Utils.checkRange error

2017-08-03 Thread kulas...@gmail.com
Hi all,


I use Spark Streaming 2.2.0 with Python and read data from a Kafka
(2.11-0.10.0.0) cluster, following the Kafka integration guide at
http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html.
I submit a Python script with:

spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar hodor.py

Spark reports the following error:


"""
17/08/04 10:52:00 ERROR Utils: Uncaught exception in thread stdout 
writer for python
 java.lang.NoSuchMethodError: net.jpountz.util.Utils.checkRange([BII)V
at 
org.apache.kafka.common.message.KafkaLZ4BlockInputStream.read(KafkaLZ4BlockInputStream.java:176)
at java.io.FilterInputStream.read(FilterInputStream.java:107)
at 
kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply$mcI$sp(ByteBufferMessageSet.scala:67)
at 
kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:67)
at 
kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:67)
at scala.collection.immutable.Stream$.continually(Stream.scala:1279)
at kafka.message.ByteBufferMessageSet$.decompress(ByteBufferMessageSet.scala:67)
at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:179)
at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:192)
at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:146)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at scala.collection.Iterator$$anon$18.hasNext(Iterator.scala:764)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:214)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
at 
org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
at 
org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
"""


I think it may be caused by an lz4 version conflict: Spark depends on
net.jpountz.lz4 1.3.0 while Kafka depends on net.jpountz.lz4 1.2.0.
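
One rough way to confirm the suspected conflict is to check which jars on the
classpath actually bundle net.jpountz.util.Utils. A minimal sketch (the jar
locations below are assumptions; adjust them to your installation):

# Sketch: list which local jars bundle the conflicting class. Paths are assumptions.
import glob
import zipfile

candidates = glob.glob("/opt/spark/jars/*.jar") + [
    "spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar",
]

for jar in candidates:
    try:
        with zipfile.ZipFile(jar) as zf:
            if "net/jpountz/util/Utils.class" in zf.namelist():
                print(jar, "contains net.jpountz.util.Utils")
    except (FileNotFoundError, zipfile.BadZipFile):
        pass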


Please guide. Thanks in advance.



Re: PySpark Streaming S3 checkpointing

2017-08-03 Thread Riccardo Ferrari
Hi Steve,

Thank you for your answer, much appreciated.

Reading the code seems that:

   - Python StreamingContext.getOrCreate calls Scala
     StreamingContextPythonHelper().tryRecoverFromCheckpoint(checkpointPath)
   - tryRecoverFromCheckpoint calls CheckpointReader.read(..., new
     SparkConf(), SparkHadoopUtil.get.conf, ...)
   - SparkHadoopUtil.get.conf (when not using YARN) does:
      - sparkConf = new SparkConf(false).loadFromSystemProperties(true)
      - Configuration = newConfiguration(sparkConf)

I have to admit I have not tested (read: debugged) it, so this might not be
completely accurate (checkpointing is not the highest priority), but I have the
feeling I cannot provide those properties via code, because a new configuration
gets instantiated/read from system properties and whatever I set on the
currently running context is ignored (or at least this happens in Python).
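
For reference, the workaround I would try first is passing the credentials as
spark.hadoop.* properties, so that they end up in the Hadoop configuration that
gets rebuilt from Spark/system properties at recovery time; whether that really
reaches CheckpointReader is exactly the open question. A minimal, untested
sketch (key names as in Steve's spark-defaults.conf example below; bucket name,
app name and batch interval are made up):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

checkpoint_dir = "s3a://my-bucket/checkpoints"  # hypothetical bucket

conf = (SparkConf()
        .setAppName("streaming-with-s3a-checkpoint")
        .set("spark.hadoop.fs.s3a.access.key", "<access key>")
        .set("spark.hadoop.fs.s3a.secret.key", "<secret key>"))
sc = SparkContext(conf=conf)

def create_ssc():
    ssc = StreamingContext(sc, 10)  # 10-second batches, illustrative
    ssc.checkpoint(checkpoint_dir)
    return ssc

ssc = StreamingContext.getOrCreate(checkpoint_dir, create_ssc)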

What do you (or any in the list) think?

Thanks,



On Wed, Aug 2, 2017 at 6:04 PM, Steve Loughran 
wrote:

>
> On 2 Aug 2017, at 10:34, Riccardo Ferrari  wrote:
>
> Hi list!
>
> I am working on a pyspark streaming job (ver 2.2.0) and I need to enable
> checkpointing. At high level my python script goes like this:
>
> class StreamingJob():
>
> def __init__(..):
> ...
>sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key', <access key>)
>sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key', <secret key>)
>
> def doJob(self):
>ssc = StreamingContext.getOrCreate(<checkpoint dir>, <function to create ssc>)
>
> and I run it:
>
> myJob = StreamingJob(...)
> myJob.doJob()
>
> The problem is that StreamingContext.getOrCreate is not able to have
> access to hadoop configuration configured in the constructor and fails to
> load from checkpoint with
>
> "com.amazonaws.AmazonClientException: Unable to load AWS credentials from
> any provider in the chain"
>
> If I export AWS credentials to the system ENV before starting the script
> it works!
>
>
> Spark magically copies the env vars over for you when you launch a job
>
> I see the Scala version has an option to provide the hadoop configuration
> that is not available in python
>
> I don't have the whole Hadoop, just Spark, so I don't really want to
> configure hadoop's xmls and such
>
>
> when you set up the context, as in spark-defaults.conf
>
> spark.hadoop.fs.s3a.access.key=access key
> spark.hadoop.fs.s3a.secret.key=secret key
>
> Reminder: Do keep your secret key a secret, avoid checking it in to any
> form of revision control.
>


Re: SPARK Issue in Standalone cluster

2017-08-03 Thread Marco Mistroni
Hello
 my 2 cents here, hope it helps
If you just want to play around with Spark, I'd leave Hadoop out; it's an
unnecessary dependency that you don't need for just running a Python script.
Instead do the following:
- go to the root of your master / slave node and create a directory
/root/pyscripts
- place your csv file there as well as the python script
- run the script that replicates the whole directory across the cluster (i
believe it's called copy-script.sh)
- then run your spark-submit; it will be something like
./spark-submit /root/pyscripts/mysparkscripts.py
file:///root/pyscripts/tree_addhealth.csv 10 --master
spark://ip-172-31-44-155.us-west-2.compute.internal:7077
- in your python script, as part of your processing, write the parquet file
to the directory /root/pyscripts, for example as in the sketch below
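
A rough sketch of what such a script could look like (file names are the ones
from the example above; the actual processing is just a placeholder):

# Sketch only: read the CSV that was replicated to every node, then write
# parquet back into the local pyscripts directory.
import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("tree_addhealth").getOrCreate()

csv_path = sys.argv[1] if len(sys.argv) > 1 else "file:///root/pyscripts/tree_addhealth.csv"
df = spark.read.csv(csv_path, header=True, inferSchema=True)

# ... your processing here ...

df.write.mode("overwrite").parquet("file:///root/pyscripts/tree_addhealth.parquet")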

If you have an AWS account and you are versatile with that - you need to set
up bucket permissions etc. - you can just:
- store your file in one of your S3 buckets
- create an EMR cluster
- connect to the master or a slave
- run your script so that it reads from the S3 bucket and writes to the same
S3 bucket


Feel free to mail me privately; I have a working script I have used to test
some code on a Spark standalone cluster.
hth

On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta 
wrote:

> Hi Steve,
>
> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>
> I am now going through the documentation (https://github.com/
> steveloughran/hadoop/blob/s3guard/HADOOP-13786-
> committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/
> hadoop-aws/s3a_committer_architecture.md) and it makes much much more
> sense now.
>
> Regards,
> Gourav Sengupta
>
> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran 
> wrote:
>
>>
>> On 2 Aug 2017, at 20:05, Gourav Sengupta 
>> wrote:
>>
>> Hi Steve,
>>
>> I have written a sincere note of apology to everyone in a separate email.
>> I sincerely request your kind forgiveness before hand if anything does
>> sound impolite in my emails, in advance.
>>
>> Let me first start by thanking you.
>>
>> I know it looks like I formed all my opinion based on that document, but
>> that is not the case at all. If you or anyone tries to execute the code
>> that I have given then they will see what I mean. Code speaks louder and
>> better than words for me.
>>
>> So I am not saying you are wrong. I am asking verify and expecting
>> someone will be able to correct  a set of understanding that a moron like
>> me has gained after long hours of not having anything better to do.
>>
>>
>> SCENARIO: there are two files file1.csv and file2.csv stored in HDFS with
>> replication 2 and there is a HADOOP cluster of three nodes. All these nodes
>> have SPARK workers (executors) running in them.  Both are stored in the
>> following way:
>> -
>> | SYSTEM 1 |  SYSTEM 2 | SYSTEM 3 |
>> | (worker1)   |  (worker2)|  (worker3)   |
>> | (master) | ||
>> -
>> | file1.csv  | | file1.csv |
>> -
>> ||  file2.csv  | file2.csv |
>> -
>> | file3.csv  |  file3.csv  |   |
>> -
>>
>>
>>
>>
>>
>> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
>> HDFS replication does not store the same file in all the nodes in the
>> cluster. So if I have three nodes and the replication is two then the same
>> file will be stored physically in two nodes in the cluster. Does that sound
>> right?
>>
>>
>> HDFS breaks files up into blocks (default = 128MB). If a .csv file is >
>> 128 then it will be broken up into blocks
>>
>> file1.cvs -> [block0001, block002, block0003]
>>
>> and each block will be replicated. With replication = 2 there will be two
>> copies of each block, but the file itself can span > 2 hosts.
>>
>>
>> ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
>> If SPARK is trying to process to the records then I am expecting that
>> WORKER2 should not be processing file1.csv, and similary WORKER 1 should
>> not be processing file2.csv and WORKER3 should not be processing file3.csv.
>> Because in case WORKER2 was trying to process file1.csv then it will
>> actually causing network transmission of the file unnecessarily.
>>
>>
>> Spark prefers to schedule work locally, so as to save on network traffic,
>> but it schedules for execution time over waiting for workers free on the
>> node with the data. IF a block is on nodes 2 and 3 but there is only a free
>> thread on node 1, then node 1 gets the work
>>
>> There's details on whether/how work across blocks takes place which I'm
>> avoiding. For now know those formats which are "splittable" will have work
>> scheduled by block. If you use 

SparkEventListener dropping events

2017-08-03 Thread Miles Crawford
We are seeing lots of stability problems with Spark 2.1.1 as a result of
dropped events.  We disabled the event log, which seemed to help, but many
events are still being dropped, as in the example log below.

Is there any way for me to see which listener is backing up the queue? Is
there any workaround for this issue?


2017-08-03 04:13:29,852 ERROR org.apache.spark.scheduler.LiveListenerBus:
Dropping SparkListenerEvent because no remaining room in event queue. This
likely means one of the SparkListeners is too slow and cannot keep up with
the rate at which tasks are being started by the scheduler.
2017-08-03 04:13:29,853 WARN  org.apache.spark.scheduler.LiveListenerBus:
Dropped 1 SparkListenerEvents since Thu Jan 01 00:00:00 UTC 1970
2017-08-03 04:14:29,854 WARN  org.apache.spark.scheduler.LiveListenerBus:
Dropped 32738 SparkListenerEvents since Thu Aug 03 04:13:29 UTC 2017
2017-08-03 04:15:15,095 INFO
 org.allenai.s2.pipeline.spark.steps.LoadDaqPapers$: Finished in 127.572
seconds.
2017-08-03 04:15:15,095 INFO  org.allenai.s2.common.metrics.Metrics$:
Adding additional tags to all metrics and events: [pipeline, env:prod]
2017-08-03 04:15:15,149 INFO
 org.allenai.s2.pipeline.spark.steps.MergeSourcedPapers$: Computing
2017-08-03 04:15:29,853 WARN  org.apache.spark.scheduler.LiveListenerBus:
Dropped 28816 SparkListenerEvents since Thu Aug 03 04:14:29 UTC 2017
2017-08-03 04:16:29,868 WARN  org.apache.spark.scheduler.LiveListenerBus:
Dropped 18613 SparkListenerEvents since Thu Aug 03 04:15:29 UTC 2017
2017-08-03 04:17:29,868 WARN  org.apache.spark.scheduler.LiveListenerBus:
Dropped 52231 SparkListenerEvents since Thu Aug 03 04:16:29 UTC 2017
2017-08-03 04:18:29,868 WARN  org.apache.spark.scheduler.LiveListenerBus:
Dropped 16646 SparkListenerEvents since Thu Aug 03 04:17:29 UTC 2017
2017-08-03 04:19:29,868 WARN  org.apache.spark.scheduler.LiveListenerBus:
Dropped 19693 SparkListenerEvents since Thu Aug 03 04:18:29 UTC 2017
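
Not a real fix, but one knob that may buy some headroom is the listener bus
queue size (spark.scheduler.listenerbus.eventqueue.size in Spark 2.1.x). A
hedged sketch, with the value purely illustrative:

from pyspark import SparkConf

# Larger queue: bursts of task events are less likely to overflow and be dropped.
conf = SparkConf().set("spark.scheduler.listenerbus.eventqueue.size", "100000")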


Re: DataSet creation not working Spark 1.6.0 , populating wrong data CDH 5.7.1

2017-08-03 Thread Rabin Banerjee
Unfortunately I can't use Scala/Python. Any solution in Java?

On Thu, Aug 3, 2017 at 6:04 PM, Gourav Sengupta 
wrote:

> Guru,
>
> Anyways you can pick up SCALA or Python. Makes things way easier. The
> performance, maintainability, visibility, and minimum translation loss make
> things better.
>
>
> Regards,
> Gourav
>
> On Thu, Aug 3, 2017 at 11:09 AM, Rabin Banerjee <
> dev.rabin.baner...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am trying to create a DataSet from DataFrame, where dataframe has been
>> created successfully, and using the same bean I am trying to create dataset.
>>
>> But when I am running it, Dataframe is created as expected. I am able to
>> print the content as well. But not the dataset. The DataSet is having only
>> one column populated with Person object blob and rest all are null.
>>
>> I am using Java Api not scala
>>
>> List data = new ArrayList<>();
>> Calendar cal = Calendar.getInstance();
>> long time = cal.getTime().getTime();
>> data.add(new Person( "0",0,100L, new Timestamp(time)));
>> data.add(new Person( "1",1,101L, new Timestamp(time)));
>>
>> JavaRDD rdd = (JavaRDD)jsc().parallelize(data);
>>
>> SQLContext sqlContext = SQLContext.getOrCreate(jsc().sc());
>>
>> DataFrame df = sqlContext.createDataFrame(rdd,Person.class);
>>
>>
>> df.show();
>> Encoder es = Encoders.bean(Person.class);
>>
>> Dataset dsi = new Dataset<>(sqlContext,df.logicalPlan(),es);
>>
>> //Dataset dsi = df.as(es);
>>
>>
>> dsi.printSchema();
>> dsi.show()
>>
>> And
>>
>> public class Person  implements Serializable{
>>
>> String id;
>> Integer partition;
>> Long offset;
>> Timestamp eventTime;
>>
>> public Person() {
>> }
>>
>> public Person(String id, Integer partition, Long offset, Timestamp 
>> eventTime) {
>> this.id = id;
>> this.partition = partition;
>> this.offset = offset;
>> this.eventTime = eventTime;
>> }
>>
>> public String getId() {
>> return id;
>> }
>>
>> public void setId(String id) {
>> this.id = id;
>> }
>>
>> public Integer getPartition() {
>> return partition;
>> }
>>
>> public void setPartition(Integer partition) {
>> this.partition = partition;
>> }
>>
>> public Long getOffset() {
>> return offset;
>> }
>>
>> public void setOffset(Long offset) {
>>
>>
>> this.offset = offset;
>> }
>>
>> public Timestamp getEventTime() {
>> return eventTime;
>> }
>>
>> public void setEventTime(Timestamp eventTime) {
>> this.eventTime = eventTime;
>> }
>> }
>>
>>
>


Re: Repartitioning Hive tables - Container killed by YARN for exceeding memory limits

2017-08-03 Thread Chetan Khatri
Thanks Holden !


On Thu, Aug 3, 2017 at 4:02 AM, Holden Karau  wrote:

> The memory overhead is based less on the total amount of data and more on
> what you end up doing with the data (e.g. if your doing a lot of off-heap
> processing or using Python you need to increase it). Honestly most people
> find this number for their job "experimentally" (e.g. they try a few
> different things).
>
> On Wed, Aug 2, 2017 at 1:52 PM, Chetan Khatri  > wrote:
>
>> Ryan,
>> Thank you for reply.
>>
>> For 2 TB of Data what should be the value of
>> spark.yarn.executor.memoryOverhead = ?
>>
>> With regard to this I see an issue at
>> https://issues.apache.org/jira/browse/SPARK-18787 , not sure whether it
>> works or not on Spark 2.0.1!
>>
>> Can you elaborate more on the spark.memory.fraction setting?
>>
>> number of partitions = 674
>> Cluster: 455 GB total memory, VCores: 288, Nodes: 17
>> Given / tried memory config: executor-mem = 16g, num-executor=10,
>> executor cores=6, driver mem=4g
>>
>> spark.default.parallelism=1000
>> spark.sql.shuffle.partitions=1000
>> spark.yarn.executor.memoryOverhead=2048
>> spark.shuffle.io.preferDirectBufs=false
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Aug 2, 2017 at 10:43 PM, Ryan Blue  wrote:
>>
>>> Chetan,
>>>
>>> When you're writing to a partitioned table, you want to use a shuffle to
>>> avoid the situation where each task has to write to every partition. You
>>> can do that either by adding a repartition by your table's partition keys,
>>> or by adding an order by with the partition keys and then columns you
>>> normally use to filter when reading the table. I generally recommend the
>>> second approach because it handles skew and prepares the data for more
>>> efficient reads.
>>>
>>> If that doesn't help, then you should look at your memory settings. When
>>> you're getting killed by YARN, you should consider setting `
>>> spark.shuffle.io.preferDirectBufs=false` so you use less off-heap
>>> memory that the JVM doesn't account for. That is usually an easier fix than
>>> increasing the memory overhead. Also, when you set executor memory, always
>>> change spark.memory.fraction to ensure the memory you're adding is used
>>> where it is needed. If your memory fraction is the default 60%, then 60% of
>>> the memory will be used for Spark execution, not reserved whatever is
>>> consuming it and causing the OOM. (If Spark's memory is too low, you'll see
>>> other problems like spilling too much to disk.)
>>>
>>> rb
>>>
>>> On Wed, Aug 2, 2017 at 9:02 AM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
 Can anyone please guide me with above issue.


 On Wed, Aug 2, 2017 at 6:28 PM, Chetan Khatri <
 chetan.opensou...@gmail.com> wrote:

> Hello Spark Users,
>
> I have an HBase table that I read and write into a Hive managed table, where
> I applied partitioning by a date column. That worked fine, but it generates a
> large number of files across almost 700 partitions, so I wanted to use
> repartitioning to reduce file I/O by reducing the number of files inside each
> partition.
>
> *But i ended up with below exception:*
>
> ExecutorLostFailure (executor 11 exited caused by one of the running
> tasks) Reason: Container killed by YARN for exceeding memory limits. 14.0
> GB of 14 GB physical memory used. Consider boosting spark.yarn.executor.
> memoryOverhead.
>
> Driver memory=4g, executor mem=12g, num-executors=8, executor core=8
>
> Do you think below setting can help me to overcome above issue:
>
> spark.default.parallelism=1000
> spark.sql.shuffle.partitions=1000
>
> Because default max number of partitions are 1000.
>
>
>

>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
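
For what it's worth, a hedged PySpark sketch of the shuffle-before-write that
Ryan describes above (table and column names are made up, and "append" vs
"overwrite" depends on your pipeline):

(df
 .repartition("event_date")                      # shuffle by the table's partition key
 .sortWithinPartitions("event_date", "cust_id")  # optional: also prepare data for filtered reads
 .write
 .partitionBy("event_date")
 .mode("append")
 .saveAsTable("db.events"))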


mapPartitionsWithIndex in Dataframe

2017-08-03 Thread Lalwani, Jayesh
Are there any plans to add mapPartitionsWithIndex to the Dataframe API? Or is
there any way to implement my own mapPartitionsWithIndex for a Dataframe?

I am implementing something which is logically similar to the randomSplit
function. In 2.1, randomSplit internally does df.mapPartitionsWithIndex and
assigns a different seed for every partition by adding the partition's index to
the seed. I want to get a partition-specific seed too.

The problem is that rdd.mapPartitionsWithIndex doesn't work in streaming.
df.mapPartitions works, but I don't get the index.
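
(A hedged aside: for the partition-specific seed part, the built-in
spark_partition_id() column function exposes the partition index at the
Dataframe level without dropping to the RDD API; the column name below is
made up.)

from pyspark.sql import functions as F

base_seed = 42  # illustrative
seeded = df.withColumn("seed", F.spark_partition_id() + F.lit(base_seed))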

Is there a way to extend Spark to add mapPartitionWithIndex at the Dataframe 
level ?
I was digging into the 2.2 code a bit and it looks like in 2.2, all the 
Dataframe apis have been changed to be based around SparkStrategy. I couldn’t 
figure out  how I can add my own custom strategy. Is there any documentation 
around this? If it makes sense to add this to Spark, I would be excited to make 
a contribution.




Re: DataSet creation not working Spark 1.6.0 , populating wrong data CDH 5.7.1

2017-08-03 Thread Abdallah Mahmoud
Unsubscribe me Please




On 3 August 2017 at 14:39, Jörn Franke  wrote:

> You need to create a schema for person.
> https://spark.apache.org/docs/latest/sql-programming-guide.
> html#programmatically-specifying-the-schema
>
> On 3. Aug 2017, at 12:09, Rabin Banerjee 
> wrote:
>
> Hi All,
>
> I am trying to create a DataSet from DataFrame, where dataframe has been
> created successfully, and using the same bean I am trying to create dataset.
>
> But when I am running it, Dataframe is created as expected. I am able to
> print the content as well. But not the dataset. The DataSet is having only
> one column populated with Person object blob and rest all are null.
>
> I am using Java Api not scala
>
> List data = new ArrayList<>();
> Calendar cal = Calendar.getInstance();
> long time = cal.getTime().getTime();
> data.add(new Person( "0",0,100L, new Timestamp(time)));
> data.add(new Person( "1",1,101L, new Timestamp(time)));
>
> JavaRDD rdd = (JavaRDD)jsc().parallelize(data);
>
> SQLContext sqlContext = SQLContext.getOrCreate(jsc().sc());
>
> DataFrame df = sqlContext.createDataFrame(rdd,Person.class);
>
>
> df.show();
> Encoder es = Encoders.bean(Person.class);
>
> Dataset dsi = new Dataset<>(sqlContext,df.logicalPlan(),es);
>
> //Dataset dsi = df.as(es);
>
>
> dsi.printSchema();
> dsi.show()
>
> And
>
> public class Person  implements Serializable{
>
> String id;
> Integer partition;
> Long offset;
> Timestamp eventTime;
>
> public Person() {
> }
>
> public Person(String id, Integer partition, Long offset, Timestamp eventTime) 
> {
> this.id = id;
> this.partition = partition;
> this.offset = offset;
> this.eventTime = eventTime;
> }
>
> public String getId() {
> return id;
> }
>
> public void setId(String id) {
> this.id = id;
> }
>
> public Integer getPartition() {
> return partition;
> }
>
> public void setPartition(Integer partition) {
> this.partition = partition;
> }
>
> public Long getOffset() {
> return offset;
> }
>
> public void setOffset(Long offset) {
>
>
> this.offset = offset;
> }
>
> public Timestamp getEventTime() {
> return eventTime;
> }
>
> public void setEventTime(Timestamp eventTime) {
> this.eventTime = eventTime;
> }
> }
>
>


Re: DataSet creation not working Spark 1.6.0 , populating wrong data CDH 5.7.1

2017-08-03 Thread Jörn Franke
You need to create a schema for person. 
https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema

> On 3. Aug 2017, at 12:09, Rabin Banerjee  wrote:
> 
> Hi All,
> 
> I am trying to create a DataSet from DataFrame, where dataframe has been 
> created successfully, and using the same bean I am trying to create dataset.
> 
> But when I am running it, Dataframe is created as expected. I am able to 
> print the content as well. But not the dataset. The DataSet is having only 
> one column populated with Person object blob and rest all are null.
> 
> I am using Java Api not scala
> 
> List data = new ArrayList<>();
> Calendar cal = Calendar.getInstance();
> long time = cal.getTime().getTime();
> data.add(new Person( "0",0,100L, new Timestamp(time)));
> data.add(new Person( "1",1,101L, new Timestamp(time)));
> 
> JavaRDD rdd = (JavaRDD)jsc().parallelize(data);
> 
> SQLContext sqlContext = SQLContext.getOrCreate(jsc().sc());
> 
> DataFrame df = sqlContext.createDataFrame(rdd,Person.class);
> 
> 
> df.show();
> Encoder es = Encoders.bean(Person.class);
> 
> Dataset dsi = new Dataset<>(sqlContext,df.logicalPlan(),es);
> 
> //Dataset dsi = df.as(es);
> 
> 
> dsi.printSchema();
> dsi.show()
> And
> 
> public class Person  implements Serializable{
> 
> String id;
> Integer partition;
> Long offset;
> Timestamp eventTime;
> 
> public Person() {
> }
> 
> public Person(String id, Integer partition, Long offset, Timestamp eventTime) 
> {
> this.id = id;
> this.partition = partition;
> this.offset = offset;
> this.eventTime = eventTime;
> }
> 
> public String getId() {
> return id;
> }
> 
> public void setId(String id) {
> this.id = id;
> }
> 
> public Integer getPartition() {
> return partition;
> }
> 
> public void setPartition(Integer partition) {
> this.partition = partition;
> }
> 
> public Long getOffset() {
> return offset;
> }
> 
> public void setOffset(Long offset) {
> 
> this.offset = offset;
> }
> 
> public Timestamp getEventTime() {
> return eventTime;
> }
> 
> public void setEventTime(Timestamp eventTime) {
> this.eventTime = eventTime;
> }
> }


Re: DataSet creation not working Spark 1.6.0 , populating wrong data CDH 5.7.1

2017-08-03 Thread Gourav Sengupta
Guru,

Anyways you can pick up SCALA or Python. Makes things way easier. The
performance, maintainability, visibility, and minimum translation loss make
things better.


Regards,
Gourav

On Thu, Aug 3, 2017 at 11:09 AM, Rabin Banerjee <
dev.rabin.baner...@gmail.com> wrote:

> Hi All,
>
> I am trying to create a DataSet from DataFrame, where dataframe has been
> created successfully, and using the same bean I am trying to create dataset.
>
> But when I am running it, Dataframe is created as expected. I am able to
> print the content as well. But not the dataset. The DataSet is having only
> one column populated with Person object blob and rest all are null.
>
> I am using Java Api not scala
>
> List data = new ArrayList<>();
> Calendar cal = Calendar.getInstance();
> long time = cal.getTime().getTime();
> data.add(new Person( "0",0,100L, new Timestamp(time)));
> data.add(new Person( "1",1,101L, new Timestamp(time)));
>
> JavaRDD rdd = (JavaRDD)jsc().parallelize(data);
>
> SQLContext sqlContext = SQLContext.getOrCreate(jsc().sc());
>
> DataFrame df = sqlContext.createDataFrame(rdd,Person.class);
>
>
> df.show();
> Encoder es = Encoders.bean(Person.class);
>
> Dataset dsi = new Dataset<>(sqlContext,df.logicalPlan(),es);
>
> //Dataset dsi = df.as(es);
>
>
> dsi.printSchema();
> dsi.show()
>
> And
>
> public class Person  implements Serializable{
>
> String id;
> Integer partition;
> Long offset;
> Timestamp eventTime;
>
> public Person() {
> }
>
> public Person(String id, Integer partition, Long offset, Timestamp eventTime) 
> {
> this.id = id;
> this.partition = partition;
> this.offset = offset;
> this.eventTime = eventTime;
> }
>
> public String getId() {
> return id;
> }
>
> public void setId(String id) {
> this.id = id;
> }
>
> public Integer getPartition() {
> return partition;
> }
>
> public void setPartition(Integer partition) {
> this.partition = partition;
> }
>
> public Long getOffset() {
> return offset;
> }
>
> public void setOffset(Long offset) {
>
>
> this.offset = offset;
> }
>
> public Timestamp getEventTime() {
> return eventTime;
> }
>
> public void setEventTime(Timestamp eventTime) {
> this.eventTime = eventTime;
> }
> }
>
>


DataSet creation not working Spark 1.6.0 , populating wrong data CDH 5.7.1

2017-08-03 Thread Rabin Banerjee
Hi All,

I am trying to create a DataSet from a DataFrame. The DataFrame has been
created successfully, and using the same bean I am trying to create the
Dataset.

When I run it, the DataFrame is created as expected and I am able to print its
content, but not the Dataset: the Dataset has only one column populated with
the Person object blob, and the rest are all null.

I am using the Java API, not Scala.

List<Person> data = new ArrayList<>();
Calendar cal = Calendar.getInstance();
long time = cal.getTime().getTime();
data.add(new Person("0", 0, 100L, new Timestamp(time)));
data.add(new Person("1", 1, 101L, new Timestamp(time)));

JavaRDD<Person> rdd = jsc().parallelize(data);

SQLContext sqlContext = SQLContext.getOrCreate(jsc().sc());

DataFrame df = sqlContext.createDataFrame(rdd, Person.class);


df.show();
Encoder<Person> es = Encoders.bean(Person.class);

Dataset<Person> dsi = new Dataset<>(sqlContext, df.logicalPlan(), es);

//Dataset<Person> dsi = df.as(es);


dsi.printSchema();
dsi.show();

And

public class Person  implements Serializable{

String id;
Integer partition;
Long offset;
Timestamp eventTime;

public Person() {
}

public Person(String id, Integer partition, Long offset, Timestamp eventTime) {
this.id = id;
this.partition = partition;
this.offset = offset;
this.eventTime = eventTime;
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public Integer getPartition() {
return partition;
}

public void setPartition(Integer partition) {
this.partition = partition;
}

public Long getOffset() {
return offset;
}

public void setOffset(Long offset) {


this.offset = offset;
}

public Timestamp getEventTime() {
return eventTime;
}

public void setEventTime(Timestamp eventTime) {
this.eventTime = eventTime;
}
}


Re: Quick one on evaluation

2017-08-03 Thread Daniel Darabos
On Wed, Aug 2, 2017 at 2:16 PM, Jean Georges Perrin  wrote:

> Hi Sparkians,
>
> I understand the lazy evaluation mechanism with transformations and
> actions. My question is simpler: 1) are show() and/or printSchema()
> actions? I would assume so...
>

show() is an action (it prints data) but printSchema() is not an action.
Spark can tell you the schema of the result without computing the result.
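
A tiny illustration (PySpark; the DataFrame here is just an example):

df = spark.range(10 ** 6).selectExpr("id", "id * 2 AS doubled")  # pending transformations only
df.printSchema()   # resolved from the plan; no job runs
df.show(3)         # action: a job actually runs and rows are printed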

and optional question: 2) is there a way to know if there are
> transformations "pending"?
>

There are always transformations pending :). An RDD or DataFrame is a
series of pending transformations. If you say val df =
spark.read.csv("foo.csv"), that is a pending transformation. Even
spark.emptyDataFrame is best understood as a pending transformation: it
does not do anything on the cluster, but records locally what it will have
to do on the cluster.


Re: SPARK Issue in Standalone cluster

2017-08-03 Thread Gourav Sengupta
Hi Steve,

I love you mate, thanks a ton once again for ACTUALLY RESPONDING.

I am now going through the documentation (
https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md)
and it makes much much more sense now.

Regards,
Gourav Sengupta

On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran 
wrote:

>
> On 2 Aug 2017, at 20:05, Gourav Sengupta 
> wrote:
>
> Hi Steve,
>
> I have written a sincere note of apology to everyone in a separate email.
> I sincerely request your kind forgiveness before hand if anything does
> sound impolite in my emails, in advance.
>
> Let me first start by thanking you.
>
> I know it looks like I formed all my opinion based on that document, but
> that is not the case at all. If you or anyone tries to execute the code
> that I have given then they will see what I mean. Code speaks louder and
> better than words for me.
>
> So I am not saying you are wrong. I am asking verify and expecting someone
> will be able to correct  a set of understanding that a moron like me has
> gained after long hours of not having anything better to do.
>
>
> SCENARIO: there are two files file1.csv and file2.csv stored in HDFS with
> replication 2 and there is a HADOOP cluster of three nodes. All these nodes
> have SPARK workers (executors) running in them.  Both are stored in the
> following way:
> ----------------------------------------
> | SYSTEM 1   | SYSTEM 2   | SYSTEM 3   |
> | (worker1)  | (worker2)  | (worker3)  |
> | (master)   |            |            |
> ----------------------------------------
> | file1.csv  |            | file1.csv  |
> ----------------------------------------
> |            | file2.csv  | file2.csv  |
> ----------------------------------------
> | file3.csv  | file3.csv  |            |
> ----------------------------------------
>
>
>
>
>
> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
> HDFS replication does not store the same file in all the nodes in the
> cluster. So if I have three nodes and the replication is two then the same
> file will be stored physically in two nodes in the cluster. Does that sound
> right?
>
>
> HDFS breaks files up into blocks (default = 128MB). If a .csv file is >
> 128 then it will be broken up into blocks
>
> file1.cvs -> [block0001, block002, block0003]
>
> and each block will be replicated. With replication = 2 there will be two
> copies of each block, but the file itself can span > 2 hosts.
>
>
> ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
> If SPARK is trying to process to the records then I am expecting that
> WORKER2 should not be processing file1.csv, and similary WORKER 1 should
> not be processing file2.csv and WORKER3 should not be processing file3.csv.
> Because in case WORKER2 was trying to process file1.csv then it will
> actually causing network transmission of the file unnecessarily.
>
>
> Spark prefers to schedule work locally, so as to save on network traffic,
> but it schedules for execution time over waiting for workers free on the
> node with the data. IF a block is on nodes 2 and 3 but there is only a free
> thread on node 1, then node 1 gets the work
>
> There's details on whether/how work across blocks takes place which I'm
> avoiding. For now know those formats which are "splittable" will have work
> scheduled by block. If you use Parquet/ORC/avro for your data and compress
> with snappy, it will be split. This gives you maximum performance as >1
> thread can work on different blocks. That is, if file1 is split into three
> blocks, three worker threads can process it.
>
>
> ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY
> THIS):
> if WORKER 2 is not processing file1.csv then how does it matter whether
> the file is there or not at all in the system? Should not SPARK just ask
> the workers to process the files which are avialable in the worker nodes?
> In case both WORKER2 and WORKER3 fails and are not available then file2.csv
> will not be processed at all.
>
>
> locality is best-effort, not guaranteed.
>
>
> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE
> EXECUTED (Its been pointed out that I am learning SPARK, and even I did not
> take more than 13 mins to set up the cluster and run the code).
>
> Once you execute the code then you will find that:
> 1.  if the path starts with file:/// while reading back then there is no
> error reported, but the number of records reported back are only those
> records in the worker which also has the server.
> 2. also you will notice that once you cache the file before writing the
> partitions are ditributed nicely across the workers, and while writing
> back, the dataframe partitions does write properly to the worker node in
> 

Re: SPARK Issue in Standalone cluster

2017-08-03 Thread Steve Loughran

On 2 Aug 2017, at 20:05, Gourav Sengupta 
> wrote:

Hi Steve,

I have written a sincere note of apology to everyone in a separate email. I 
sincerely request your kind forgiveness before hand if anything does sound 
impolite in my emails, in advance.

Let me first start by thanking you.

I know it looks like I formed all my opinion based on that document, but that 
is not the case at all. If you or anyone tries to execute the code that I have 
given then they will see what I mean. Code speaks louder and better than words 
for me.

So I am not saying you are wrong. I am asking you to verify, and expecting that
someone will be able to correct the set of understandings that a moron like me
has gained after long hours of not having anything better to do.


SCENARIO: there are two files file1.csv and file2.csv stored in HDFS with 
replication 2 and there is a HADOOP cluster of three nodes. All these nodes 
have SPARK workers (executors) running in them.  Both are stored in the 
following way:
----------------------------------------
| SYSTEM 1   | SYSTEM 2   | SYSTEM 3   |
| (worker1)  | (worker2)  | (worker3)  |
| (master)   |            |            |
----------------------------------------
| file1.csv  |            | file1.csv  |
----------------------------------------
|            | file2.csv  | file2.csv  |
----------------------------------------
| file3.csv  | file3.csv  |            |
----------------------------------------





CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
HDFS replication does not store the same file in all the nodes in the cluster. 
So if I have three nodes and the replication is two then the same file will be 
stored physically in two nodes in the cluster. Does that sound right?


HDFS breaks files up into blocks (default = 128MB). If a .csv file is > 128MB
then it will be broken up into blocks

file1.csv -> [block0001, block0002, block0003]

and each block will be replicated. With replication = 2 there will be two
copies of each block, but the file itself can span > 2 hosts.


ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
If SPARK is trying to process the records then I am expecting that WORKER2
should not be processing file1.csv, and similarly WORKER1 should not be
processing file2.csv and WORKER3 should not be processing file3.csv, because if
WORKER2 were trying to process file1.csv then it would actually cause network
transmission of the file unnecessarily.


Spark prefers to schedule work locally, so as to save on network traffic, but 
it schedules for execution time over waiting for workers free on the node with 
the data. If a block is on nodes 2 and 3 but there is only a free thread on
node 1, then node 1 gets the work

There's details on whether/how work across blocks takes place which I'm 
avoiding. For now know those formats which are "splittable" will have work 
scheduled by block. If you use Parquet/ORC/avro for your data and compress with 
snappy, it will be split. This gives you maximum performance as >1 thread can 
work on different blocks. That is, if file1 is split into three blocks, three 
worker threads can process it.


ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY THIS):
if WORKER2 is not processing file1.csv then how does it matter whether the
file is there or not at all in the system? Should not SPARK just ask the
workers to process the files which are available in the worker nodes? In case
both WORKER2 and WORKER3 fail and are not available then file2.csv will not be
processed at all.


locality is best-effort, not guaranteed.


ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE EXECUTED 
(Its been pointed out that I am learning SPARK, and even I did not take more 
than 13 mins to set up the cluster and run the code).

Once you execute the code you will find that:
1. if the path starts with file:/// while reading back then no error is
reported, but the number of records reported back are only those records on the
worker which is also running the master.
2. you will also notice that once you cache the file before writing, the
partitions are distributed nicely across the workers, and while writing back,
the dataframe partitions are written properly on the worker node in the master,
but the workers on the other systems have the files written to a _temporary
folder which does not get copied back to the main folder. In spite of this the
job is not reported as failed in SPARK.

This gets into the "commit protocol". You don't want to know all the dirty
details (*) but essentially it's this:

1. Every worker writes its output to a directory under the destination 
directory, something like 
'$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
2. it is the spark driver which "commits" the job by moving the