Re: Error - Dropping SparkListenerEvent because no remaining room in event queue

2018-10-24 Thread Arun Mahadevan
Maybe you have Spark listeners that are not processing the events fast
enough?
Do you have Spark event logging enabled?
You might have to profile the built-in and your custom listeners to see
what's going on.
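
For reference, a minimal Scala sketch of how a custom listener can time its own callbacks (the class name, threshold, and logging are illustrative, not an existing Spark class); anything consistently slower than the event arrival rate will eventually fill the bus queue. Register such a listener via the spark.extraListeners configuration.

```
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener wrapper: measures how long our own onTaskEnd handling takes.
class TimedListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val start = System.nanoTime()
    // ... existing per-task handling would go here ...
    val elapsedMs = (System.nanoTime() - start) / 1e6
    if (elapsedMs > 10.0) {
      // Callbacks that routinely take longer than events arrive will back up the bus.
      println(f"Slow listener callback: $elapsedMs%.1f ms for stage ${taskEnd.stageId}")
    }
  }
}
```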

- Arun

On Wed, 24 Oct 2018 at 16:08, karan alang  wrote:

>
> Pls note - Spark version is 2.2.0
>
> On Wed, Oct 24, 2018 at 3:57 PM karan alang  wrote:
>
>> Hello -
>> we are running a Spark job, and getting the following error -
>>
>> "LiveListenerBus: Dropping SparkListenerEvent because no remaining room
>> in event queue"
>>
>> As per the recommendation in the Spark Docs -
>>
>> I've increased the value of property
>> spark.scheduler.listenerbus.eventqueue.capacity to 9 (from the
>> default 1)
>> and also increased the driver memory
>>
>> That seems to have mitigated the issue.
>>
>> The question is - is there any code optimization (or other change) that
>> can be done to resolve this problem?
>> Pls note - we are primarily using functions like - reduce(),
>> collectAsList() and persist() as part of the job.
>>
>


Does Spark have a plan to move away from sun.misc.Unsafe?

2018-10-24 Thread kant kodali
Hi All,

Does Spark have a plan to move away from sun.misc.Unsafe to VarHandles?
I am trying to find a JIRA issue for this.

Thanks!


Re: Error - Dropping SparkListenerEvent because no remaining room in event queue

2018-10-24 Thread karan alang
Pls note - Spark version is 2.2.0

On Wed, Oct 24, 2018 at 3:57 PM karan alang  wrote:

> Hello -
> we are running a Spark job, and getting the following error -
>
> "LiveListenerBus: Dropping SparkListenerEvent because no remaining room in
> event queue"
>
> As per the recommendation in the Spark Docs -
>
> I've increased the value of property
> spark.scheduler.listenerbus.eventqueue.capacity to 9 (from the default
> 1)
> and also increased the driver memory
>
> That seems to have mitigated the issue.
>
> The question is - is there any code optimization (or other change) that
> can be done to resolve this problem?
> Pls note - we are primarily using functions like - reduce(),
> collectAsList() and persist() as part of the job.
>


Re: [Spark UI] Spark 2.3.1 UI no longer respects spark.ui.retainedJobs

2018-10-24 Thread Marcelo Vanzin
When you say many jobs at once, what ballpark are you talking about?

The code in 2.3+ does try to keep data about all running jobs and
stages regardless of the limit. If you're running into issues because
of that we may have to look again at whether that's the right thing to
do.
On Tue, Oct 23, 2018 at 10:02 AM Patrick Brown
 wrote:
>
> I believe I may be able to reproduce this now, it seems like it may be 
> something to do with many jobs at once:
>
> Spark 2.3.1
>
> > spark-shell --conf spark.ui.retainedJobs=1
>
> scala> import scala.concurrent._
> scala> import scala.concurrent.ExecutionContext.Implicits.global
> scala> for (i <- 0 until 5) { Future { println(sc.parallelize(0 until i).collect.length) } }
>
> On Mon, Oct 22, 2018 at 11:25 AM Marcelo Vanzin  wrote:
>>
>> Just tried on 2.3.2 and worked fine for me. UI had a single job and a
>> single stage (+ the tasks related to that single stage), same thing in
>> memory (checked with jvisualvm).
>>
>> On Sat, Oct 20, 2018 at 6:45 PM Marcelo Vanzin  wrote:
>> >
>> > On Tue, Oct 16, 2018 at 9:34 AM Patrick Brown
>> >  wrote:
>> > > I recently upgraded to spark 2.3.1 I have had these same settings in my 
>> > > spark submit script, which worked on 2.0.2, and according to the 
>> > > documentation appear to not have changed:
>> > >
>> > > spark.ui.retainedTasks=1
>> > > spark.ui.retainedStages=1
>> > > spark.ui.retainedJobs=1
>> >
>> > I tried that locally on the current master and it seems to be working.
>> > I don't have 2.3 easily in front of me right now, but will take a look
>> > Monday.
>> >
>> > --
>> > Marcelo
>>
>>
>>
>> --
>> Marcelo



-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Error - Dropping SparkListenerEvent because no remaining room in event queue

2018-10-24 Thread karan alang
Hello -
we are running a Spark job, and getting the following error -

"LiveListenerBus: Dropping SparkListenerEvent because no remaining room in
event queue"

As per the recommendation in the Spark Docs -

I've increased the value of property
spark.scheduler.listenerbus.eventqueue.capacity to 9 (from the default
1)
and also increased the driver memory

That seems to have mitigated the issue.

The question is - is there any code optimization (or other change) that can
be done to resolve this problem?
Pls note - we are primarily using functions like - reduce(),
collectAsList() and persist() as part of the job.
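
For reference, a hedged Scala sketch of how the settings mentioned above can be applied (the numbers are placeholders, not the poster's actual values, and driver memory still has to be raised at submit time, e.g. via spark.driver.memory):

```
import org.apache.spark.sql.SparkSession

// Sketch: enlarge the listener-bus event queue and bound the UI's bookkeeping.
val spark = SparkSession.builder()
  .appName("listener-bus-tuning-sketch")
  .config("spark.scheduler.listenerbus.eventqueue.capacity", "20000") // placeholder value
  .config("spark.ui.retainedJobs", "1000")   // cap completed jobs kept for the UI
  .config("spark.ui.retainedStages", "1000") // cap completed stages kept for the UI
  .getOrCreate()
```

Equivalently, these can be passed as --conf flags to spark-submit.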


Re: Spark 2.3.2 : No of active tasks vastly exceeds total no of executor cores

2018-10-24 Thread Shing Hing Man
I have increased spark.scheduler.listenerbus.eventqueue.capacity and ran my
application (in Yarn client mode) as before. I no longer get "Dropped events",
but the driver ran out of memory and the Spark UI gradually became unresponsive.
I noticed from the Spark UI that tens of thousands of jobs were kept. In my
application, many Spark jobs are submitted concurrently. I think I am seeing
the problem mentioned in the following recent post:

Apache Spark User List - [Spark UI] Spark 2.3.1 UI no longer respects 
spark.ui.retainedJobs

Shing

On Monday, 22 October 2018, 19:56:32 GMT+1, Shing Hing Man wrote:
 
In my log, I have found:

mylog.2:2018-10-19 20:00:50,455 WARN [dag-scheduler-event-loop] (Logging.scala:66) - Dropped 3498 events from appStatus since Fri Oct 19 19:25:05 UTC 2018.
mylog.2:2018-10-19 20:02:07,053 WARN [dispatcher-event-loop-1] (Logging.scala:66) - Dropped 123385 events from appStatus since Fri Oct 19 20:00:50 UTC 2018.
mylog.3:2018-10-19 19:23:42,922 ERROR [dispatcher-event-loop-3] (Logging.scala:70) - Dropping event from queue appStatus. This likely means one of the listeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
mylog.3:2018-10-19 19:23:42,928 WARN [dag-scheduler-event-loop] (Logging.scala:66) - Dropped 2 events from appStatus since Thu Jan 01 00:00:00 UTC 1970.
mylog.3:2018-10-19 19:25:05,822 WARN [dag-scheduler-event-loop] (Logging.scala:66) - Dropped 12190 events from appStatus since Fri Oct 19 19:23:42 UTC 2018.

I will try increasing spark.scheduler.listenerbus.eventqueue.capacity.
Shing
On Monday, 22 October 2018, 01:46:11 BST, Mark Hamstra wrote:

 Look for these log messages:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala#L154
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala#L172


On Fri, Oct 19, 2018 at 4:42 PM Shing Hing Man  wrote:

Hi, I have just upgraded my application to Spark 2.3.2 from 2.2.1. When I run
my Spark application on Yarn, in the executor tab of the Spark UI, I see there
are 1499 active tasks. There are only 145 cores in my executors. I have not
changed any of the spark.ui.* parameters.
In Spark 2.2.1, the number of active tasks never exceeds 145, the total
number of CPU cores of all the executors.

Also my application takes 4 times longer to run with Spark 2.3.2 than with 
Spark 2.2.1. 

I wonder if my application is slowed down because of too many active tasks.





Thanks in advance for any assistance !
Shing 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Watermarking without aggregation with Structured Streaming

2018-10-24 Thread Sanjay Awatramani
Try if this works...
println(query.lastProgress.eventTime.get("watermark"))
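
A slightly fuller Scala sketch along those lines (query is assumed to be a running StreamingQuery; lastProgress is null until the first batch completes, and the "watermark" entry is only present when event-time processing is active):

```
import org.apache.spark.sql.streaming.StreamingQuery

// Hedged sketch: read the current watermark, if any, from the last progress report.
def currentWatermark(query: StreamingQuery): Option[String] =
  Option(query.lastProgress)
    .flatMap(p => Option(p.eventTime.get("watermark"))) // java.util.Map lookup, may be null
```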

Regards,
Sanjay
On 2018/09/30 09:05:40, peay  wrote:
> Thanks for the pointers. I guess right now the only workaround would be to
> apply a "dummy" aggregation (e.g., group by the timestamp itself) only to
> have the stateful processing logic kick in and apply the filtering?
>
> For my purposes, an alternative solution to pushing it out to the source
> would be to make the watermark timestamp available through a function so
> that it can be used in a regular filter clause. Based on my experiments, the
> timestamp is computed and updated even when no stateful computations occur.
> I am not sure how easy that would be to contribute though, maybe someone can
> suggest a starting point?
>
> Thanks,
>
> ‐‐‐ Original Message ‐‐‐
> On Sunday, 30 September 2018 10:41, Jungtaek Lim  wrote:
>
> > The purpose of a watermark is to set a limit on handling records so that
> > state does not grow without bound. In other cases (non-stateful
> > operations), it is pretty normal to handle all of the records even if
> > they are pretty late.
> >
> > Btw, there were some comments regarding this: while Spark delegates
> > filtering out late records to the stateful operations for now, some of us
> > (including me) think filtering out late records in an earlier phase
> > (the source, or just after the source) makes more sense. It just didn't
> > come out as action, but I think it is still valid.
> >
> > https://github.com/apache/spark/pull/21617#issuecomment-400119049
> >
> > If we move the phase of filtering out late records, what you would like
> > to do may become the default behavior. This also means the output may
> > also change for queries which use non-stateful operations, so it is not a
> > trivial change and may require consensus like the SPIP process.
> >
> > Thanks,
> > Jungtaek Lim (HeartSaVioR)
> >
> > On Sunday, 30 September 2018 at 17:19, chandan prakash wrote:
> >
> >> Interesting question.
> >> I do not think watermarking is currently supported without any
> >> aggregation operation/groupBy.
> >>
> >> Reason:
> >> A watermark in Structured Streaming is used for limiting the size of
> >> state needed to keep intermediate information in memory, and state only
> >> comes into the picture in the case of stateful processing.
> >> Also in the code, it seems that filtering out records on the basis of
> >> the watermark happens only in the case of stateful operators
> >> (statefulOperators.scala).
> >> Have not tried running the code though and would like to know if someone
> >> can shed more light on this.
> >>
> >> Regards,
> >> Chandan
> >>
> >> On Sat, Sep 22, 2018 at 7:43 PM peay  wrote:
> >>
> >>> Hello,
> >>>
> >>> I am trying to use watermarking without aggregation, to filter out
> >>> records that are just too late, instead of appending them to the
> >>> output. My understanding is that aggregation is required for
> >>> `withWatermark` to have any effect. Is that correct?
> >>>
> >>> I am looking for something along the lines of
> >>>
> >>> ```
> >>> df.withWatermark("ts", ...).filter(F.col("ts") 
> >>> ```
> >>>
> >>> Is there any way to get the watermark value to achieve that?
> >>>
> >>> Thanks!
> >>
> >> --
> >> Chandan Prakash


Re: Watermarking without aggregation with Structured Streaming

2018-10-24 Thread sanjay_awat
Try this


peay-2 wrote
> For my purposes, an alternative solution to pushing it out to the source
> would be to make the watermark timestamp available through a function so
> that it can be used in a regular filter clause. Based on my experiments,
> the timestamp is computed and updated even when no stateful computations
> occur. I am not sure how easy that would be to contribute though, maybe
> someone can suggest a starting point?





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



CVE-2018-11804: Apache Spark build/mvn runs zinc, and can expose information from build machines

2018-10-24 Thread Sean Owen
Severity: Low

Vendor: The Apache Software Foundation

Versions Affected:
1.3.x release branch and later, including master

Description:
Spark's Apache Maven-based build includes a convenience script, 'build/mvn',
that downloads and runs a zinc server to speed up compilation. This server
will accept connections from external hosts by default. A specially-crafted
request to the zinc server could cause it to reveal information in files
readable to the developer account running the build. Note that this issue
does not affect end users of Spark, only developers building Spark from
source code.

Mitigation:
Spark users are not affected, as zinc is only a part of the build process.
Spark developers may simply use a local Maven installation's 'mvn' command
to build, and avoid running build/mvn and zinc.
Spark developers building actively-developed branches (2.2.x, 2.3.x, 2.4.x,
master) may update their branches to receive mitigations already patched
onto the build/mvn script.
Spark developers running zinc separately may include "-server 127.0.0.1" in
its command line, and consider additional flags like "-idle-timeout 30m" to
achieve similar mitigation.

Credit:
Andre Protas, Apple Information Security

References:
https://spark.apache.org/security.html

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark for kubernetes] Azure Blob Storage credentials issue

2018-10-24 Thread Matt Cheah
Hi there,

 

Can you check if HADOOP_CONF_DIR is being set on the executors to 
/opt/spark/conf? One should set an executor environment variable for that.
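
A hedged Scala sketch of one way to do that, assuming the conf directory containing core-site.xml is mounted at /opt/spark/conf in the executor image (the same setting can also be passed as --conf spark.executorEnv.HADOOP_CONF_DIR=/opt/spark/conf on spark-submit):

```
import org.apache.spark.sql.SparkSession

// Sketch: point executor pods at the mounted Hadoop configuration directory so the
// Azure storage credentials in core-site.xml are visible to the executors as well.
val spark = SparkSession.builder()
  .config("spark.executorEnv.HADOOP_CONF_DIR", "/opt/spark/conf")
  .getOrCreate()
```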

 

A kubectl describe pod output for the executors would be helpful here.

 

-Matt Cheah

 

From: Oscar Bonilla 
Date: Friday, October 19, 2018 at 1:03 AM
To: "user@spark.apache.org" 
Subject: [Spark for kubernetes] Azure Blob Storage credentials issue

 

Hello,

I'm having the following issue while trying to run Spark for kubernetes:
2018-10-18 08:48:54 INFO  DAGScheduler:54 - Job 0 failed: reduce at 
SparkPi.scala:38, took 1.743177 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost 
task 0.3 in stage 0.0 (TID 6, 10.244.1.11, executor 2): 
org.apache.hadoop.fs.azure.AzureException: 
org.apache.hadoop.fs.azure.AzureException: No credentials found for account 
datasets83d858296fd0c49b.blob.core.windows.net in the configuration, and its 
container datasets is not accessible using anonymous credentials. Please check 
if the container exists first. If it is not publicly available, you have to 
provide account credentials.
    at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:1086)
    at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:538)
    at 
org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1366)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3242)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:121)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3291)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3259)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:470)
    at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1897)
    at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:694)
    at org.apache.spark.util.Utils$.fetchFile(Utils.scala:476)
    at 
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:755)
    at 
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:747)
    at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:747)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:312)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.azure.AzureException: No credentials found for 
account datasets83d858296fd0c49b.blob.core.windows.net in the configuration, and its 
container datasets is not accessible using anonymous credentials. Please check 
if the container exists first. If it is not publicly available, you have to 
provide account credentials.
    at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.connectUsingAnonymousCredentials(AzureNativeFileSystemStore.java:863)
    at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:1081)
    ... 24 more
The command I use to launch the job is:
/opt/spark/bin/spark-submit
    --master k8s://
    --deploy-mode cluster
    --name spark-pi
    --class org.apache.spark.examples.SparkPi
    --conf spark.executor.instances=5
    --conf spark.kubernetes.container.image=
    --conf spark.kubernetes.namespace=
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark
    --conf spark.kubernetes.driver.secrets.spark=/opt/spark/conf
    --conf spark.kubernetes.executor.secrets.spark=/opt/spark/conf
wasb://@.blob.core.windows.net/spark-examples_2.11-2.3.2.jar 1
I have a k8s secret named spark with the following content:
apiVersion: v1
kind: Secret
metadata:
  name: spark
  labels:
    app: spark
    stack: service
type: Opaque
data:
  core-site.xml: |-
    {% filter b64encode %}
    
    
  

Re: Triggering sql on Was S3 via Apache Spark

2018-10-24 Thread Gourav Sengupta
I do not think security and governance have only now become important; they
always were. Hortonworks and Cloudera have fantastic security implementations,
and hence I mentioned updates via Hive.

Regards,
Gourav

On Wed, 24 Oct 2018, 17:32 ,  wrote:

> Thank you Gourav,
>
> Today I saw the article:
> https://databricks.com/session/apache-spark-in-cloud-and-hybrid-why-security-and-governance-become-more-important
>
> It seems also interesting.
>
> I was in meeting, I will also watch it.
>
>
>
> *From: *Gourav Sengupta 
> *Date: *24 October 2018 Wednesday 13:39
> *To: *"Ozsakarya, Omer" 
> *Cc: *Spark Forum 
> *Subject: *Re: Triggering sql on Was S3 via Apache Spark
>
>
>
> Also try to read about SCD and the fact that Hive may be a very good
> alternative as well for running updates on data
>
>
>
> Regards,
>
> Gourav
>
>
>
> On Wed, 24 Oct 2018, 14:53 ,  wrote:
>
> Thank you very much 
>
>
>
> *From: *Gourav Sengupta 
> *Date: *24 October 2018 Wednesday 11:20
> *To: *"Ozsakarya, Omer" 
> *Cc: *Spark Forum 
> *Subject: *Re: Triggering sql on Was S3 via Apache Spark
>
>
>
> This is interesting you asked and then answered the questions (almost) as
> well
>
>
>
> Regards,
>
> Gourav
>
>
>
> On Tue, 23 Oct 2018, 13:23 ,  wrote:
>
> Hi guys,
>
>
>
> We are using Apache Spark on a local machine.
>
>
>
> I need to implement the scenario below.
>
>
>
> In the initial load:
>
>1. CRM application will send a file to a folder. This file contains
>customer information of all customers. This file is in a folder in the
>local server. File name is: customer.tsv
>
>
>1. Customer.tsv contains customerid, country, birty_month,
>   activation_date etc
>
>
>1. I need to read the contents of customer.tsv.
>2. I will add current timestamp info to the file.
>3. I will transfer customer.tsv to the S3 bucket: customer.history.data
>
>
>
> In the daily loads:
>
>1.  CRM application will send a new file which contains the
>updated/deleted/inserted customer information.
>
>   File name is daily_customer.tsv
>
>1. Daily_customer.tsv contains contains customerid, cdc_field,
>   country, birty_month, activation_date etc
>
> Cdc field can be New-Customer, Customer-is-Updated, Customer-is-Deleted.
>
>1. I need to read the contents of daily_customer.tsv.
>2. I will add current timestamp info to the file.
>3. I will transfer daily_customer.tsv to the S3 bucket:
>customer.daily.data
>4. I need to merge two buckets customer.history.data and
>customer.daily.data.
>
>
>1. Two buckets have timestamp fields. So I need to query all records
>   whose timestamp is the last timestamp.
>   2. I can use row_number() over(partition by customer_id order by
>   timestamp_field desc) as version_number
>   3. Then I can put the records whose version is one, to the final
>   bucket: customer.dimension.data
>
>
>
> I am running Spark on premise.
>
>- Can I query on AWS S3 buckets by using Spark Sql / Dataframe or RDD
>on a local Spark cluster?
>- Is this approach efficient? Will the queries transfer all historical
>data from AWS S3 to the local cluster?
>- How can I implement this scenario in a more effective way? Like just
>transferring daily data to AWS S3 and then running queries on AWS.
>
>
>- For instance Athena can query on AWS. But it is just a query engine.
>   As I know I can not call it by using an sdk and I can not write the 
> results
>   to a bucket/folder.
>
>
>
> Thanks in advance,
>
> Ömer
>
>
>
>
>
>
>
>
>
>


Re: Triggering sql on Was S3 via Apache Spark

2018-10-24 Thread Omer.Ozsakarya
Thank you Gourav,

Today I saw the article: 
https://databricks.com/session/apache-spark-in-cloud-and-hybrid-why-security-and-governance-become-more-important
It seems also interesting.
I was in meeting, I will also watch it.

From: Gourav Sengupta 
Date: 24 October 2018 Wednesday 13:39
To: "Ozsakarya, Omer" 
Cc: Spark Forum 
Subject: Re: Triggering sql on Was S3 via Apache Spark

Also try to read about SCD and the fact that Hive may be a very good 
alternative as well for running updates on data

Regards,
Gourav

On Wed, 24 Oct 2018, 14:53 , 
mailto:omer.ozsaka...@sony.com>> wrote:
Thank you very much 

From: Gourav Sengupta 
mailto:gourav.sengu...@gmail.com>>
Date: 24 October 2018 Wednesday 11:20
To: "Ozsakarya, Omer" mailto:omer.ozsaka...@sony.com>>
Cc: Spark Forum mailto:user@spark.apache.org>>
Subject: Re: Triggering sql on Was S3 via Apache Spark

This is interesting you asked and then answered the questions (almost) as well

Regards,
Gourav

On Tue, 23 Oct 2018, 13:23 , 
mailto:omer.ozsaka...@sony.com>> wrote:
Hi guys,

We are using Apache Spark on a local machine.

I need to implement the scenario below.

In the initial load:

  1.  CRM application will send a file to a folder. This file contains customer 
information of all customers. This file is in a folder in the local server. 
File name is: customer.tsv

 *   Customer.tsv contains customerid, country, birty_month, 
activation_date etc

  1.  I need to read the contents of customer.tsv.
  2.  I will add current timestamp info to the file.
  3.  I will transfer customer.tsv to the S3 bucket: customer.history.data

In the daily loads:

  1.   CRM application will send a new file which contains the 
updated/deleted/inserted customer information.

  File name is daily_customer.tsv

 *   Daily_customer.tsv contains contains customerid, cdc_field, country, 
birty_month, activation_date etc

Cdc field can be New-Customer, Customer-is-Updated, Customer-is-Deleted.

  1.  I need to read the contents of daily_customer.tsv.
  2.  I will add current timestamp info to the file.
  3.  I will transfer daily_customer.tsv to the S3 bucket: customer.daily.data
  4.  I need to merge two buckets customer.history.data and customer.daily.data.

 *   Two buckets have timestamp fields. So I need to query all records 
whose timestamp is the last timestamp.
 *   I can use row_number() over(partition by customer_id order by 
timestamp_field desc) as version_number
 *   Then I can put the records whose version is one, to the final bucket: 
customer.dimension.data

I am running Spark on premise.

  *   Can I query on AWS S3 buckets by using Spark Sql / Dataframe or RDD on a 
local Spark cluster?
  *   Is this approach efficient? Will the queries transfer all historical data 
from AWS S3 to the local cluster?
  *   How can I implement this scenario in a more effective way? Like just 
transferring daily data to AWS S3 and then running queries on AWS.

 *   For instance Athena can query on AWS. But it is just a query engine. 
As I know I can not call it by using an sdk and I can not write the results to 
a bucket/folder.

Thanks in advance,
Ömer
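
For the merge and dedup steps described above, a hedged Scala sketch using the DataFrame API (bucket names and the customerid column follow the scenario; load_timestamp is a stand-in for the timestamp column added before upload; a header row, the s3a connector, and AWS credentials are assumed to be configured; CDC delete handling is left out):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Sketch of the history/daily merge described in the thread.
val spark = SparkSession.builder().appName("customer-merge-sketch").getOrCreate()

def readTsv(path: String) =
  spark.read.option("sep", "\t").option("header", "true").csv(path)

val history = readTsv("s3a://customer.history.data/")
val daily   = readTsv("s3a://customer.daily.data/")

// Union on the columns the two feeds share (unionByName needs Spark 2.3+),
// then keep only the latest row per customer.
val shared   = history.columns.intersect(daily.columns).map(col)
val combined = history.select(shared: _*).unionByName(daily.select(shared: _*))

val byCustomer = Window.partitionBy("customerid").orderBy(col("load_timestamp").desc)

combined
  .withColumn("version_number", row_number().over(byCustomer))
  .filter(col("version_number") === 1)
  .drop("version_number")
  .write.mode("overwrite").parquet("s3a://customer.dimension.data/")
```

Note that reading s3a:// paths from an on-premise cluster pulls the referenced data over the network on each run, which is part of the efficiency question raised above.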






Re: Triggering sql on Was S3 via Apache Spark

2018-10-24 Thread Gourav Sengupta
Also try to read about SCD and the fact that Hive may be a very good
alternative as well for running updates on data

Regards,
Gourav

On Wed, 24 Oct 2018, 14:53 ,  wrote:

> Thank you very much 
>
>
>
> *From: *Gourav Sengupta 
> *Date: *24 October 2018 Wednesday 11:20
> *To: *"Ozsakarya, Omer" 
> *Cc: *Spark Forum 
> *Subject: *Re: Triggering sql on Was S3 via Apache Spark
>
>
>
> This is interesting you asked and then answered the questions (almost) as
> well
>
>
>
> Regards,
>
> Gourav
>
>
>
> On Tue, 23 Oct 2018, 13:23 ,  wrote:
>
> Hi guys,
>
>
>
> We are using Apache Spark on a local machine.
>
>
>
> I need to implement the scenario below.
>
>
>
> In the initial load:
>
>1. CRM application will send a file to a folder. This file contains
>customer information of all customers. This file is in a folder in the
>local server. File name is: customer.tsv
>
>
>1. Customer.tsv contains customerid, country, birty_month,
>   activation_date etc
>
>
>1. I need to read the contents of customer.tsv.
>2. I will add current timestamp info to the file.
>3. I will transfer customer.tsv to the S3 bucket: customer.history.data
>
>
>
> In the daily loads:
>
>1.  CRM application will send a new file which contains the
>updated/deleted/inserted customer information.
>
>   File name is daily_customer.tsv
>
>1. Daily_customer.tsv contains contains customerid, cdc_field,
>   country, birty_month, activation_date etc
>
> Cdc field can be New-Customer, Customer-is-Updated, Customer-is-Deleted.
>
>1. I need to read the contents of daily_customer.tsv.
>2. I will add current timestamp info to the file.
>3. I will transfer daily_customer.tsv to the S3 bucket:
>customer.daily.data
>4. I need to merge two buckets customer.history.data and
>customer.daily.data.
>
>
>1. Two buckets have timestamp fields. So I need to query all records
>   whose timestamp is the last timestamp.
>   2. I can use row_number() over(partition by customer_id order by
>   timestamp_field desc) as version_number
>   3. Then I can put the records whose version is one, to the final
>   bucket: customer.dimension.data
>
>
>
> I am running Spark on premise.
>
>- Can I query on AWS S3 buckets by using Spark Sql / Dataframe or RDD
>on a local Spark cluster?
>- Is this approach efficient? Will the queries transfer all historical
>data from AWS S3 to the local cluster?
>- How can I implement this scenario in a more effective way? Like just
>transferring daily data to AWS S3 and then running queries on AWS.
>
>
>- For instance Athena can query on AWS. But it is just a query engine.
>   As I know I can not call it by using an sdk and I can not write the 
> results
>   to a bucket/folder.
>
>
>
> Thanks in advance,
>
> Ömer
>
>
>
>
>
>
>
>
>
>


Re: Triggering sql on Was S3 via Apache Spark

2018-10-24 Thread Omer.Ozsakarya
Thank you very much 

From: Gourav Sengupta 
Date: 24 October 2018 Wednesday 11:20
To: "Ozsakarya, Omer" 
Cc: Spark Forum 
Subject: Re: Triggering sql on Was S3 via Apache Spark

This is interesting you asked and then answered the questions (almost) as well

Regards,
Gourav

On Tue, 23 Oct 2018, 13:23 , 
mailto:omer.ozsaka...@sony.com>> wrote:
Hi guys,

We are using Apache Spark on a local machine.

I need to implement the scenario below.

In the initial load:

  1.  CRM application will send a file to a folder. This file contains customer 
information of all customers. This file is in a folder in the local server. 
File name is: customer.tsv

 *   Customer.tsv contains customerid, country, birty_month, 
activation_date etc

  1.  I need to read the contents of customer.tsv.
  2.  I will add current timestamp info to the file.
  3.  I will transfer customer.tsv to the S3 bucket: customer.history.data

In the daily loads:

  1.   CRM application will send a new file which contains the 
updated/deleted/inserted customer information.

  File name is daily_customer.tsv

 *   Daily_customer.tsv contains contains customerid, cdc_field, country, 
birty_month, activation_date etc

Cdc field can be New-Customer, Customer-is-Updated, Customer-is-Deleted.

  1.  I need to read the contents of daily_customer.tsv.
  2.  I will add current timestamp info to the file.
  3.  I will transfer daily_customer.tsv to the S3 bucket: customer.daily.data
  4.  I need to merge two buckets customer.history.data and customer.daily.data.

 *   Two buckets have timestamp fields. So I need to query all records 
whose timestamp is the last timestamp.
 *   I can use row_number() over(partition by customer_id order by 
timestamp_field desc) as version_number
 *   Then I can put the records whose version is one, to the final bucket: 
customer.dimension.data

I am running Spark on premise.

  *   Can I query on AWS S3 buckets by using Spark Sql / Dataframe or RDD on a 
local Spark cluster?
  *   Is this approach efficient? Will the queries transfer all historical data 
from AWS S3 to the local cluster?
  *   How can I implement this scenario in a more effective way? Like just 
transferring daily data to AWS S3 and then running queries on AWS.

 *   For instance Athena can query on AWS. But it is just a query engine. 
As I know I can not call it by using an sdk and I can not write the results to 
a bucket/folder.

Thanks in advance,
Ömer






How to write DataFrame to single parquet file instead of multiple files under a folder in spark?

2018-10-24 Thread mithril
For better viewing,  please see
https://stackoverflow.com/questions/52964167/how-to-write-dataframe-to-single-parquet-file-instead-of-multiple-files-under-a

-

I have a folder with files 

[![enter image description here][1]][1]


I want to apply some transform to each file and save it to another folder with
the same name (because the original data is already split by month, I would
need to split it again if I merged the files first).

## My code (pyspark)

from pyspark.sql import SQLContext

URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

# List the monthly parquet files under the source folder via the HDFS API.
fs = FileSystem.get(URI("hdfs://:9000"), Configuration())
status = fs.listStatus(Path('/arch/M/stockquantitylogs/2018/'))

paths = list()
for fileStatus in status:
    p = str(fileStatus.getPath())
    if p.endswith('.parquet'):
        paths.append(p)

sqlContext = SQLContext(sc)

def transform(path):
    # Keep rows where the stock quantity changed to or from zero, then write the
    # result under /clean with the same relative path as the input file.
    columns = ["store_id", "product_id", "store_product_quantity_old",
               "store_product_quantity_new", "time_create"]
    df = sqlContext.read.parquet(path).select(columns)
    df1 = df[((df.store_product_quantity_old > 0) & (df.store_product_quantity_new == 0)) |
             ((df.store_product_quantity_old == 0) & (df.store_product_quantity_new > 0))]
    df1.coalesce(1).write.parquet(path.replace('/arch', '/clean'), mode="overwrite")

for p in paths:
    transform(p)

At first `write.parquet` wrote multiple files into one folder; after some
searching, I added `.coalesce(1)`. But the output is still a folder with one
file under it.
Output:

[![enter image description here][2]][2]

I have tried `df1.write.save(path.replace('/arch', '/clean'),
mode="overwrite")` and `df1.write.save('/clean/M/stockquantitylogs/2018/',
mode="overwrite")`, but neither works.

I just want to process `/arch/M/stockquantitylogs/2018/*.parquet` into
`/clean/M/stockquantitylogs/2018/*.parquet`, with each output file keeping the
same name as its input.



  [1]: https://i.stack.imgur.com/PqgP6.png
  [2]: https://i.stack.imgur.com/JPcsP.png



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



1

2018-10-24 Thread twinmegami
1


Re: Triggering sql on Was S3 via Apache Spark

2018-10-24 Thread Gourav Sengupta
This is interesting you asked and then answered the questions (almost) as
well

Regards,
Gourav

On Tue, 23 Oct 2018, 13:23 ,  wrote:

> Hi guys,
>
>
>
> We are using Apache Spark on a local machine.
>
>
>
> I need to implement the scenario below.
>
>
>
> In the initial load:
>
>1. CRM application will send a file to a folder. This file contains
>customer information of all customers. This file is in a folder in the
>local server. File name is: customer.tsv
>   1. Customer.tsv contains customerid, country, birty_month,
>   activation_date etc
>2. I need to read the contents of customer.tsv.
>3. I will add current timestamp info to the file.
>4. I will transfer customer.tsv to the S3 bucket: customer.history.data
>
>
>
> In the daily loads:
>
>1.  CRM application will send a new file which contains the
>updated/deleted/inserted customer information.
>
>   File name is daily_customer.tsv
>
>1. Daily_customer.tsv contains contains customerid, cdc_field,
>   country, birty_month, activation_date etc
>
> Cdc field can be New-Customer, Customer-is-Updated, Customer-is-Deleted.
>
>1. I need to read the contents of daily_customer.tsv.
>2. I will add current timestamp info to the file.
>3. I will transfer daily_customer.tsv to the S3 bucket:
>customer.daily.data
>4. I need to merge two buckets customer.history.data and
>customer.daily.data.
>   1. Two buckets have timestamp fields. So I need to query all
>   records whose timestamp is the last timestamp.
>   2. I can use row_number() over(partition by customer_id order by
>   timestamp_field desc) as version_number
>   3. Then I can put the records whose version is one, to the final
>   bucket: customer.dimension.data
>
>
>
> I am running Spark on premise.
>
>- Can I query on AWS S3 buckets by using Spark Sql / Dataframe or RDD
>on a local Spark cluster?
>- Is this approach efficient? Will the queries transfer all historical
>data from AWS S3 to the local cluster?
>- How can I implement this scenario in a more effective way? Like just
>transferring daily data to AWS S3 and then running queries on AWS.
>   - For instance Athena can query on AWS. But it is just a query
>   engine. As I know I can not call it by using an sdk and I can not write 
> the
>   results to a bucket/folder.
>
>
>
> Thanks in advance,
>
> Ömer
>
>
>
>
>
>
>
>
>