Re: Spark on Kubernetes: Kubernetes killing executors because of overallocation of memory

2018-08-02 Thread Matt Cheah
Hi there,

 

You may want to look at setting the memory overhead higher. Spark will then
start containers with a higher memory limit (spark.executor.memory +
spark.executor.memoryOverhead, to be exact) while the heap is still capped at
spark.executor.memory. There is some memory used by Spark's off-heap storage
that won't be accounted for in the heap size alone.

 

Hope this helps,

 

-Matt Cheah

 

From: Jayesh Lalwani 
Date: Thursday, August 2, 2018 at 12:35 PM
To: "user@spark.apache.org" 
Subject: Spark on Kubernetes: Kubernetes killing executors because of 
overallocation of memory

 

We are running Spark 2.3 on a Kubernetes cluster. We have set the following 
spark configuration options

"spark.executor.memory": "7g",

"spark.driver.memory": "2g",

"spark.memory.fraction": "0.75"

 

What we see is:

a) In the Spark UI, 5G has been allocated to each executor, which makes sense
because we set spark.memory.fraction=0.75
b) Kubernetes reports the pod memory usage as 7.6G

 

When we run a lot of jobs on the Kubernetes cluster, Kubernetes starts killing
the executor pods, because it thinks that the pod is misbehaving.

 

We logged into a running pod and ran the top command; most of the 7.6G is
allocated to the executor's Java process.

 

Why is Spark taking 7.6G instead of 7G? What is the extra 600MB being
allocated to? Is there some configuration that controls how much of the
executor memory gets allocated to PermGen vs. how much gets allocated to the
heap?

 

 

 






Insert into dynamic partitioned hive/parquet table throws error - Partition spec contains non-partition columns

2018-08-02 Thread Nirav Patel
I am trying to insert overwrite multiple partitions into an existing
partitioned Hive/Parquet table. The table was created using sparkSession.

I have a table 'mytable' with partition columns P1 and P2.

I have following set on sparkSession object:

.config("hive.exec.dynamic.partition", true)

.config("hive.exec.dynamic.partition.mode", "nonstrict")

val df = spark.read.csv(pathToNewData)

df.createOrReplaceTempView("updateTable")

Here 'df' may contain data from multiple partitions, i.e. multiple values
of P1 and P2 in the data.


spark.sql("insert overwrite table mytable PARTITION(P1, P2) select c1,
c2,..cn, P1, P2 from updateTable") // I made sure that partition columns P1
and P2 are at the end of projection list.

I am getting the following error:

org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.
metadata.Table.ValidationFailureSemanticException: Partition spec {p1=,
p2=, P1=1085, P2=164590861} contains non-partition columns;

The dataframe 'df' has records for P1=1085, P2=164590861.



Re: [External Sender] re: streaming, batch / spark 2.2.1

2018-08-02 Thread Peter Liu
Thanks for the clarification.

The processing time on both systems seems to be fine: (a) based on the
pattern of the batch processing time chart, the batch processing time is
not becoming longer and longer (see charts attached below); (b) the input
data on each Spark stage of every batch remains the same and constant.

The two systems are indeed different in CPU frequency and cache size, so
not performing at the same level is OK. But I'd like to know whether the
slower system is processing all incoming messages, since its total input
data seen on the Spark UI (executor tab) is only half as much as the faster
one due to the different batch processing time. Would this be a functional
issue? Or should the Spark stage input data not be directly interpreted as,
or compared with, the incoming message data? I seem to be missing something
here.

Thank you!

Peter


The fast system (batch time in sec; x is the timeline, ignore the numbers on x):
[chart image not included in the archive]

The slow system:
[chart image not included in the archive]


On Thu, Aug 2, 2018 at 4:11 PM, Jayesh Lalwani <
jayesh.lalw...@capitalone.com> wrote:

> What is different between the two systems? If one system processes records
> faster than the other, simply because it does less processing, then you can
> expect the first system to have a higher throughput than the second. It's
> hard to say why one system has double the throughput of another without
> knowing what it is doing internally.
>
> "The number of records in one batch does not seem to be determined by the
> batch interval (since it is zero by default in Spark2.2), but likely (at
> least influenced) by the time it needs to process the previous batch."
> This is expected Spark behavior. If you set batch interval to 0, it will
> process a microbatch immediately after it has finished the previous
> microbatch. Assuming your input is coming at a constant rate of R records
> per second, and one microbatch takes T1 seconds, then the next microbatch
> will take R.T1 records. If the second microbatch takes T2 seconds, then the
> third microbatch will take R.T2 records. This is why it's important that
> your throughput is higher than your input rate. If it's not, batches will
> become bigger and bigger and take longer and longer until the application
> fails.
>
>
>
> On Thu, Aug 2, 2018 at 2:43 PM Peter Liu  wrote:
>
>> Hello there,
>>
>> I'm new to Spark streaming and have trouble understanding Spark batch
>> "composition" (a Google search keeps giving me an older Spark Streaming
>> concept). I would appreciate any help and clarifications.
>> I'm using spark 2.2.1 for a streaming workload (see quoted code in (a)
>> below). The general question I have is:
>>
>> How is the number of records for a spark batch (as seen on Spark Job UI)
>> determined? (the default batch interval time is supposedly zero in Spark
>> 2.2.1  by default settings)
>>
>> The issue I'm facing is that for the same incoming streaming source (300K
>> msg/sec to a kafka broker, 220 bytes per message), I got different numbers
>> (2x) of processed batches on two different systems for the same amount of
>> application/consumer running time (30 min). -- At the batch level, the input
>> data size per batch is the same (49.9KB), while the total input data size
>> (under the Spark executor tab) is different, i.e. ~2x, as that system also
>> processed 2x the batches, as expected. --- Note: on both systems, the Spark
>> consumer seems to hold up well (no increased batch processing time lag over
>> the 30 min). See (c) for the real functional concern.
>>
>> (b) and (c) below have a bit more context info and the real concern in
>> case relevant.
>>
>> Have been struggling with this. Any comments and help would be very much
>> appreciated.
>>
>> Thanks!
>>
>> Regards,
>>
>> Peter
>>
>> =
>> (a) code in use:
>>   .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS
>> TIMESTAMP)").as[(String, Timestamp)]
>>   .select(from_json($"value", mySchema).as("data"), $"timestamp")
>>   .select("data.*", "timestamp")
>>   .where($"event_type" === "view")
>>   .select($"ad_id", $"event_time")
>>   .join(campaigns.toSeq.toDS().cache(), Seq("ad_id"))
>>   .groupBy(millisTime(window($"event_time", "10
>> seconds").getField("start")) as 'time_window, $"campaign_id")  //original
>> code
>>   .agg(count("*") as 'count, max('event_time) as 'lastUpdate)
>>   .select(to_json(struct("*")) as 'value)
>>   .writeStream
>>   .format("kafka")
>> ...  .
>>outputMode("update")
>>   .start()
>>
>> (b)
>> The number of records in one batch does not seem to be determined by the
>> batch interval (since it is zero by default in Spark 2.2), but is likely (at
>> least) influenced by the time it needs to process the previous batch. It is
>> noted that the input data amount per batch seems to be quite consistent and
>> stays the same on both systems in the Spark UI (49.9 KB), indicating there is
>> a strict logic to prepare/cap the data per batch despite the fluctuation in
>> the batch processing time - what is this logic?

Re: re: streaming, batch / spark 2.2.1

2018-08-02 Thread zakhavan
Yes, I am loading a text file from my local machine into a Kafka topic, and
I'd like to calculate the number of samples per second consumed by the Kafka
consumer using the script below.

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    print("hello spark")

    sc = SparkContext(appName="STALTA")
    # 5-second batch interval
    ssc = StreamingContext(sc, 5)
    broker, topic = sys.argv[1:]

    # Connect to Kafka
    kvs = KafkaUtils.createStream(ssc, broker,
                                  "raw-event-streaming-consumer", {topic: 1})

    ssc.start()
    ssc.awaitTermination()
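
If the goal is just an approximate samples-per-second figure, one minimal
option (an illustrative addition, not part of the script above) is to count
each microbatch before starting the context, i.e. add the following right
before ssc.start():

    # count the records in each microbatch and print the result; dividing
    # the printed count by the 5-second batch interval gives an approximate
    # samples-per-second rate
    kvs.count().pprint()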






Re: [External Sender] re: streaming, batch / spark 2.2.1

2018-08-02 Thread Jayesh Lalwani
What is different between the two systems? If one system processes records
faster than the other, simply because it does less processing, then you can
expect the first system to have a higher throughput than the second. It's
hard to say why one system has double the throughput of another without
knowing what it is doing internally.

"The number of records in one batch does not seem to be determined by the
batch interval (since it is zero by default in Spark2.2), but likely (at
least influenced) by the time it needs to process the previous batch."
This is expected Spark behavior. If you set batch interval to 0, it will
process a microbatch immediately after it has finished the previous
microbatch. Assuming your input is coming at a constant rate of R records
per second, and one microbatch takes T1 seconds, then the next microbatch
will take R.T1 records. If the second microbatch takes T2 seconds, then the
third microbatch will take R.T2 records. This is why it's important that
your throughput is higher than your input rate. If it's not, batches will
become bigger and bigger and take longer and longer until the application
fails.
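
(A worked example with round, purely illustrative numbers: if records arrive
at R = 1,000 records/sec and one microbatch takes T1 = 2 seconds to process,
the next microbatch holds about R.T1 = 2,000 records; if that one takes
T2 = 3 seconds, the following one holds about 3,000. The batch size tracks
the processing time of the previous batch, and it only stays bounded while
the processing rate keeps up with the input rate.)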



On Thu, Aug 2, 2018 at 2:43 PM Peter Liu  wrote:

> Hello there,
>
> I'm new to Spark streaming and have trouble understanding Spark batch
> "composition" (a Google search keeps giving me an older Spark Streaming
> concept). I would appreciate any help and clarifications.
> I'm using spark 2.2.1 for a streaming workload (see quoted code in (a)
> below). The general question I have is:
>
> How is the number of records for a spark batch (as seen on Spark Job UI)
> determined? (the default batch interval time is supposedly zero in Spark
> 2.2.1  by default settings)
>
> The issue I'm facing is that for the same incoming streaming source (300K
> msg/sec to a kafka broker, 220 bytes per message), I got different numbers
> (2x) of processed batches on two different systems for the same amount of
> application/consumer running time (30 min). -- At the batch level, the input
> data size per batch is the same (49.9KB), while the total input data size
> (under the Spark executor tab) is different, i.e. ~2x, as that system also
> processed 2x the batches, as expected. --- Note: on both systems, the Spark
> consumer seems to hold up well (no increased batch processing time lag over
> the 30 min). See (c) for the real functional concern.
>
> (b) and (c) below have a bit more context info and the real concern in
> case relevant.
>
> Have been struggling with this. Any comments and help would be very much
> appreciated.
>
> Thanks!
>
> Regards,
>
> Peter
>
> =
> (a) code in use:
>   .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS
> TIMESTAMP)").as[(String, Timestamp)]
>   .select(from_json($"value", mySchema).as("data"), $"timestamp")
>   .select("data.*", "timestamp")
>   .where($"event_type" === "view")
>   .select($"ad_id", $"event_time")
>   .join(campaigns.toSeq.toDS().cache(), Seq("ad_id"))
>   .groupBy(millisTime(window($"event_time", "10
> seconds").getField("start")) as 'time_window, $"campaign_id")  //original
> code
>   .agg(count("*") as 'count, max('event_time) as 'lastUpdate)
>   .select(to_json(struct("*")) as 'value)
>   .writeStream
>   .format("kafka")
> ...  .
>outputMode("update")
>   .start()
>
> (b)
> the number of records in one batch does not seem to be determined by the
> batch interval (since it is zero by default in Spark2.2), but likely (at
> least influenced) by the time it needs to process the previous batch. It is
> noted that the input data amount per batch seems to be quite consistent and
> kept the same on both systems from Spark UI (49.9 kb)- indicating there is
> a strict logic to prepare/cap the data per batch despite the fluctuation in
> the batch processing time - what is this logic?
>
>
> (c)
> the major question is a functional one: if one system processes the double
> amount of the data than the other, should it be an indication that either
> the system processed duplicated data or the other system processes half of
> the needed data? Or it is more a reporting issue?
>
>
>




Re: re: streaming, batch / spark 2.2.1

2018-08-02 Thread zakhavan
Hello,

I just had a question. Could you refer me to a link or tell me how you
calculated these figures, such as *300K msg/sec to a kafka broker, 220 bytes
per message*? I'm loading a text file with 36000 records into a Kafka topic
and I'd like to calculate the data rate (#samples per sec) in Kafka.

Thank you,
Zeinab







Spark on Kubernetes: Kubernetes killing executors because of overallocation of memory

2018-08-02 Thread Jayesh Lalwani
We are running Spark 2.3 on a Kubernetes cluster. We have set the following
spark configuration options

"spark.executor.memory": "7g",
"spark.driver.memory": "2g",
"spark.memory.fraction": "0.75"

What we see is:
a) In the Spark UI, 5G has been allocated to each executor, which makes
sense because we set spark.memory.fraction=0.75
b) Kubernetes reports the pod memory usage as 7.6G

When we run a lot of jobs on the Kubernetes cluster, Kubernetes starts
killing the executor pods, because it thinks that the pod is misbehaving.

We logged into a running pod and ran the top command; most of the 7.6G
is allocated to the executor's Java process.

Why is Spark taking 7.6G instead of 7G? What is the extra 600MB being
allocated to? Is there some configuration that controls how much of the
executor memory gets allocated to PermGen vs. how much gets allocated to
the heap?




Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-02 Thread Nirav Patel
I tried the following to explicitly specify the partition columns in the SQL
statement, and also tried different cases (upper and lower) for the partition
columns.

insert overwrite table $tableName PARTITION(P1, P2) select A, B, C, P1, P2
from updateTable.

Still getting:

Caused by:
org.apache.hadoop.hive.ql.metadata.Table$ValidationFailureSemanticException:
Partition spec {p1=, p2=, P1=1085, P2=164590861} contains non-partition
columns
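
For reference, the Spark 2.3 route Koert points to below (SPARK-20236) is a
session-level setting rather than extra SQL syntax. A minimal sketch,
assuming mytable is a Spark-created parquet (datasource) table and that the
DataFrame's columns are ordered as in the table with the partition columns
last (the two calls read the same from Scala or PySpark):

    # overwrite only the partitions present in the DataFrame, keep the rest
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    # insertInto matches columns by position, so order them as in the table
    df.write.mode("overwrite").insertInto("mytable")

A per-write option for the same behavior is tracked in SPARK-24860 for 2.4,
as mentioned in the quoted reply below.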



On Thu, Aug 2, 2018 at 11:37 AM, Nirav Patel  wrote:

> Thanks Koert. I'll check that out when we can update to 2.3
>
> Meanwhile, I am trying a Hive SQL (INSERT OVERWRITE) statement to insert
> overwrite multiple partitions (without losing the existing ones).
>
> It's giving me issues around partition columns.
>
> dataFrame.createOrReplaceTempView("updateTable") //here dataframe
> contains values from multiple partitions.
>
> The dataFrame also has the partition columns, but I can't get any of the
> following to execute:
>
> insert overwrite table $tableName PARTITION(P1, P2) select * from
> updateTable.
>
> org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.
> metadata.Table.ValidationFailureSemanticException: Partition spec {p1=,
> p2=, P1=__HIVE_DEFAULT_PARTITION__, P2=1} contains non-partition columns;
>
>
> Is the above the right approach to updating multiple partitions? Or should I
> be more specific and update each partition with a separate command like the
> following:
>
> //Pseudo code; yet to try
>
> df.createOrReplaceTempView("updateTable")
> df.rdd.groupBy(P1, P2).map { (key, Iterable[Row]) =>
>
>
>   spark.sql("INSERT OVERWRITE TABLE stats
>   PARTITION(P1 = key._1, P2 = key._2)
>   SELECT * from updateTable where P1 = key._1 and P2 = key._2")
> }
>
> Regards,
> Nirav
>
>
> On Wed, Aug 1, 2018 at 4:18 PM, Koert Kuipers  wrote:
>
>> this works for dataframes with spark 2.3 by changing a global setting,
>> and will be configurable per write in 2.4
>> see:
>> https://issues.apache.org/jira/browse/SPARK-20236
>> https://issues.apache.org/jira/browse/SPARK-24860
>>
>> On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel 
>> wrote:
>>
>>> Hi Peay,
>>>
>>> Have you found a better solution yet? I am having the same issue.
>>>
>>> The following says it works with Spark 2.1 onward, but only when you use
>>> sqlContext and not DataFrame:
>>> https://medium.com/@anuvrat/writing-into-dynamic-partitions-
>>> using-spark-2e2b818a007a
>>>
>>> Thanks,
>>> Nirav
>>>
>>> On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh 
>>> wrote:
>>>
 If your processing task inherently processes input data by month you
 may want to "manually" partition the output data by month as well as
 by day, that is to save it with a file name including the given month,
 i.e. "dataset.parquet/month=01". Then you will be able to use the
 overwrite mode with each month partition. Hope this could be of some
 help.

 --
 Pavel Knoblokh

 On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
 > Hello,
 >
 > I am trying to use
 > data_frame.write.partitionBy("day").save("dataset.parquet") to write
 a
 > dataset while splitting by day.
 >
 > I would like to run a Spark job  to process, e.g., a month:
 > dataset.parquet/day=2017-01-01/...
 > ...
 >
 > and then run another Spark job to add another month using the same
 folder
 > structure, getting me
 > dataset.parquet/day=2017-01-01/
 > ...
 > dataset.parquet/day=2017-02-01/
 > ...
 >
 > However:
 > - with save mode "overwrite", when I process the second month, all of
 > dataset.parquet/ gets removed and I lose whatever was already
 computed for
 > the previous month.
 > - with save mode "append", then I can't get idempotence: if I run the
 job to
 > process a given month twice, I'll get duplicate data in all the
 subfolders
 > for that month.
 >
 > Is there a way to do "append in terms of the subfolders from
 partitionBy,
 > but overwrite within each such partitions? Any help would be
 appreciated.
 >
 > Thanks!



 --
 Pavel Knoblokh

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org


>>>
>>>
>>>
>>
>>
>>
>



re: streaming, batch / spark 2.2.1

2018-08-02 Thread Peter Liu
 Hello there,

I'm new to Spark streaming and have trouble understanding Spark batch
"composition" (a Google search keeps giving me an older Spark Streaming
concept). I would appreciate any help and clarifications.
I'm using spark 2.2.1 for a streaming workload (see quoted code in (a)
below). The general question I have is:

How is the number of records for a spark batch (as seen on Spark Job UI)
determined? (the default batch interval time is supposedly zero in Spark
2.2.1  by default settings)

The issue I'm facing is that for the same incoming streaming source (300K
msg/sec to a kafka broker, 220 bytes per message), I got different numbers
(2x) of processed batches on two different systems for the same amount of
application/consumer running time (30 min). -- At the batch level, the input
data size per batch is the same (49.9KB), while the total input data size
(under the Spark executor tab) is different, i.e. ~2x, as that system also
processed 2x the batches, as expected. --- Note: on both systems, the Spark
consumer seems to hold up well (no increased batch processing time lag over
the 30 min). See (c) for the real functional concern.

(b) and (c) below have a bit more context info and the real concern in case
relevant.

Have been struggling with this. Any comments and help would be very much
appreciated.

Thanks!

Regards,

Peter

=
(a) code in use:
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS
TIMESTAMP)").as[(String, Timestamp)]
  .select(from_json($"value", mySchema).as("data"), $"timestamp")
  .select("data.*", "timestamp")
  .where($"event_type" === "view")
  .select($"ad_id", $"event_time")
  .join(campaigns.toSeq.toDS().cache(), Seq("ad_id"))
  .groupBy(millisTime(window($"event_time", "10
seconds").getField("start")) as 'time_window, $"campaign_id")  //original
code
  .agg(count("*") as 'count, max('event_time) as 'lastUpdate)
  .select(to_json(struct("*")) as 'value)
  .writeStream
  .format("kafka")
...  .
   outputMode("update")
  .start()

(b)
The number of records in one batch does not seem to be determined by the
batch interval (since it is zero by default in Spark 2.2), but is likely (at
least) influenced by the time it needs to process the previous batch. It is
noted that the input data amount per batch seems to be quite consistent and
stays the same on both systems in the Spark UI (49.9 KB), indicating there is
a strict logic to prepare/cap the data per batch despite the fluctuation in
the batch processing time - what is this logic?


(c)
The major question is a functional one: if one system processes double the
amount of data compared to the other, is that an indication that either that
system processed duplicate data or the other system processed only half of
the needed data? Or is it more of a reporting issue?


Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-02 Thread Nirav Patel
Thanks Koert. I'll check that out when we can update to 2.3

Meanwhile, I am trying a Hive SQL (INSERT OVERWRITE) statement to insert
overwrite multiple partitions (without losing the existing ones).

It's giving me issues around partition columns.

dataFrame.createOrReplaceTempView("updateTable") //here dataframe
contains values from multiple partitions.

The dataFrame also has the partition columns, but I can't get any of the
following to execute:

insert overwrite table $tableName PARTITION(P1, P2) select * from
updateTable.

org.apache.spark.sql.AnalysisException:
org.apache.hadoop.hive.ql.metadata.Table.ValidationFailureSemanticException:
Partition spec {p1=, p2=, P1=__HIVE_DEFAULT_PARTITION__, P2=1} contains
non-partition columns;


Is the above the right approach to updating multiple partitions? Or should I
be more specific and update each partition with a separate command like the
following:

//Pseudo code; yet to try

df.createOrReplaceTempView("updateTable")
df.rdd.groupBy(P1, P2).map { (key, Iterable[Row]) =>


  spark.sql("INSERT OVERWRITE TABLE stats
  PARTITION(P1 = key._1, P2 = key._2)
  SELECT * from updateTable where P1 = key._1 and P2 = key._2")
}

Regards,
Nirav


On Wed, Aug 1, 2018 at 4:18 PM, Koert Kuipers  wrote:

> this works for dataframes with spark 2.3 by changing a global setting, and
> will be configurable per write in 2.4
> see:
> https://issues.apache.org/jira/browse/SPARK-20236
> https://issues.apache.org/jira/browse/SPARK-24860
>
> On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel  wrote:
>
>> Hi Peay,
>>
>> Have you found a better solution yet? I am having the same issue.
>>
>> The following says it works with Spark 2.1 onward, but only when you use
>> sqlContext and not DataFrame:
>> https://medium.com/@anuvrat/writing-into-dynamic-partitions-
>> using-spark-2e2b818a007a
>>
>> Thanks,
>> Nirav
>>
>> On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh 
>> wrote:
>>
>>> If your processing task inherently processes input data by month you
>>> may want to "manually" partition the output data by month as well as
>>> by day, that is to save it with a file name including the given month,
>>> i.e. "dataset.parquet/month=01". Then you will be able to use the
>>> overwrite mode with each month partition. Hope this could be of some
>>> help.
>>>
>>> --
>>> Pavel Knoblokh
>>>
>>> On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
>>> > Hello,
>>> >
>>> > I am trying to use
>>> > data_frame.write.partitionBy("day").save("dataset.parquet") to write a
>>> > dataset while splitting by day.
>>> >
>>> > I would like to run a Spark job  to process, e.g., a month:
>>> > dataset.parquet/day=2017-01-01/...
>>> > ...
>>> >
>>> > and then run another Spark job to add another month using the same
>>> folder
>>> > structure, getting me
>>> > dataset.parquet/day=2017-01-01/
>>> > ...
>>> > dataset.parquet/day=2017-02-01/
>>> > ...
>>> >
>>> > However:
>>> > - with save mode "overwrite", when I process the second month, all of
>>> > dataset.parquet/ gets removed and I lose whatever was already computed
>>> for
>>> > the previous month.
>>> > - with save mode "append", then I can't get idempotence: if I run the
>>> job to
>>> > process a given month twice, I'll get duplicate data in all the
>>> subfolders
>>> > for that month.
>>> >
>>> > Is there a way to do "append in terms of the subfolders from
>>> partitionBy,
>>> > but overwrite within each such partitions? Any help would be
>>> appreciated.
>>> >
>>> > Thanks!
>>>
>>>
>>>
>>> --
>>> Pavel Knoblokh
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>>
>
>
>



Re: Can we deploy python script on a spark cluster

2018-08-02 Thread amit kumar singh
Hi Lehak,

You can make a Scala project with an Oozie class and one run class that will
ship your Python file to the cluster.

Define an Oozie coordinator with a Spark action or shell action.

We are deploying PySpark-based machine learning code this way.


Sent from my iPhone

> On Aug 2, 2018, at 8:46 AM, Lehak Dharmani  
> wrote:
> 
> 
> We are trying to deploy a Python script on a Spark cluster. However, as per
> the documentation, it is not possible to deploy Python applications on a
> cluster. Is there any alternative?
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Can we deploy python script on a spark cluster

2018-08-02 Thread Lehak Dharmani


We are trying to deploy a Python script on a Spark cluster. However, as per
the documentation, it is not possible to deploy Python applications on a
cluster. Is there any alternative?






unsubscribe

2018-08-02 Thread Eco Super
Hi User,
unsubscribe me