Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-01 Thread Koert Kuipers
This works for dataframes with Spark 2.3 by changing a global setting, and it
will be configurable per write in 2.4.
See:
https://issues.apache.org/jira/browse/SPARK-20236
https://issues.apache.org/jira/browse/SPARK-24860
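
For example, something along these lines should do it (untested sketch; spark,
df and the output path are placeholders):

// Spark 2.3: global setting, only the partitions present in df are overwritten
spark.conf().set("spark.sql.sources.partitionOverwriteMode", "dynamic");
df.write().mode("overwrite").partitionBy("day").parquet("dataset.parquet");

// Spark 2.4: the same behaviour can be requested per write
df.write()
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .partitionBy("day")
  .parquet("dataset.parquet");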

On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel  wrote:

> Hi Peay,
>
> Have you found a better solution yet? I am having the same issue.
>
> The following says it works with Spark 2.1 onward, but only when you use
> sqlContext and not DataFrame:
> https://medium.com/@anuvrat/writing-into-dynamic-partitions-using-spark-2e2b818a007a
>
> Thanks,
> Nirav
>
> On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh  wrote:
>
>> If your processing task inherently processes input data by month you
>> may want to "manually" partition the output data by month as well as
>> by day, that is to save it with a file name including the given month,
>> i.e. "dataset.parquet/month=01". Then you will be able to use the
>> overwrite mode with each month partition. Hope this could be of some
>> help.
>>
>> --
>> Pavel Knoblokh
>>
>> On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
>> > Hello,
>> >
>> > I am trying to use
>> > data_frame.write.partitionBy("day").save("dataset.parquet") to write a
>> > dataset while splitting by day.
>> >
>> > I would like to run a Spark job to process, e.g., a month:
>> > dataset.parquet/day=2017-01-01/...
>> > ...
>> >
>> > and then run another Spark job to add another month using the same folder
>> > structure, getting me
>> > dataset.parquet/day=2017-01-01/
>> > ...
>> > dataset.parquet/day=2017-02-01/
>> > ...
>> >
>> > However:
>> > - with save mode "overwrite", when I process the second month, all of
>> > dataset.parquet/ gets removed and I lose whatever was already computed for
>> > the previous month.
>> > - with save mode "append", then I can't get idempotence: if I run the job to
>> > process a given month twice, I'll get duplicate data in all the subfolders
>> > for that month.
>> >
>> > Is there a way to do "append" in terms of the subfolders from partitionBy,
>> > but overwrite within each such partition? Any help would be appreciated.
>> >
>> > Thanks!
>>
>>
>>
>> --
>> Pavel Knoblokh
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Spark Memory Requirement

2018-08-01 Thread msbreuer
Many threads talk about memory requirements, and most often the answer is to
add more memory to Spark. My understanding of Spark is that it is a scalable
analytics engine which is able to utilize the assigned resources and to
calculate the correct answer. So assigning cores and memory may speed up
a task.

I am using Spark in local mode for calculations. My boundary is the local
workstation and its resources. A large disk, 16 GB of RAM and 4 cores are
the hardware limit. I decided to use Spark with 2 cores, 6 GB of memory and
several dozen gigabytes of disk space. This setup with the Spark defaults
(1 GB driver / 1 GB executor) should satisfy the recommended requirements.
The question is: what are the limits to job processing with respect to these
requirements?
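
For reference, the local-mode setup I am assuming looks roughly like this (a
sketch, not the exact code):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// local mode with 2 cores; note that spark.driver.memory only takes effect
// when set before the JVM starts (e.g. spark-submit --driver-memory 6g),
// not from SparkConf inside an already running application
SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("memory-test");
JavaSparkContext sc = new JavaSparkContext(conf);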

// base to generate huge data
List<Integer> list = new ArrayList<>();
for (int val = 1; val < 10000; val++) {
    int valueOf = Integer.valueOf(val);
    list.add(valueOf);
}
// create simple rdd of int
JavaRDD<Integer> rdd = sc.parallelize(list, 200);

An array of 10,000 Integers should fit into memory (10k x 4 bytes plus some
overhead is 40-50 KB on the heap).

JavaRDD<Row> rowRDD = rdd.map(value ->
        RowFactory.create(String.valueOf(value),
                createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)));

It becomes more interesting here. The map operation creates much larger
objects, but Spark will process them in partitions. The sample uses 200
partitions with 50 rows each. A row object is about 2 MB, so the data of one
partition covers 100 MB of memory, probably somewhat more because of RDD and
mapping overhead. Let's assume 200 MB per partition.
Two cores will run 2 tasks in parallel and consume 2 x 200 MB of memory, and
queued tasks probably take some more. Spark will spill data to disk, even if
no explicit storage level was given.

What would persist(StorageLevel.MEMORY_AND_DISK) change here?
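
Concretely, I mean something like this (just a sketch):

import org.apache.spark.storage.StorageLevel;

// with MEMORY_AND_DISK, partitions that do not fit in memory are spilled to
// local disk instead of being recomputed when they are needed again
rowRDD = rowRDD.persist(StorageLevel.MEMORY_AND_DISK());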

StructType type = new StructType()
        .add("c1", DataTypes.StringType)
        .add("c2", DataTypes.StringType);

Dataset<Row> df = spark.createDataFrame(rowRDD, type);
df = df.sort(col("c2").asc());

Next, a sort operation is defined over the data. Data is processed per
partition. I assume Spark will sort the data per partition and merge the
results. I worked with Hadoop map files and used their merging capabilities,
so merging a set of many but sorted map files is easy and not memory
intensive. I expect Spark to work the same way. Okay, because of the
distributed concept of Spark, partial results are exchanged between workers,
which causes some protocol overhead in memory. Combining the contents of
several partitions is called shuffling, right?

I calculated this example several times and expected the code to work, but
the application always runs out of memory. Is my formula wrong? Are there
aspects I forgot? Is there something I did wrong? Which parameters should be
corrected to avoid out-of-memory errors? Are my assumptions correct?

Regards,
Markus

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Split a row into multiple rows Java

2018-08-01 Thread Anton Puzanov
You can always use array+explode. I don't know if it's the most
elegant/optimal solution (would be happy to hear from the experts).

code example:
//create data

Dataset<Row> test = spark.createDataFrame(Arrays.asList(
        new InternalData("bob", "b1", 1, 2, 3),
        new InternalData("alive", "c1", 3, 4, 6),
        new InternalData("eve", "e1", 7, 8, 9)
), InternalData.class);

+-----+---------+----+----+----+
| name|otherName|val1|val2|val3|
+-----+---------+----+----+----+
|  bob|       b1|   1|   2|   3|
|alive|       c1|   3|   4|   6|
|  eve|       e1|   7|   8|   9|
+-----+---------+----+----+----+

Dataset<Row> expandedTest = test.selectExpr("name", "otherName",
        "explode(array(val1, val2, val3)) as time");
expandedTest.show();
+-----+---------+----+
| name|otherName|time|
+-----+---------+----+
|  bob|       b1|   1|
|  bob|       b1|   2|
|  bob|       b1|   3|
|alive|       c1|   3|
|alive|       c1|   4|
|alive|       c1|   6|
|  eve|       e1|   7|
|  eve|       e1|   8|
|  eve|       e1|   9|
+-----+---------+----+
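
For completeness, InternalData above is assumed to be a plain bean along these
lines (hypothetical reconstruction, not from the original code):

public static class InternalData implements java.io.Serializable {
    private final String name;
    private final String otherName;
    private final int val1;
    private final int val2;
    private final int val3;

    public InternalData(String name, String otherName, int val1, int val2, int val3) {
        this.name = name;
        this.otherName = otherName;
        this.val1 = val1;
        this.val2 = val2;
        this.val3 = val3;
    }

    public String getName() { return name; }
    public String getOtherName() { return otherName; }
    public int getVal1() { return val1; }
    public int getVal2() { return val2; }
    public int getVal3() { return val3; }
}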


On Wed, Aug 1, 2018 at 11:05 PM, nookala  wrote:

> Pivot seems to do the opposite of what I want: it converts rows to columns.
>
> I was able to get this done in Python, but would like to do this in Java
>
> idfNew = idf.rdd.flatMap(lambda row: [(row.Name, row.Id, row.Date,
> "0100", row.0100), (row.Name, row.Id, row.Date, "0200", row.0200), (row.Name,
> row.Id, row.Date, "0300", row.0300), (row.Name, row.Id, row.Date,
> "0400", row.0400)]).toDF()
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


RE: Split a row into multiple rows Java

2018-08-01 Thread nookala
Pivot seems to do the opposite of what I want: it converts rows to columns.

I was able to get this done in Python, but would like to do this in Java

idfNew = idf.rdd.flatMap(lambda row: [(row.Name, row.Id, row.Date,
"0100", row.0100), (row.Name, row.Id, row.Date, "0200", row.0200), (row.Name,
row.Id, row.Date, "0300", row.0300), (row.Name, row.Id, row.Date,
"0400", row.0400)]).toDF()



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Overwrite only specific partition with hive dynamic partitioning

2018-08-01 Thread Nirav Patel
Hi,

I have a Hive partitioned table created using sparkSession. I would like to
insert/overwrite DataFrame data into a specific set of partitions without
losing any other partitions. In each run I have to update a set of partitions,
not just one.

e.g. the first time I have a dataframe with bid=1, bid=2, bid=3 and I can
write it by using

`df.write.mode(SaveMode.Overwrite).partitionBy("bid").parquet(TableBaseLocation)`


It generates the dirs bid=1, bid=2, bid=3 inside TableBaseLocation.

But the next time, when I have a dataframe with bid=1, bid=4 and use the same
code above, it removes bid=2 and bid=3. In other words, I don't get idempotency.

I tried SaveMode.Append but that creates duplicate data inside "bid=1".


I read
https://issues.apache.org/jira/browse/SPARK-18183

With that approach it seems like I may have to update multiple partitions
manually for each input partition. That seems like a lot of work on every
update. Is there a better way to do this?
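
For what it's worth, the manual per-partition overwrite would look roughly
like this (untested sketch; the bid values are just an example):

// write each bid's data (without the partition column) into its own directory
for (int bid : new int[]{1, 4}) {
    df.filter(df.col("bid").equalTo(bid))
      .drop("bid")
      .write()
      .mode(SaveMode.Overwrite)
      .parquet(TableBaseLocation + "/bid=" + bid);
}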

Can this fix be applied to the dataframe-based approach as well?

Thanks


Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-01 Thread Nirav Patel
Hi Peay,

Have you found a better solution yet? I am having the same issue.

The following says it works with Spark 2.1 onward, but only when you use
sqlContext and not DataFrame:
https://medium.com/@anuvrat/writing-into-dynamic-partitions-using-spark-2e2b818a007a

Thanks,
Nirav

On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh  wrote:

> If your processing task inherently processes input data by month you
> may want to "manually" partition the output data by month as well as
> by day, that is to save it with a file name including the given month,
> i.e. "dataset.parquet/month=01". Then you will be able to use the
> overwrite mode with each month partition. Hope this could be of some
> help.
>
> --
> Pavel Knoblokh
>
> On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
> > Hello,
> >
> > I am trying to use
> > data_frame.write.partitionBy("day").save("dataset.parquet") to write a
> > dataset while splitting by day.
> >
> > I would like to run a Spark job to process, e.g., a month:
> > dataset.parquet/day=2017-01-01/...
> > ...
> >
> > and then run another Spark job to add another month using the same folder
> > structure, getting me
> > dataset.parquet/day=2017-01-01/
> > ...
> > dataset.parquet/day=2017-02-01/
> > ...
> >
> > However:
> > - with save mode "overwrite", when I process the second month, all of
> > dataset.parquet/ gets removed and I lose whatever was already computed for
> > the previous month.
> > - with save mode "append", then I can't get idempotence: if I run the job to
> > process a given month twice, I'll get duplicate data in all the subfolders
> > for that month.
> >
> > Is there a way to do "append" in terms of the subfolders from partitionBy,
> > but overwrite within each such partition? Any help would be appreciated.
> >
> > Thanks!
>
>
>
> --
> Pavel Knoblokh
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to add a new source to existing struct streaming application, like a kafka source

2018-08-01 Thread David Rosenstrauch




On 08/01/2018 12:36 PM, Robb Greathouse wrote:

How to unsubscribe?


List-Unsubscribe: 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to add a new source to existing struct streaming application, like a kafka source

2018-08-01 Thread Robb Greathouse
How to unsubscribe?

On Mon, Jul 30, 2018 at 3:13 AM 杨浩  wrote:

> How to add a new source to existing struct streaming application, like a
> kafka source
>


-- 
Robb Greathouse
Middleware BU
505-507-4906


Data quality measurement for streaming data with apache spark

2018-08-01 Thread Uttam
Hello,

I have a very general question about Apache Spark. I want to know if it is
possible (and where to start, if possible) to implement a data quality
measurement prototype for streaming data using Apache Spark. Let's say I want
to work on Timeliness or Completeness as data quality metrics: has similar
work already been done using Spark? Are there other frameworks which are
better designed for this use case?
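
To make the question more concrete, here is the kind of thing I have in mind,
as a rough sketch (hypothetical source path, schema and column name) that
tracks completeness of one column with Structured Streaming:

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession.builder().appName("dq-completeness").getOrCreate();

StructType schema = new StructType()
        .add("id", DataTypes.StringType)
        .add("email", DataTypes.StringType);

// hypothetical streaming source: JSON files arriving in a directory
Dataset<Row> events = spark.readStream()
        .format("json")
        .schema(schema)
        .load("/tmp/events");

// completeness of "email" = non-null values / total rows seen so far
Dataset<Row> completeness = events.agg(
        count(col("email")).divide(count(lit(1))).alias("email_completeness"));

completeness.writeStream()
        .outputMode("complete")
        .format("console")
        .start()
        .awaitTermination();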



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How to use window method with direct kafka streaming ?

2018-08-01 Thread fat.wei
Hi everyone,

I have the following scenario, and I tried to use the window method with
direct Kafka streaming. The program runs, but it doesn't run right.

1. The data is stored in kafka.
2. Every single item of the data has its primary key.
3. Every single item of the data will be split into multiple parts, and these
parts will arrive at Kafka in order.

Here's my sample code:

 
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
        Durations.seconds(20));  // batch interval: 20 seconds

JavaPairInputDStream<String, String> messages =
        KafkaUtils.createDirectStream(jssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

// window length 60 seconds, sliding interval 40 seconds
messages.window(Durations.seconds(60), Durations.seconds(40)).print();

I couldn't get the data of RDD@40 when I tried to print the data of the
windowed RDD@80.

Can I have some suggestions?
 



How to make Yarn dynamically allocate resources for Spark

2018-08-01 Thread Anton Puzanov
Hi everyone,

I have a cluster managed with Yarn that runs Spark jobs; the components were
installed using Ambari (2.6.3.0-235). I have 6 hosts, each with 6 cores. I
use the Fair scheduler.

I want Yarn to automatically add/remove executor cores, but no matter what
I do it doesn't work.

Relevant Spark configuration (configured in Ambari):

spark.dynamicAllocation.schedulerBacklogTimeout 10s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
spark.driver.memory 4G
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.initialExecutors 6 (has no effect - starts with 2)
spark.dynamicAllocation.maxExecutors 10
spark.dynamicAllocation.minExecutors 1
spark.scheduler.mode FAIR
spark.shuffle.service.enabled true
SPARK_EXECUTOR_MEMORY="36G"

Relevant Yarn configuration (configured in Ambari):
yarn.nodemanager.aux-services mapreduce_shuffle,spark_shuffle,spark2_shuffle
YARN Java heap size 4096
yarn.resourcemanager.scheduler.class
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler
yarn.scheduler.fair.preemption true
yarn.nodemanager.aux-services.spark2_shuffle.class
org.apache.spark.network.yarn.YarnShuffleService
yarn.nodemanager.aux-services.spark2_shuffle.classpath
{{stack_root}}/${hdp.version}/spark2/aux/*
yarn.nodemanager.aux-services.spark_shuffle.class
org.apache.spark.network.yarn.YarnShuffleService
yarn.nodemanager.aux-services.spark_shuffle.classpath
{{stack_root}}/${hdp.version}/spark/aux/*
Minimum Container Size (VCores) 0
Maximum Container Size (VCores) 12
Number of virtual cores 12


I also followed the "Dynamic resource allocation" documentation and passed
all the steps to configure the external shuffle service. I copied the
yarn-shuffle jar:

cp /usr/hdp/2.6.3.0-235/spark/aux/spark-2.2.0.2.6.3.0-235-yarn-shuffle.jar
/usr/hdp/2.6.3.0-235/hadoop-yarn/lib/

I see only 3 cores allocated to the application (the default number of
executors is 2, so I guess it's the driver + 2), although many tasks are
pending.

If it is relevant, I use Jupyter Notebook and findspark to connect to
the cluster:

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("internal-external2").getOrCreate()


I would really appreciate any suggestion/help; there is no manual on this
topic that I haven't tried.
thx a lot,
Anton


Clearing usercache on EMR [pyspark]

2018-08-01 Thread Shuporno Choudhury
Hi everyone,
I am running spark jobs on EMR (using pyspark). I noticed that after
running jobs, the size of the usercache (basically the filecache folder)
keeps on increasing (with directory names as 1,2,3,4,5,...).
Directory location: /mnt/yarn/usercache/hadoop/filecache/
Is there a way to avoid creating these directories, or to automatically
clear the usercache/filecache after a job or periodically?
-- 
--Thanks,
Shuporno Choudhury