RE: Writing DataFrame filter results to separate files

2016-12-05 Thread Mendelson, Assaf
If you write to Parquet, you can use the partitionBy option, which will write 
a separate directory for each value of the column (assuming you have a column 
with the month).
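
A small sketch of what that looks like (the DataFrame, path and column names are
illustrative, assuming a month column already exists):

    // writes one sub-directory per month value, e.g. .../month=2016-12/
    df.write
      .partitionBy("month")
      .parquet("/output/by-month")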

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Tuesday, December 06, 2016 3:33 AM
To: Everett Anderson
Cc: user
Subject: Re: Writing DataFrame filter results to separate files

1. In my case, I'd need to first explode my data by ~12x to assign each record 
to multiple 12-month rolling output windows. I'm not sure Spark SQL would be 
able to optimize this away, combining it with the output writing to do it 
incrementally.

You are right, but I wouldn't worry about the RAM use.  If implemented properly 
(or if you just use the builtin window function), it should all be pipelined.

2. Wouldn't each partition -- window in my case -- be shuffled to a single 
machine and then written together as one output shard? For a large amount of 
data per window, that seems less than ideal.

Oh sorry, I thought you wanted one file per value.  If you drop the repartition 
then it won't shuffle, but will just write in parallel on each machine.


driver in queued state and not started

2016-12-05 Thread Yu Wei
Hi Guys,


I tried to run spark on mesos cluster.

However, when I tried to submit jobs via spark-submit, the driver stayed in the 
"Queued" state and was not started.


What should I check?



Thanks,

Jared
Software developer
Interested in open source software, big data, Linux


Re: Monitoring the User Metrics for a long running Spark Job

2016-12-05 Thread Chawla,Sumit
An example implementation i found is :
https://github.com/groupon/spark-metrics

Does anyone have experience using this?  I am more interested in something
for PySpark specifically.

The above link pointed to -
https://github.com/apache/spark/blob/master/conf/metrics.properties.template.
I need to spend some time reading it, but any quick pointers will be
appreciated.
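
For what it's worth, a sketch of the kind of entries that template accepts
(host/port are placeholders); dropped into conf/metrics.properties, this pushes
Spark's built-in metrics to Graphite every 10 seconds while the job runs:

    # push all instances' metrics to a Graphite sink
    *.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
    *.sink.graphite.host=graphite.example.com
    *.sink.graphite.port=2003
    *.sink.graphite.period=10
    *.sink.graphite.unit=seconds
    # also expose JVM metrics from executors
    executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource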



Regards
Sumit Chawla


On Mon, Dec 5, 2016 at 8:17 PM, Chawla,Sumit  wrote:

> Hi Manish
>
> I am specifically looking for something similar to following:
>
>  https://ci.apache.org/projects/flink/flink-docs-
> release-1.1/apis/common/index.html#accumulators--counters.
>
> Flink has this concept of Accumulators, where user can keep its custom
> counters etc.  While the application is executing these counters are
> queryable through REST API provided by Flink Monitoring Backend.  This way
> you don't have to wait for the program to complete.
>
>
>
> Regards
> Sumit Chawla
>
>
> On Mon, Dec 5, 2016 at 5:53 PM, manish ranjan 
> wrote:
>
>> http://spark.apache.org/docs/latest/monitoring.html
>>
>> You can even install tools like dstat, iostat, and iotop; *collectd* can
>> provide fine-grained profiling on individual nodes.
>>
>> If you are using Mesos as Resource Manager , mesos exposes metrics as
>> well for the running job.
>>
>> Manish
>>
>> ~Manish
>>
>>
>>
>> On Mon, Dec 5, 2016 at 4:17 PM, Chawla,Sumit 
>> wrote:
>>
>>> Hi All
>>>
>>> I have a long running job which takes hours and hours to process data.
>>> How can i monitor the operational efficency of this job?  I am interested
>>> in something like Storm\Flink style User metrics/aggregators, which i can
>>> monitor while my job is running.  Using these metrics i want to monitor,
>>> per partition performance in processing items.  As of now, only way for me
>>> to get these metrics is when the job finishes.
>>>
>>> One possibility is that spark can flush the metrics to external system
>>> every few seconds, and thus use  an external system to monitor these
>>> metrics.  However, i wanted to see if the spark supports any such use case
>>> OOB.
>>>
>>>
>>> Regards
>>> Sumit Chawla
>>>
>>>
>>
>


Re: Monitoring the User Metrics for a long running Spark Job

2016-12-05 Thread Miguel Morales
One thing I've done before is to install Datadog's statsd agent on the nodes.  
Then you can emit metrics and stats to it and build dashboards in Datadog.

Sent from my iPhone

> On Dec 5, 2016, at 8:17 PM, Chawla,Sumit  wrote:
> 
> Hi Manish
> 
> I am specifically looking for something similar to following: 
> 
>  
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/common/index.html#accumulators--counters.
>
> 
> Flink has this concept of Accumulators, where user can keep its custom 
> counters etc.  While the application is executing these counters are 
> queryable through REST API provided by Flink Monitoring Backend.  This way 
> you don't have to wait for the program to complete. 
> 
> 
> 
> Regards
> Sumit Chawla
> 
> 
>> On Mon, Dec 5, 2016 at 5:53 PM, manish ranjan  wrote:
>> http://spark.apache.org/docs/latest/monitoring.html
>> 
>> You can even install tools like  dstat, iostat, and iotop, collectd  can 
>> provide fine-grained profiling on individual nodes. 
>> 
>> If you are using Mesos as Resource Manager , mesos exposes metrics as well 
>> for the running job.
>> 
>> Manish
>> 
>> ~Manish
>> 
>> 
>> 
>>> On Mon, Dec 5, 2016 at 4:17 PM, Chawla,Sumit  wrote:
>>> Hi All
>>> 
>>> I have a long running job which takes hours and hours to process data.  How 
>>> can i monitor the operational efficency of this job?  I am interested in 
>>> something like Storm\Flink style User metrics/aggregators, which i can 
>>> monitor while my job is running.  Using these metrics i want to monitor, 
>>> per partition performance in processing items.  As of now, only way for me 
>>> to get these metrics is when the job finishes. 
>>> 
>>> One possibility is that spark can flush the metrics to external system 
>>> every few seconds, and thus use  an external system to monitor these 
>>> metrics.  However, i wanted to see if the spark supports any such use case 
>>> OOB.
>>> 
>>> 
>>> Regards
>>> Sumit Chawla
>>> 
>> 
> 


Re: Monitoring the User Metrics for a long running Spark Job

2016-12-05 Thread Chawla,Sumit
Hi Manish

I am specifically looking for something similar to the following:


https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/common/index.html#accumulators--counters


Flink has this concept of accumulators, where a user can keep custom
counters, etc.  While the application is executing, these counters are
queryable through the REST API provided by the Flink monitoring backend.  This way
you don't have to wait for the program to complete.



Regards
Sumit Chawla


On Mon, Dec 5, 2016 at 5:53 PM, manish ranjan  wrote:

> http://spark.apache.org/docs/latest/monitoring.html
>
> You can even install tools like dstat, iostat, and iotop; *collectd* can
> provide fine-grained profiling on individual nodes.
>
> If you are using Mesos as Resource Manager , mesos exposes metrics as well
> for the running job.
>
> Manish
>
> ~Manish
>
>
>
> On Mon, Dec 5, 2016 at 4:17 PM, Chawla,Sumit 
> wrote:
>
>> Hi All
>>
>> I have a long running job which takes hours and hours to process data.
>> How can i monitor the operational efficency of this job?  I am interested
>> in something like Storm\Flink style User metrics/aggregators, which i can
>> monitor while my job is running.  Using these metrics i want to monitor,
>> per partition performance in processing items.  As of now, only way for me
>> to get these metrics is when the job finishes.
>>
>> One possibility is that spark can flush the metrics to external system
>> every few seconds, and thus use  an external system to monitor these
>> metrics.  However, i wanted to see if the spark supports any such use case
>> OOB.
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>


Re: Unsubscribe

2016-12-05 Thread mehak soni
unsubscribe

On Sat, Dec 3, 2016 at 2:55 PM, kote rao  wrote:

> unsubscribe
> --
> *From:* S Malligarjunan 
> *Sent:* Saturday, December 3, 2016 11:55:41 AM
> *To:* user@spark.apache.org
> *Subject:* Re: Unsubscribe
>
> Unsubscribe
>
> Thanks and Regards,
> Malligarjunan S.
>
>
>
> On Saturday, 3 December 2016, 20:42, Sivakumar S <
> write2sivaku...@gmail.com> wrote:
>
>
> Unsubscribe
>
>
>


Re: Can spark support exactly once based kafka ? Due to these following question?

2016-12-05 Thread Cody Koeninger
Have you read / watched the materials linked from
https://github.com/koeninger/kafka-exactly-once ?

On Mon, Dec 5, 2016 at 4:17 AM, Jörn Franke  wrote:
> You need to do the bookkeeping of what has been processed yourself. This
> may mean roughly the following (of course the devil is in the details):
> Write down in zookeeper which part of the processing job has been done and
> for which dataset all the data has been created (do not keep the data itself
> in zookeeper).
> Once you start a processing job, check in zookeeper if it has been
> processed, if not remove all staging data, if yes terminate.
>
> As I said the details depend on your job and require some careful thinking,
> but exactly once can be achieved with Spark (and potentially zookeeper or
> similar, such as Redis).
> Of course at the same time think if you need delivery in order etc.
>
> On 5 Dec 2016, at 08:59, Michal Šenkýř  wrote:
>
> Hello John,
>
> 1. If a task completes the operation, it will notify the driver. The driver may
> not receive the message due to the network, and think the task is still
> running. Then the child stage won't be scheduled?
>
> Spark's fault tolerance policy is, if there is a problem in processing a
> task or an executor is lost, run the task (and any dependent tasks) again.
> Spark attempts to minimize the number of tasks it has to recompute, so
> usually only a small part of the data is recomputed.
>
> So in your case, the driver simply schedules the task on another executor
> and continues to the next stage when it receives the data.
>
> 2. How does Spark guarantee that the downstream task receives the shuffle data
> completely? In fact, I can't find a checksum for blocks in Spark. For
> example, the upstream task may shuffle 100 MB of data, but the downstream task
> may receive only 99 MB due to the network. Can Spark verify that the data is
> received completely based on its size?
>
> Spark uses compression with checksumming for shuffle data, so it should know
> when the data is corrupt and initiate a recomputation.
>
> As for your question in the subject:
> All of this means that Spark supports at-least-once processing. There is no
> way that I know of to ensure exactly-once. You can try to minimize
> more-than-once situations by updating your offsets as soon as possible but
> that does not eliminate the problem entirely.
>
> Hope this helps,
>
> Michal Senkyr
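
To make the bookkeeping point above concrete, here is a rough sketch with the
direct Kafka stream (`stream`, `process`, and `saveResultsAndOffsets` are
hypothetical placeholders; the idea is that results and their offsets are
committed together in one transaction, and on restart you resume from the
stored offsets):

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    stream.foreachRDD { rdd =>
      // offsets covered by this batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val results = rdd.map(process).collect()        // `process` is hypothetical
      // write results and offsets atomically to your own store (DB, ZooKeeper, Redis, ...)
      saveResultsAndOffsets(results, offsetRanges)
    }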

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Back-pressure to Spark Kafka Streaming?

2016-12-05 Thread Cody Koeninger
If you want finer-grained max rate setting, SPARK-17510 got merged a
while ago.  There's also SPARK-18580 which might help address the
issue of starting backpressure rate for the first batch.

On Mon, Dec 5, 2016 at 4:18 PM, Liren Ding  wrote:
> Hey all,
>
> Does backressure actually work on spark kafka streaming? According to the
> latest spark streaming document:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html
> "In Spark 1.5, we have introduced a feature called backpressure that
> eliminate the need to set this rate limit, as Spark Streaming automatically
> figures out the rate limits and dynamically adjusts them if the processing
> conditions change. This backpressure can be enabled by setting the
> configuration parameter spark.streaming.backpressure.enabled to true."
> But I also see a few open spark jira tickets on this option:
> https://issues.apache.org/jira/browse/SPARK-7398
> https://issues.apache.org/jira/browse/SPARK-18371
>
> The case in the second ticket describes a similar issue as we have here. We
> use Kafka to send large batches (10~100M) to spark streaming, and the spark
> streaming interval is set to 1~4 minutes. With the backpressure set to true,
> the queued active batches still pile up when average batch processing time
> takes longer than default interval. After the spark driver is restarted, all
> queued batches turn to a giant batch, which block subsequent batches and
> also have a great chance to fail eventually. The only config we found that
> might help is "spark.streaming.kafka.maxRatePerPartition". It does limit the
> incoming batch size, but not a perfect solution since it depends on size of
> partition as well as the length of batch interval. For our case, hundreds of
> partitions X minutes of interval still produce a number that is too large
> for each batch. So we still want to figure out how to make the backressure
> work in spark kafka streaming, if it is supposed to work there. Thanks.
>
>
> Liren
>
>
>
>
>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Monitoring the User Metrics for a long running Spark Job

2016-12-05 Thread manish ranjan
http://spark.apache.org/docs/latest/monitoring.html

You can even install tools like dstat, iostat, and iotop; *collectd* can
provide fine-grained profiling on individual nodes.

If you are using Mesos as the resource manager, Mesos exposes metrics for
the running job as well.

Manish

~Manish



On Mon, Dec 5, 2016 at 4:17 PM, Chawla,Sumit  wrote:

> Hi All
>
> I have a long running job which takes hours and hours to process data.
> How can i monitor the operational efficency of this job?  I am interested
> in something like Storm\Flink style User metrics/aggregators, which i can
> monitor while my job is running.  Using these metrics i want to monitor,
> per partition performance in processing items.  As of now, only way for me
> to get these metrics is when the job finishes.
>
> One possibility is that spark can flush the metrics to external system
> every few seconds, and thus use  an external system to monitor these
> metrics.  However, i wanted to see if the spark supports any such use case
> OOB.
>
>
> Regards
> Sumit Chawla
>
>


Re: custom generate spark application id

2016-12-05 Thread Jakob Odersky
The app ID is assigned internally by Spark's task scheduler:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala#L35.
You could probably change the naming; however, I'm pretty sure that the
ID will always have to be unique for a context on a cluster.
Alternatively, could setting the name (conf.setAppName or via the
"spark.app.name" config) help with what you're trying to achieve?

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: [Spark Streaming] How to do join two messages in spark streaming(Probabaly messasges are in differnet RDD) ?

2016-12-05 Thread Sanchuan Cheng (sancheng)


smime.p7m
Description: S/MIME encrypted message


[Spark Streaming] How to do join two messages in spark streaming(Probabaly messasges are in differnet RDD) ?

2016-12-05 Thread sancheng
Hello,

We are trying to use Spark Streaming for a billing-related application.
Our case is that we need to correlate two different messages and calculate the
time interval between them. The two messages should be in the same partition
but probably not in the same RDD, and it seems DStream.join can only join
between the current RDDs.

We want to avoid storing the earlier message in some datastore and would
prefer a native solution if there is one in Spark.

Please advise, thanks!
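
One native option is to keep the earlier message per correlation key in
streaming state until its pair arrives. A rough sketch with updateStateByKey
(the message shape and names are assumptions; ssc.checkpoint(...) must be set
for stateful operations):

    import org.apache.spark.streaming.dstream.DStream

    // events: DStream[(correlationId, eventTimestampMillis)]
    def intervals(events: DStream[(String, Long)]): DStream[(String, Long)] = {
      // state per key = earliest timestamp seen so far
      val firstSeen = events.updateStateByKey[Long] { (newTs: Seq[Long], prev: Option[Long]) =>
        (newTs ++ prev).reduceOption((a, b) => math.min(a, b))
      }
      // interval between each incoming message and the first one for its key
      events.join(firstSeen).map { case (id, (ts, first)) => (id, ts - first) }
    }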





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-How-to-do-join-two-messages-in-spark-streaming-Probabaly-messasges-are-in-differnet--tp28161.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Writing DataFrame filter results to separate files

2016-12-05 Thread Michael Armbrust
>
> 1. In my case, I'd need to first explode my data by ~12x to assign each
> record to multiple 12-month rolling output windows. I'm not sure Spark SQL
> would be able to optimize this away, combining it with the output writing
> to do it incrementally.
>

You are right, but I wouldn't worry about the RAM use.  If implemented
properly (or if you just use the builtin window function), it should all be
pipelined.


> 2. Wouldn't each partition -- window in my case -- be shuffled to a single
> machine and then written together as one output shard? For a large amount
> of data per window, that seems less than ideal.
>

Oh sorry, I thought you wanted one file per value.  If you drop the
repartition then it won't shuffle, but will just write in parallel on each
machine.
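
A rough sketch of what that could look like, using the built-in window function
plus partitionBy so the writer splits the output per window; the timestamp
column name and the 365-day approximation of "12 months" are assumptions
(allData is the DataFrame from the original question):

    import org.apache.spark.sql.functions.{col, window}

    val windowed = allData
      .withColumn("w", window(col("eventDate").cast("timestamp"), "365 days", "30 days"))
      .withColumn("windowStart", col("w.start"))
      .drop("w")

    windowed.write
      .partitionBy("windowStart")      // one directory per rolling window
      .format("parquet")
      .save("/output/windows")

Without a repartition before the write, each machine writes its own shards in
parallel, at the cost of more files per window directory.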


Re: Writing DataFrame filter results to separate files

2016-12-05 Thread Everett Anderson
Hi,

Thanks for the reply!

On Mon, Dec 5, 2016 at 1:30 PM, Michael Armbrust 
wrote:

> If you repartition($"column") and then do .write.partitionBy("column") you
> should end up with a single file for each value of the partition column.
>

I have two concerns there:

1. In my case, I'd need to first explode my data by ~12x to assign each
record to multiple 12-month rolling output windows. I'm not sure Spark SQL
would be able to optimize this away, combining it with the output writing
to do it incrementally.

2. Wouldn't each partition -- window in my case -- be shuffled to a single
machine and then written together as one output shard? For a large amount
of data per window, that seems less than ideal.


>
> On Mon, Dec 5, 2016 at 10:59 AM, Everett Anderson <
> ever...@nuna.com.invalid> wrote:
>
>> Hi,
>>
>> I have a DataFrame of records with dates, and I'd like to write all
>> 12-month (with overlap) windows to separate outputs.
>>
>> Currently, I have a loop equivalent to:
>>
>> for ((windowStart, windowEnd) <- windows) {
>> val windowData = allData.filter(
>> getFilterCriteria(windowStart, windowEnd))
>> windowData.write.format(...).save(...)
>> }
>>
>> This works fine, but has the drawback that since Spark doesn't
>> parallelize the writes, there is a fairly cost based on the number of
>> windows.
>>
>> Is there a way around this?
>>
>> In MapReduce, I'd probably multiply the data in a Mapper with a window ID
>> and then maybe use something like MultipleOutputs
>> .
>> But I'm a bit worried of trying to do this in Spark because of the data
>> explosion and RAM use. What's the best approach?
>>
>> Thanks!
>>
>> - Everett
>>
>>
>


Re: Spark-shell doesn't see changes coming from Kafka topic

2016-12-05 Thread Otávio Carvalho
In the end, the mistake I made was that I forgot to export the proper
AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY on the machine where I was
running the spark-shell.

Nevertheless, thanks for answering, Tathagata Das.

Otávio.

2016-12-01 17:36 GMT-02:00 Tathagata Das :

> Can you confirm the following?
> 1. Are you sending new data to the Kafka topic AFTER starting the
> streaming query? Since you have specified `startingOffsets` as `latest`,
> data needs to be sent to the topic after the query starts for the query to
> receive it.
> 2. Are you able to read kafka data using Kafka's console consumer, from
> the same machine running the query? That would clear up any confusion
> regarding connectivity.
>
> If the above are cleared, I would look at INFO and DEBUG level log4j logs
> to see what the query is doing: is it stuck at some point, or is it
> continuously running but not finding the latest offsets?
>
>
> On Thu, Dec 1, 2016 at 6:31 AM, Otávio Carvalho  wrote:
>
>> Hello hivemind,
>>
>> I am trying to connect my Spark 2.0.2 cluster to an Apache Kafka 0.10
>> cluster via spark-shell.
>>
>> The connection works fine, but it is not able to receive the messages
>> published to the topic.
>>
>> It doesn't throw any error, but it is not able to retrieve any message (I
>> am sure that messages are being published 'cause I am able to read from the
>> topic from the same machine)
>>
>> Here follows the spark-shell code/output:
>>
>> val ds1 = spark.readStream
>>   .format("kafka")
>>   .option("subscribe", "clickstream")
>>   .option("kafka.bootstrap.servers",
>>     "ec2-54-208-12-171.compute-1.amazonaws.com:9092")
>>   .option("startingOffsets", "latest")
>>   .load
>>
>> // Exiting paste mode, now interpreting.
>>
>> ds1: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5
>> more fields]
>>
>> scala> val counter = ds1.groupBy("value").count
>> counter: org.apache.spark.sql.DataFrame = [value: binary, count: bigint]
>>
>> scala> import org.apache.spark.sql.streaming.OutputMode.Complete
>> import org.apache.spark.sql.streaming.OutputMode.Complete
>>
>> val query = counter.writeStream
>>   .outputMode(Complete)
>>   .format("console")
>>   .start
>>
>> // Exiting paste mode, now interpreting.
>>
>> query: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query -
>> query-1 [state = ACTIVE]
>>
>> scala> query.status
>> res0: org.apache.spark.sql.streaming.StreamingQueryStatus =
>> Status of query 'query-1'
>> Query id: 1
>> Status timestamp: 1480602056895
>> Input rate: 0.0 rows/sec
>> Processing rate 0.0 rows/sec
>> Latency: - ms
>> Trigger details:
>> isTriggerActive: true
>> statusMessage: Finding new data from sources
>> timestamp.triggerStart: 1480602056894
>> triggerId: -1
>> Source statuses [1 source]:
>> Source 1 - KafkaSource[Subscribe[clickstream]]
>> Available offset: -
>> Input rate: 0.0 rows/sec
>> Processing rate: 0.0 rows/sec
>> Trigger details:
>> triggerId: -1
>> Sink status -
>> org.apache.spark.sql.execution.streaming.ConsoleSink@54d5b6cb
>> Committed offsets: [-]
>>
>> I am starting the spark-shell as follows:
>> /root/spark/bin/spark-shell --packages org.apache.spark:spark-sql-kaf
>> ka-0-10_2.10:2.0.2
>>
>> Thanks,
>> Otávio Carvalho.
>>
>> --
>> Otávio Carvalho
>> Consultant Developer
>> Email ocarv...@thoughtworks.com
>> Telephone +55 53 91565742
>>
>
>


custom generate spark application id

2016-12-05 Thread rtijoriwala
Hi,

We would like to control how Spark generates its application ID. Currently,
it changes every time we restart the job and is also hard to correlate. For
example, it looks like this: app-20161129054045-0096.

I would like to control how this ID gets generated so it is easier to track
when JMX metrics are reported to Graphite.

Any help appreciated.

Thanks,

Ritesh
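
One workaround (also suggested in the reply earlier in this digest): pick your
own stable application name, and on newer Spark releases override the metrics
namespace so the cluster-assigned app ID no longer appears in metric names. A
minimal sketch; the names are illustrative, and spark.metrics.namespace is only
available from Spark 2.1 onwards:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("billing-pipeline")                      // stable, human-chosen name
      .set("spark.metrics.namespace", "billing_pipeline")  // Spark 2.1+: replaces the app id in metric names
    val sc = new SparkContext(conf)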



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/custom-generate-spark-application-id-tp28160.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Please unsubscribe me

2016-12-05 Thread Srinivas Potluri



Monitoring the User Metrics for a long running Spark Job

2016-12-05 Thread Chawla,Sumit
Hi All

I have a long-running job which takes hours and hours to process data.  How
can I monitor the operational efficiency of this job?  I am interested in
something like Storm/Flink-style user metrics/aggregators, which I can
monitor while my job is running.  Using these metrics I want to monitor
per-partition performance in processing items.  As of now, the only way for
me to get these metrics is when the job finishes.

One possibility is that Spark could flush the metrics to an external system
every few seconds, and I could then use that external system to monitor
these metrics.  However, I wanted to see if Spark supports any such use case
out of the box.


Regards
Sumit Chawla
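
One thing that comes close out of the box: named accumulators. They are not a
Flink-style metrics REST API, but in Spark 2.x they show up live in the web UI
while stages run, and they can be read on the driver between jobs. A minimal
sketch (the SparkSession `spark`, the DataFrame `df`, and the notion of a
"malformed" record are assumptions):

    val processed = spark.sparkContext.longAccumulator("records-processed")
    val malformed = spark.sparkContext.longAccumulator("records-malformed")

    df.rdd.foreach { row =>
      if (row.anyNull) malformed.add(1) else processed.add(1)
    }

    // visible per stage in the Spark UI while running; also readable on the driver:
    println(s"processed=${processed.value} malformed=${malformed.value}")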


Re: Back-pressure to Spark Kafka Streaming?

2016-12-05 Thread Richard Startin
I've seen the feature work very well. For tuning, you've got:

spark.streaming.backpressure.pid.proportional (defaults to 1, non-negative) - 
weight for response to "error" (change between last batch and this batch)
spark.streaming.backpressure.pid.integral (defaults to 0.2, non-negative) - 
weight for the response to the accumulation of error. This has a dampening 
effect.
spark.streaming.backpressure.pid.derived (defaults to zero, non-negative) - 
weight for the response to the trend in error. This can cause 
arbitrary/noise-induced fluctuations in batch size, but can also help react 
quickly to increased/reduced capacity.
spark.streaming.backpressure.pid.minRate - the default value is 100 (must be 
positive), batch size won't go below this.

spark.streaming.receiver.maxRate - batch size won't go above this.
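
As a rough sketch, these can be wired together with the options mentioned
elsewhere in the thread (the values are purely illustrative):

    val conf = new org.apache.spark.SparkConf()
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.backpressure.pid.minRate", "100")
      .set("spark.streaming.backpressure.initialRate", "1000")   // cap for the very first batch (receiver-based streams)
      .set("spark.streaming.kafka.maxRatePerPartition", "500")   // hard per-partition ceiling for the direct stream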


Cheers,

Richard


https://richardstartin.com/



From: Liren Ding 
Sent: 05 December 2016 22:18
To: d...@spark.apache.org; user@spark.apache.org
Subject: Back-pressure to Spark Kafka Streaming?

Hey all,

Does backressure actually work on spark kafka streaming? According to the 
latest spark streaming document:
http://spark.apache.org/docs/latest/streaming-programming-guide.html
"In Spark 1.5, we have introduced a feature called backpressure that eliminate 
the need to set this rate limit, as Spark Streaming automatically figures out 
the rate limits and dynamically adjusts them if the processing conditions 
change. This backpressure can be enabled by setting the configuration parameter 
spark.streaming.backpressure.enabled to true."
But I also see a few open spark jira tickets on this option:
https://issues.apache.org/jira/browse/SPARK-7398
https://issues.apache.org/jira/browse/SPARK-18371

The case in the second ticket describes a similar issue as we have here. We use 
Kafka to send large batches (10~100M) to spark streaming, and the spark 
streaming interval is set to 1~4 minutes. With the backpressure set to true, 
the queued active batches still pile up when average batch processing time 
takes longer than default interval. After the spark driver is restarted, all 
queued batches turn to a giant batch, which block subsequent batches and also 
have a great chance to fail eventually. The only config we found that might 
help is "spark.streaming.kafka.maxRatePerPartition". It does limit the incoming 
batch size, but not a perfect solution since it depends on size of partition as 
well as the length of batch interval. For our case, hundreds of partitions X 
minutes of interval still produce a number that is too large for each batch. So 
we still want to figure out how to make the backressure work in spark kafka 
streaming, if it is supposed to work there. Thanks.


Liren









Streaming audio files

2016-12-05 Thread habibbaluwala
I have an HDFS folder that keeps getting new audio files every few minutes.
My objective is to detect new files that have been added to the folder, and
then process the files in parallel without splitting them into multiple
blocks. Basically, if there are 4 new audio files added, I want the Spark
engine to detect the four file names/locations; then I can provide the four
file locations and it can use four processors to process each file.

I tried using fileStream, but there I would have to split the files into
blocks, which I do not want.  Is there any other solution?
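
One possible workaround, as a sketch only (processAudio and the paths are
hypothetical, and sc is assumed to be an existing SparkContext): poll the
directory with the Hadoop FileSystem API and give each whole file its own task,
so no file is ever split into blocks:

    import org.apache.hadoop.fs.{FileSystem, Path}

    def processAudio(path: String): Unit = { /* hypothetical per-file work */ }

    val dir  = new Path("hdfs:///audio/incoming")
    val fs   = FileSystem.get(sc.hadoopConfiguration)
    val seen = scala.collection.mutable.Set[String]()

    while (true) {
      val newFiles = fs.listStatus(dir).map(_.getPath.toString).filterNot(seen)
      if (newFiles.nonEmpty) {
        // one partition per file => each file is handled whole by a single task
        sc.parallelize(newFiles.toSeq, newFiles.length).foreach(processAudio)
        seen ++= newFiles
      }
      Thread.sleep(60 * 1000)
    }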






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-audio-files-tp28159.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-12-05 Thread Hyukjin Kwon
Hi Kant,

Ah, I thought you wanted a workaround to do it.

Then wouldn't the workaround easily reach the same goal without such a new
API?

Thanks.
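
For reference, a compact sketch of that workaround: infer the schema once from
a sample, then reuse it with from_json (Spark 2.1+) so the main job makes a
single pass. The string column name `body` and the DataFrame `df` are taken
from the thread:

    import org.apache.spark.sql.functions.{col, from_json}
    import spark.implicits._

    val sampleSchema = spark.read.json(df.select("body").limit(1000).as[String].rdd).schema
    val parsed = df.select(from_json(col("body"), sampleSchema).as("parsed"))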




On 6 Dec 2016 4:11 a.m., "kant kodali"  wrote:

> Hi Kwon,
>
> Thanks for this but Isn't this what Michael suggested?
>
> Thanks,
> kant
>
> On Mon, Dec 5, 2016 at 4:45 AM, Hyukjin Kwon  wrote:
>
>> Hi Kant,
>>
>> How about doing something like this?
>>
>> import org.apache.spark.sql.functions._
>>
>> // val df2 = df.select(df("body").cast(StringType).as("body"))
>> val df2 = Seq("""{"a": 1}""").toDF("body")
>> val schema = spark.read.json(df2.as[String].rdd).schema
>> df2.select(from_json(col("body"), schema)).show()
>>
>> ​
>>
>> 2016-12-05 19:51 GMT+09:00 kant kodali :
>>
>>> Hi Michael,
>>>
>>> " Personally, I usually take a small sample of data and use schema
>>> inference on that.  I then hardcode that schema into my program.  This
>>> makes your spark jobs much faster and removes the possibility of the schema
>>> changing underneath the covers."
>>>
>>> This may or may not work for us. Not all rows have the same schema. The
>>> number of distinct schemas we have now may be small but going forward this
>>> can go to any number moreover a distinct call can lead to a table scan
>>> which can be billions of rows for us.
>>>
>>> I also would agree to keep the API consistent than making an exception
>>> however I wonder if it make sense to provide an action call to infer the
>>> schema which would return a new dataframe after the action call finishes
>>> (after schema inference)? For example, something like below ?
>>>
>>> val inferedDF = df.inferSchema(col1);
>>>
>>> Thanks,
>>>
>>>
>>>
>>>
>>> On Mon, Nov 28, 2016 at 6:12 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 You could open up a JIRA to add a version of from_json that supports
 schema inference, but unfortunately that would not be super easy to
 implement.  In particular, it would introduce a weird case where only this
 specific function would block for a long time while we infer the schema
 (instead of waiting for an action).  This blocking would be kind of odd for
 a call like df.select(...).  If there is enough interest, though, we
 should still do it.

 To give a little more detail, your version of the code is actually
 doing two passes over the data: one to infer the schema and a second for
 whatever processing you are asking it to do.  We have to know the schema at
 each step of DataFrame construction, so we'd have to do this even before
 you called an action.

 Personally, I usually take a small sample of data and use schema
 inference on that.  I then hardcode that schema into my program.  This
 makes your spark jobs much faster and removes the possibility of the schema
 changing underneath the covers.

 Here's some code I use to build the static schema code automatically
 
 .

 Would that work for you? If not, why not?

 On Wed, Nov 23, 2016 at 2:48 AM, kant kodali 
 wrote:

> Hi Michael,
>
> Looks like all from_json functions will require me to pass schema and
> that can be little tricky for us but the code below doesn't require me to
> pass schema at all.
>
> import org.apache.spark.sql._
> val rdd = df2.rdd.map { case Row(j: String) => j }
> spark.read.json(rdd).show()
>
>
> On Tue, Nov 22, 2016 at 2:42 PM, Michael Armbrust <
> mich...@databricks.com> wrote:
>
>> The first release candidate should be coming out this week. You can
>> subscribe to the dev list if you want to follow the release schedule.
>>
>> On Mon, Nov 21, 2016 at 9:34 PM, kant kodali 
>> wrote:
>>
>>> Hi Michael,
>>>
>>> I only see spark 2.0.2 which is what I am using currently. Any idea
>>> on when 2.1 will be released?
>>>
>>> Thanks,
>>> kant
>>>
>>> On Mon, Nov 21, 2016 at 5:12 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 In Spark 2.1 we've added a from_json
 
 function that I think will do what you want.

 On Fri, Nov 18, 2016 at 2:29 AM, kant kodali 
 wrote:

> This seem to work
>
> import org.apache.spark.sql._
> val rdd = df2.rdd.map { case Row(j: String) => j }
> spark.read.json(rdd).show()
>
> However I wonder if this any inefficiency here ? since I have to
> apply this function for billion rows.
>

Back-pressure to Spark Kafka Streaming?

2016-12-05 Thread Liren Ding
Hey all,

Does backpressure actually work on Spark Kafka streaming? According to the
latest Spark Streaming documentation
(http://spark.apache.org/docs/latest/streaming-programming-guide.html):
"In Spark 1.5, we have introduced a feature called backpressure that
eliminates the need to set this rate limit, as Spark Streaming automatically
figures out the rate limits and dynamically adjusts them if the processing
conditions change. This backpressure can be enabled by setting the
configuration parameter spark.streaming.backpressure.enabled to true."
But I also see a few open Spark JIRA tickets on this option:

https://issues.apache.org/jira/browse/SPARK-7398
https://issues.apache.org/jira/browse/SPARK-18371

The case in the second ticket describes a similar issue to the one we have here.
We use Kafka to send large batches (10~100M) to Spark Streaming, and the Spark
Streaming interval is set to 1~4 minutes. With backpressure set to true, the
queued active batches still pile up when the average batch processing time
takes longer than the default interval. After the Spark driver is restarted,
all queued batches turn into one giant batch, which blocks subsequent batches
and also has a good chance of failing eventually. The only config we found
that might help is "spark.streaming.kafka.maxRatePerPartition". It does limit
the incoming batch size, but it is not a perfect solution since it depends on
the size of each partition as well as the length of the batch interval. For
our case, hundreds of partitions times minutes of interval still produce a
number that is too large for each batch. So we still want to figure out how to
make backpressure work in Spark Kafka streaming, if it is supposed to work
there. Thanks.


Liren


Re: Pretrained Word2Vec models

2016-12-05 Thread Robin East
There is a JIRA and a pull request for this - 
https://issues.apache.org/jira/browse/SPARK-15328 - however there has been no 
movement on this for a few months. As you’ll see from the pull request review I 
was able to load the Google News Model but could not get it to run.
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action 






> On 5 Dec 2016, at 21:34, Lee Becker  wrote:
> 
> Hi all,
> 
> Is there a way for Spark to load Word2Vec models trained using gensim
> or the original C implementation of Word2Vec?  Specifically I'd
> like to play with the Google News model or the Freebase model
> to see how they perform before training my own.
> 
> Thanks,
> Lee



Re: Collaborative Filtering Implicit Feedback Impl.

2016-12-05 Thread Sean Owen
As I recall, it is in there in the math, but doesn't appear as an explicit
term in the computation. You don't actually materialize the 0 input or the
"c=1" corresponding to them.

Or: do you have a computation that agrees with the paper but not this code?

Put another way, none of this would scale/work if you had to materialize
and compute all of these elements.

On Tue, Dec 6, 2016 at 5:55 AM Jerry Lam  wrote:

> Hi Sean,
>
> I agree there is no need for that if the implementation actually assigns
> c=1 for all missing ratings but from the current implementation of ALS, I
> don't think it is doing that.
> The idea is that for missing ratings, they are assigned to c=1 (in the
> paper) and they do contribute to the optimization of equation (3).
>
> The lines of code that I'm referring to is:
>
> {code}
> if (implicitPrefs) {
>   // Extension to the original paper to handle b < 0.
> confidence is a function of |b|
>   // instead so that it is never negative. c1 is confidence -
> 1.0.
>   val c1 = alpha * math.abs(rating)
>   // For rating <= 0, the corresponding preference is 0. So
> the term below is only added
>   // for rating > 0. Because YtY is already added, we need to
> adjust the scaling here.
>   if (rating > 0) {
> numExplicits += 1
> ls.add(srcFactor, (c1 + 1.0) / c1, c1)
>   }
> } else {
>   ls.add(srcFactor, rating)
>   numExplicits += 1
> }
> {code}
>
> Regards,
>
> Jerry
>
>
> On Mon, Dec 5, 2016 at 3:27 PM, Sean Owen  wrote:
>
> That doesn't mean this 0 value is literally included in the input. There's
> no need for that.
>
> On Tue, Dec 6, 2016 at 4:24 AM Jerry Lam  wrote:
>
> Hi Sean,
>
> I'm referring to the paper (http://yifanhu.net/PUB/cf.pdf) Section 2:
> " However, with implicit feedback it would be natural to assign values to
> all rui variables. If no action was observed rui is set to zero, thus
> meaning in our examples zero watching time, or zero purchases on record."
>
> In the implicit setting, apparently there should have values for all pairs
> (u, i) instead of just the observed ones according to the paper. This is
> also true for other implicit feedback papers I read.
>
> In section 4, when r=0, p=0 BUT c=1. Therefore, when we optimize the value
> for this pair. (x^Ty)^2 + regularization.
>
> Do I misunderstand the paper?
>
> Best Regards,
>
> Jerry
>
>
> On Mon, Dec 5, 2016 at 2:43 PM, Sean Owen  wrote:
>
> What are you referring to in what paper? implicit input would never
> materialize 0s for missing values.
>
> On Tue, Dec 6, 2016 at 3:42 AM Jerry Lam  wrote:
>
> Hello spark users and developers,
>
> I read the paper from Yahoo about CF with implicit feedback and other
> papers using implicit feedbacks. Their implementation require to set the
> missing rating with 0. That is for unobserved ratings, the confidence for
> those is set to 1 (c=1). Therefore, the matrix to be factorized is a dense
> matrix.
>
> I read the source code of the ALS implementation in spark (version 1.6.x)
> for implicit feedback. Apparently, it ignores rating that is 0 (Line 1159
> in ALS.scala). It could be a mistake or it could be an optimization. Just
> want to see if anyone steps on this yet.
>
> Best Regards,
>
> Jerry
>
>
>
>


Re: Collaborative Filtering Implicit Feedback Impl.

2016-12-05 Thread Jerry Lam
Hi Sean,

I agree there is no need for that if the implementation actually assigns
c=1 for all missing ratings, but from the current implementation of ALS, I
don't think it is doing that.
The idea is that missing ratings are assigned c=1 (in the
paper) and they do contribute to the optimization of equation (3).

The lines of code that I'm referring to is:

{code}
if (implicitPrefs) {
  // Extension to the original paper to handle b < 0.
confidence is a function of |b|
  // instead so that it is never negative. c1 is confidence -
1.0.
  val c1 = alpha * math.abs(rating)
  // For rating <= 0, the corresponding preference is 0. So the
term below is only added
  // for rating > 0. Because YtY is already added, we need to
adjust the scaling here.
  if (rating > 0) {
numExplicits += 1
ls.add(srcFactor, (c1 + 1.0) / c1, c1)
  }
} else {
  ls.add(srcFactor, rating)
  numExplicits += 1
}
{code}

Regards,

Jerry


On Mon, Dec 5, 2016 at 3:27 PM, Sean Owen  wrote:

> That doesn't mean this 0 value is literally included in the input. There's
> no need for that.
>
> On Tue, Dec 6, 2016 at 4:24 AM Jerry Lam  wrote:
>
>> Hi Sean,
>>
>> I'm referring to the paper (http://yifanhu.net/PUB/cf.pdf) Section 2:
>> " However, with implicit feedback it would be natural to assign values to
>> all rui variables. If no action was observed rui is set to zero, thus
>> meaning in our examples zero watching time, or zero purchases on record."
>>
>> In the implicit setting, apparently there should have values for all
>> pairs (u, i) instead of just the observed ones according to the paper. This
>> is also true for other implicit feedback papers I read.
>>
>> In section 4, when r=0, p=0 BUT c=1. Therefore, when we optimize the
>> value for this pair. (x^Ty)^2 + regularization.
>>
>> Do I misunderstand the paper?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Mon, Dec 5, 2016 at 2:43 PM, Sean Owen  wrote:
>>
>> What are you referring to in what paper? implicit input would never
>> materialize 0s for missing values.
>>
>> On Tue, Dec 6, 2016 at 3:42 AM Jerry Lam  wrote:
>>
>> Hello spark users and developers,
>>
>> I read the paper from Yahoo about CF with implicit feedback and other
>> papers using implicit feedbacks. Their implementation require to set the
>> missing rating with 0. That is for unobserved ratings, the confidence for
>> those is set to 1 (c=1). Therefore, the matrix to be factorized is a dense
>> matrix.
>>
>> I read the source code of the ALS implementation in spark (version 1.6.x)
>> for implicit feedback. Apparently, it ignores rating that is 0 (Line 1159
>> in ALS.scala). It could be a mistake or it could be an optimization. Just
>> want to see if anyone steps on this yet.
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>>


Pretrained Word2Vec models

2016-12-05 Thread Lee Becker
Hi all,

Is there a way for Spark to load Word2Vec models trained using gensim
or the original C implementation of Word2Vec?  Specifically
I'd like to play with the Google News model or the Freebase model
to see how they perform before training my own.

Thanks,
Lee
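
In the meantime, a rough sketch for experimentation only: the C tool's text
output format (a header line, then one word plus its vector per line) can be
parsed into a plain map with Spark core. This does not produce a Spark
Word2VecModel; proper loading is what the JIRA mentioned above tracks. The
path is a placeholder and sc is an assumed SparkContext:

    // text format: first line "vocabSize vectorSize", then "word v1 v2 ... vN" per line
    val lines  = sc.textFile("/path/to/vectors.txt")
    val header = lines.first()
    val vectors: Map[String, Array[Float]] = lines
      .filter(_ != header)
      .map { line =>
        val parts = line.split(" ")
        parts.head -> parts.tail.map(_.toFloat)
      }
      .collectAsMap()
      .toMap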


Re: Writing DataFrame filter results to separate files

2016-12-05 Thread Michael Armbrust
If you repartition($"column") and then do .write.partitionBy("column") you
should end up with a single file for each value of the partition column.

On Mon, Dec 5, 2016 at 10:59 AM, Everett Anderson 
wrote:

> Hi,
>
> I have a DataFrame of records with dates, and I'd like to write all
> 12-month (with overlap) windows to separate outputs.
>
> Currently, I have a loop equivalent to:
>
> for ((windowStart, windowEnd) <- windows) {
> val windowData = allData.filter(
> getFilterCriteria(windowStart, windowEnd))
> windowData.write.format(...).save(...)
> }
>
> This works fine, but has the drawback that since Spark doesn't parallelize
> the writes, there is a fairly cost based on the number of windows.
>
> Is there a way around this?
>
> In MapReduce, I'd probably multiply the data in a Mapper with a window ID
> and then maybe use something like MultipleOutputs
> .
> But I'm a bit worried of trying to do this in Spark because of the data
> explosion and RAM use. What's the best approach?
>
> Thanks!
>
> - Everett
>
>


Re: Collaborative Filtering Implicit Feedback Impl.

2016-12-05 Thread Sean Owen
That doesn't mean this 0 value is literally included in the input. There's
no need for that.

On Tue, Dec 6, 2016 at 4:24 AM Jerry Lam  wrote:

> Hi Sean,
>
> I'm referring to the paper (http://yifanhu.net/PUB/cf.pdf) Section 2:
> " However, with implicit feedback it would be natural to assign values to
> all rui variables. If no action was observed rui is set to zero, thus
> meaning in our examples zero watching time, or zero purchases on record."
>
> In the implicit setting, apparently there should have values for all pairs
> (u, i) instead of just the observed ones according to the paper. This is
> also true for other implicit feedback papers I read.
>
> In section 4, when r=0, p=0 BUT c=1. Therefore, when we optimize the value
> for this pair. (x^Ty)^2 + regularization.
>
> Do I misunderstand the paper?
>
> Best Regards,
>
> Jerry
>
>
> On Mon, Dec 5, 2016 at 2:43 PM, Sean Owen  wrote:
>
> What are you referring to in what paper? implicit input would never
> materialize 0s for missing values.
>
> On Tue, Dec 6, 2016 at 3:42 AM Jerry Lam  wrote:
>
> Hello spark users and developers,
>
> I read the paper from Yahoo about CF with implicit feedback and other
> papers using implicit feedbacks. Their implementation require to set the
> missing rating with 0. That is for unobserved ratings, the confidence for
> those is set to 1 (c=1). Therefore, the matrix to be factorized is a dense
> matrix.
>
> I read the source code of the ALS implementation in spark (version 1.6.x)
> for implicit feedback. Apparently, it ignores rating that is 0 (Line 1159
> in ALS.scala). It could be a mistake or it could be an optimization. Just
> want to see if anyone steps on this yet.
>
> Best Regards,
>
> Jerry
>
>
>


Re: Collaborative Filtering Implicit Feedback Impl.

2016-12-05 Thread Jerry Lam
Hi Sean,

I'm referring to the paper (http://yifanhu.net/PUB/cf.pdf) Section 2:
" However, with implicit feedback it would be natural to assign values to
all rui variables. If no action was observed rui is set to zero, thus
meaning in our examples zero watching time, or zero purchases on record."

In the implicit setting, there should apparently be values for all pairs
(u, i), not just the observed ones, according to the paper. This is
also true for the other implicit feedback papers I read.

In section 4, when r=0, p=0 but c=1. Therefore, when we optimize, this pair
still contributes the term (x^T y)^2 plus regularization.

Do I misunderstand the paper?
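
For reference, the objective from the paper (its equation 3), written out with
the confidence and preference definitions in question:

    \min_{x_*, y_*} \sum_{u,i} c_{ui} \, (p_{ui} - x_u^\top y_i)^2
      + \lambda \Big( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \Big),
    \qquad c_{ui} = 1 + \alpha r_{ui}, \qquad
    p_{ui} = \begin{cases} 1 & r_{ui} > 0 \\ 0 & r_{ui} = 0 \end{cases}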

Best Regards,

Jerry


On Mon, Dec 5, 2016 at 2:43 PM, Sean Owen  wrote:

> What are you referring to in what paper? implicit input would never
> materialize 0s for missing values.
>
> On Tue, Dec 6, 2016 at 3:42 AM Jerry Lam  wrote:
>
>> Hello spark users and developers,
>>
>> I read the paper from Yahoo about CF with implicit feedback and other
>> papers using implicit feedbacks. Their implementation require to set the
>> missing rating with 0. That is for unobserved ratings, the confidence for
>> those is set to 1 (c=1). Therefore, the matrix to be factorized is a dense
>> matrix.
>>
>> I read the source code of the ALS implementation in spark (version 1.6.x)
>> for implicit feedback. Apparently, it ignores rating that is 0 (Line 1159
>> in ALS.scala). It could be a mistake or it could be an optimization. Just
>> want to see if anyone steps on this yet.
>>
>> Best Regards,
>>
>> Jerry
>>
>


Re: Livy with Spark

2016-12-05 Thread Vadim Semenov
You mean share a single spark context across multiple jobs?

https://github.com/spark-jobserver/spark-jobserver does the same

On Mon, Dec 5, 2016 at 9:33 AM, Mich Talebzadeh 
wrote:

> Hi,
>
> Has there been any experience using Livy with Spark to share multiple
> Spark contexts?
>
> thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Collaborative Filtering Implicit Feedback Impl.

2016-12-05 Thread Sean Owen
What are you referring to in what paper? implicit input would never
materialize 0s for missing values.

On Tue, Dec 6, 2016 at 3:42 AM Jerry Lam  wrote:

> Hello spark users and developers,
>
> I read the paper from Yahoo about CF with implicit feedback and other
> papers using implicit feedbacks. Their implementation require to set the
> missing rating with 0. That is for unobserved ratings, the confidence for
> those is set to 1 (c=1). Therefore, the matrix to be factorized is a dense
> matrix.
>
> I read the source code of the ALS implementation in spark (version 1.6.x)
> for implicit feedback. Apparently, it ignores rating that is 0 (Line 1159
> in ALS.scala). It could be a mistake or it could be an optimization. Just
> want to see if anyone steps on this yet.
>
> Best Regards,
>
> Jerry
>


Collaborative Filtering Implicit Feedback Impl.

2016-12-05 Thread Jerry Lam
Hello spark users and developers,

I read the paper from Yahoo about CF with implicit feedback and other
papers using implicit feedback. Their implementation requires setting the
missing ratings to 0. That is, for unobserved ratings the confidence
is set to 1 (c=1). Therefore, the matrix to be factorized is a dense
matrix.

I read the source code of the ALS implementation in Spark (version 1.6.x)
for implicit feedback. Apparently, it ignores ratings that are 0 (line 1159
in ALS.scala). It could be a mistake or it could be an optimization. Just
want to see if anyone has stepped on this yet.

Best Regards,

Jerry
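
For context, the implicit-feedback variant under discussion is enabled like
this in the DataFrame-based API (a sketch; the column names, parameter values,
and interactionsDF are assumptions, and only the observed interactions are
supplied as rows):

    import org.apache.spark.ml.recommendation.ALS

    val als = new ALS()
      .setImplicitPrefs(true)
      .setAlpha(40.0)          // confidence scaling, as in the Hu et al. paper
      .setRank(10)
      .setRegParam(0.1)
      .setUserCol("userId")
      .setItemCol("itemId")
      .setRatingCol("clicks")
    val model = als.fit(interactionsDF)   // DataFrame of observed (userId, itemId, clicks)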


Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-12-05 Thread kant kodali
Hi Kwon,

Thanks for this, but isn't this what Michael suggested?

Thanks,
kant

On Mon, Dec 5, 2016 at 4:45 AM, Hyukjin Kwon  wrote:

> Hi Kant,
>
> How about doing something like this?
>
> import org.apache.spark.sql.functions._
>
> // val df2 = df.select(df("body").cast(StringType).as("body"))
> val df2 = Seq("""{"a": 1}""").toDF("body")
> val schema = spark.read.json(df2.as[String].rdd).schema
> df2.select(from_json(col("body"), schema)).show()
>
> ​
>
> 2016-12-05 19:51 GMT+09:00 kant kodali :
>
>> Hi Michael,
>>
>> " Personally, I usually take a small sample of data and use schema
>> inference on that.  I then hardcode that schema into my program.  This
>> makes your spark jobs much faster and removes the possibility of the schema
>> changing underneath the covers."
>>
>> This may or may not work for us. Not all rows have the same schema. The
>> number of distinct schemas we have now may be small but going forward this
>> can go to any number moreover a distinct call can lead to a table scan
>> which can be billions of rows for us.
>>
>> I also would agree to keep the API consistent than making an exception
>> however I wonder if it make sense to provide an action call to infer the
>> schema which would return a new dataframe after the action call finishes
>> (after schema inference)? For example, something like below ?
>>
>> val inferedDF = df.inferSchema(col1);
>>
>> Thanks,
>>
>>
>>
>>
>> On Mon, Nov 28, 2016 at 6:12 PM, Michael Armbrust > > wrote:
>>
>>> You could open up a JIRA to add a version of from_json that supports
>>> schema inference, but unfortunately that would not be super easy to
>>> implement.  In particular, it would introduce a weird case where only this
>>> specific function would block for a long time while we infer the schema
>>> (instead of waiting for an action).  This blocking would be kind of odd for
>>> a call like df.select(...).  If there is enough interest, though, we
>>> should still do it.
>>>
>>> To give a little more detail, your version of the code is actually doing
>>> two passes over the data: one to infer the schema and a second for whatever
>>> processing you are asking it to do.  We have to know the schema at each
>>> step of DataFrame construction, so we'd have to do this even before you
>>> called an action.
>>>
>>> Personally, I usually take a small sample of data and use schema
>>> inference on that.  I then hardcode that schema into my program.  This
>>> makes your spark jobs much faster and removes the possibility of the schema
>>> changing underneath the covers.
>>>
>>> Here's some code I use to build the static schema code automatically
>>> 
>>> .
>>>
>>> Would that work for you? If not, why not?
>>>
>>> On Wed, Nov 23, 2016 at 2:48 AM, kant kodali  wrote:
>>>
 Hi Michael,

 Looks like all from_json functions will require me to pass schema and
 that can be little tricky for us but the code below doesn't require me to
 pass schema at all.

 import org.apache.spark.sql._
 val rdd = df2.rdd.map { case Row(j: String) => j }
 spark.read.json(rdd).show()


 On Tue, Nov 22, 2016 at 2:42 PM, Michael Armbrust <
 mich...@databricks.com> wrote:

> The first release candidate should be coming out this week. You can
> subscribe to the dev list if you want to follow the release schedule.
>
> On Mon, Nov 21, 2016 at 9:34 PM, kant kodali 
> wrote:
>
>> Hi Michael,
>>
>> I only see spark 2.0.2 which is what I am using currently. Any idea
>> on when 2.1 will be released?
>>
>> Thanks,
>> kant
>>
>> On Mon, Nov 21, 2016 at 5:12 PM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> In Spark 2.1 we've added a from_json
>>> 
>>> function that I think will do what you want.
>>>
>>> On Fri, Nov 18, 2016 at 2:29 AM, kant kodali 
>>> wrote:
>>>
 This seem to work

 import org.apache.spark.sql._
 val rdd = df2.rdd.map { case Row(j: String) => j }
 spark.read.json(rdd).show()

 However I wonder if this any inefficiency here ? since I have to
 apply this function for billion rows.


>>>
>>
>

>>>
>>
>


Writing DataFrame filter results to separate files

2016-12-05 Thread Everett Anderson
Hi,

I have a DataFrame of records with dates, and I'd like to write all
12-month (with overlap) windows to separate outputs.

Currently, I have a loop equivalent to:

for ((windowStart, windowEnd) <- windows) {
val windowData = allData.filter(
getFilterCriteria(windowStart, windowEnd))
windowData.write.format(...).save(...)
}

This works fine, but has the drawback that, since Spark doesn't parallelize
the writes, there is a fairly high cost proportional to the number of windows.

Is there a way around this?

In MapReduce, I'd probably multiply the data in a Mapper with a window ID
and then maybe use something like MultipleOutputs
.
But I'm a bit worried about trying to do this in Spark because of the data
explosion and RAM use. What's the best approach?

Thanks!

- Everett


Re: Kerberos and YARN - functions in spark-shell and spark submit local but not cluster mode

2016-12-05 Thread Marcelo Vanzin
That's not the error, that's just telling you the application failed.
You have to look at the YARN logs for application_1479877553404_0041
to see why it failed.

On Mon, Dec 5, 2016 at 10:44 AM, Gerard Casey  wrote:
> Thanks Marcelo,
>
> My understanding from a few pointers is that this may be due to insufficient 
> read permissions to the key tab or a corrupt key tab. I have checked the read 
> permissions and they are ok. I can see that it is initially configuring 
> correctly:
>
>INFO security.UserGroupInformation: Login successful for user 
> user@login_node using keytab file /path/to/keytab
>
> I’ve added the full trace below.
>
> Gerry
>
> Full trace:
>
> Multiple versions of Spark are installed but SPARK_MAJOR_VERSION is not set
> Spark1 will be picked by default
> 16/12/05 18:23:27 WARN util.NativeCodeLoader: Unable to load native-hadoop 
> library for your platform... using builtin-java classes where applicable
> 16/12/05 18:23:27 INFO security.UserGroupInformation: Login successful for 
> user me@login_nodeusing keytab file /path/to/keytab
> 16/12/05 18:23:27 INFO yarn.Client: Attempting to login to the Kerberos using 
> principal: me@login_node and keytab: /path/to/keytab
> 16/12/05 18:23:28 INFO impl.TimelineClientImpl: Timeline service address: 
> http://login_node1.xcat.cluster:8188/ws/v1/timeline/
> 16/12/05 18:23:28 INFO client.RMProxy: Connecting to ResourceManager at 
> login_node1.xcat.cluster/
> 16/12/05 18:23:28 INFO client.AHSProxy: Connecting to Application History 
> server at login_node1.xcat.cluster/
> 16/12/05 18:23:28 WARN shortcircuit.DomainSocketFactory: The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 16/12/05 18:23:28 INFO yarn.Client: Requesting a new application from cluster 
> with 32 NodeManagers
> 16/12/05 18:23:28 INFO yarn.Client: Verifying our application has not 
> requested more than the maximum memory capability of the cluster (15360 MB 
> per container)
> 16/12/05 18:23:28 INFO yarn.Client: Will allocate AM container, with 1408 MB 
> memory including 384 MB overhead
> 16/12/05 18:23:28 INFO yarn.Client: Setting up container launch context for 
> our AM
> 16/12/05 18:23:28 INFO yarn.Client: Setting up the launch environment for our 
> AM container
> 16/12/05 18:23:28 INFO yarn.Client: Using the spark assembly jar on HDFS 
> because you are using HDP, 
> defaultSparkAssembly:hdfs://login_node1.xcat.cluster:8020/hdp/apps/2.5.0.0-1245/spark/spark-hdp-assembly.jar
> 16/12/05 18:23:28 INFO yarn.Client: Credentials file set to:
> 16/12/05 18:23:28 INFO yarn.YarnSparkHadoopUtil: getting token for namenode: 
> hdfs://login_node1.xcat.cluster:8020/user/me/.sparkStaging/application_
> 16/12/05 18:23:28 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 
> 1856 for me on
> 16/12/05 18:23:28 INFO yarn.Client: Renewal Interval set to 8649
> 16/12/05 18:23:28 INFO yarn.Client: Preparing resources for our AM container
> 16/12/05 18:23:28 INFO yarn.YarnSparkHadoopUtil: getting token for namenode: 
> hdfs://login_node1.xcat.cluster:8020/user/me/.sparkStaging/application_
> 16/12/05 18:23:28 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 
> 1857 for me on
> 16/12/05 18:23:29 INFO yarn.YarnSparkHadoopUtil: HBase class not found 
> java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration
> 16/12/05 18:23:29 INFO yarn.Client: To enable the AM to login from keytab, 
> credentials are being copied over to the AM via the YARN Secure Distributed 
> Cache.
> 16/12/05 18:23:29 INFO yarn.Client: Uploading resource file:/path/to/keytab 
> -> 
> hdfs://login_node1.xcat.cluster:8020/user/me/.sparkStaging/application_1479877553404_0041/keytab
> 16/12/05 18:23:29 INFO yarn.Client: Using the spark assembly jar on HDFS 
> because you are using HDP, 
> defaultSparkAssembly:hdfs://login_node1.xcat.cluster:8020/hdp/apps/2.5.0.0-1245/spark/spark-hdp-assembly.jar
> 16/12/05 18:23:29 INFO yarn.Client: Source and destination file systems are 
> the same. Not copying 
> hdfs://login_node1.xcat.cluster:8020/hdp/apps/2.5.0.0-1245/spark/spark-hdp-assembly.jar
> 16/12/05 18:23:29 INFO yarn.Client: Uploading resource 
> file:/home/me/Aoife/spark-abm/target/scala-2.10/graphx_sp_2.10-1.0.jar -> 
> hdfs://login_node1.xcat.cluster:8020/user/me/.sparkStaging/application_1479877553404_0041/graphx_sp_2.10-1.0.jar
> 16/12/05 18:23:29 INFO yarn.Client: Uploading resource 
> file:/tmp/spark-2e566133-d50a-4904-920e-ab5cec07c644/__spark_conf__6538744395325375994.zip
>  -> 
> hdfs://login_node1.xcat.cluster:8020/user/me/.sparkStaging/application_1479877553404_0041/__spark_conf__6538744395325375994.zip
> 16/12/05 18:23:29 INFO spark.SecurityManager: Changing view acls to: me
> 16/12/05 18:23:29 INFO spark.SecurityManager: Changing modify acls to: me
> 16/12/05 18:23:29 INFO spark.SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users with view permissions: Set(me); users with 

Re: Kerberos and YARN - functions in spark-shell and spark submit local but not cluster mode

2016-12-05 Thread Gerard Casey
Thanks Marcelo,

My understanding from a few pointers is that this may be due to insufficient 
read permissions on the keytab or a corrupt keytab. I have checked the read 
permissions and they are OK. I can see that it is initially configuring 
correctly:

   INFO security.UserGroupInformation: Login successful for user 
user@login_node using keytab file /path/to/keytab

I’ve added the full trace below. 

Gerry

Full trace:

Multiple versions of Spark are installed but SPARK_MAJOR_VERSION is not set
Spark1 will be picked by default
16/12/05 18:23:27 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
16/12/05 18:23:27 INFO security.UserGroupInformation: Login successful for user 
me@login_node using keytab file /path/to/keytab
16/12/05 18:23:27 INFO yarn.Client: Attempting to login to the Kerberos using 
principal: me@login_node and keytab: /path/to/keytab
16/12/05 18:23:28 INFO impl.TimelineClientImpl: Timeline service address: 
http://login_node1.xcat.cluster:8188/ws/v1/timeline/
16/12/05 18:23:28 INFO client.RMProxy: Connecting to ResourceManager at 
login_node1.xcat.cluster/
16/12/05 18:23:28 INFO client.AHSProxy: Connecting to Application History 
server at login_node1.xcat.cluster/
16/12/05 18:23:28 WARN shortcircuit.DomainSocketFactory: The short-circuit 
local reads feature cannot be used because libhadoop cannot be loaded.
16/12/05 18:23:28 INFO yarn.Client: Requesting a new application from cluster 
with 32 NodeManagers
16/12/05 18:23:28 INFO yarn.Client: Verifying our application has not requested 
more than the maximum memory capability of the cluster (15360 MB per container)
16/12/05 18:23:28 INFO yarn.Client: Will allocate AM container, with 1408 MB 
memory including 384 MB overhead
16/12/05 18:23:28 INFO yarn.Client: Setting up container launch context for our 
AM
16/12/05 18:23:28 INFO yarn.Client: Setting up the launch environment for our 
AM container
16/12/05 18:23:28 INFO yarn.Client: Using the spark assembly jar on HDFS 
because you are using HDP, 
defaultSparkAssembly:hdfs://login_node1.xcat.cluster:8020/hdp/apps/2.5.0.0-1245/spark/spark-hdp-assembly.jar
16/12/05 18:23:28 INFO yarn.Client: Credentials file set to:
16/12/05 18:23:28 INFO yarn.YarnSparkHadoopUtil: getting token for namenode: 
hdfs://login_node1.xcat.cluster:8020/user/me/.sparkStaging/application_
16/12/05 18:23:28 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 1856 
for me on
16/12/05 18:23:28 INFO yarn.Client: Renewal Interval set to 8649
16/12/05 18:23:28 INFO yarn.Client: Preparing resources for our AM container
16/12/05 18:23:28 INFO yarn.YarnSparkHadoopUtil: getting token for namenode: 
hdfs://login_node1.xcat.cluster:8020/user/me/.sparkStaging/application_
16/12/05 18:23:28 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 1857 
for me on 
16/12/05 18:23:29 INFO yarn.YarnSparkHadoopUtil: HBase class not found 
java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration
16/12/05 18:23:29 INFO yarn.Client: To enable the AM to login from keytab, 
credentials are being copied over to the AM via the YARN Secure Distributed 
Cache.
16/12/05 18:23:29 INFO yarn.Client: Uploading resource file:/path/to/keytab -> 
hdfs://login_node1.xcat.cluster:8020/user/me/.sparkStaging/application_1479877553404_0041/keytab
16/12/05 18:23:29 INFO yarn.Client: Using the spark assembly jar on HDFS 
because you are using HDP, 
defaultSparkAssembly:hdfs://login_node1.xcat.cluster:8020/hdp/apps/2.5.0.0-1245/spark/spark-hdp-assembly.jar
16/12/05 18:23:29 INFO yarn.Client: Source and destination file systems are the 
same. Not copying 
hdfs://login_node1.xcat.cluster:8020/hdp/apps/2.5.0.0-1245/spark/spark-hdp-assembly.jar
16/12/05 18:23:29 INFO yarn.Client: Uploading resource 
file:/home/me/Aoife/spark-abm/target/scala-2.10/graphx_sp_2.10-1.0.jar -> 
hdfs://login_node1.xcat.cluster:8020/user/me/.sparkStaging/application_1479877553404_0041/graphx_sp_2.10-1.0.jar
16/12/05 18:23:29 INFO yarn.Client: Uploading resource 
file:/tmp/spark-2e566133-d50a-4904-920e-ab5cec07c644/__spark_conf__6538744395325375994.zip
 -> 
hdfs://login_node1.xcat.cluster:8020/user/me/.sparkStaging/application_1479877553404_0041/__spark_conf__6538744395325375994.zip
16/12/05 18:23:29 INFO spark.SecurityManager: Changing view acls to: me
16/12/05 18:23:29 INFO spark.SecurityManager: Changing modify acls to: me
16/12/05 18:23:29 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(me); users with 
modify permissions: Set(me)
16/12/05 18:23:29 INFO yarn.Client: Submitting application 41 to ResourceManager
16/12/05 18:23:30 INFO impl.YarnClientImpl: Submitted application 
application_1479877553404_0041
16/12/05 18:23:31 INFO yarn.Client: Application report for 
application_1479877553404_0041 (state: ACCEPTED)
16/12/05 18:23:31 INFO yarn.Client:
 client token: Token { kind: 

Re: Kerberos and YARN - functions in spark-shell and spark submit local but not cluster mode

2016-12-05 Thread Marcelo Vanzin
There's generally an exception in these cases, and you haven't posted
it, so it's hard to tell you what's wrong. The most probable cause,
without the extra information the exception provides, is that you're
using the wrong Hadoop configuration when submitting the job to YARN.
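
For reference, when running in cluster mode with a keytab, the submission
usually needs to pass both the principal and the keytab so the AM can log in
by itself (the principal below is just a placeholder):

spark-submit --class "graphx_sp" --master yarn --deploy-mode cluster \
  --principal me@MY.REALM --keytab /path/to/keytab \
  --executor-memory 13G target/scala-2.10/graphx_sp_2.10-1.0.jar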

On Mon, Dec 5, 2016 at 4:35 AM, Gerard Casey  wrote:
> Hello all,
>
> I am using Spark with Kerberos authentication.
>
> I can run my code using `spark-shell` fine and I can also use `spark-submit`
> in local mode (e.g. —master local[16]). Both function as expected.
>
> local mode -
>
> spark-submit --class "graphx_sp" --master local[16] --driver-memory 20G
> target/scala-2.10/graphx_sp_2.10-1.0.jar
>
> I am now progressing to run in cluster mode using YARN.
>
> cluster mode with YARN -
>
> spark-submit --class "graphx_sp" --master yarn --deploy-mode cluster
> --executor-memory 13G --total-executor-cores 32
> target/scala-2.10/graphx_sp_2.10-1.0.jar
>
> However, this returns:
>
> diagnostics: User class threw exception:
> org.apache.hadoop.security.AccessControlException: Authentication required
>
> Before I run using spark-shell or on local mode in spark-submit I do the
> following kerberos setup:
>
> kinit -k -t ~/keytab -r 7d `whoami`
>
> Clearly, this setup is not extending to the YARN setup. How do I fix the
> Kerberos issue with YARN in cluster mode? Is this something which must be in
> my /src/main/scala/graphx_sp.scala file?
>
> Many thanks
>
> Geroid



-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kerberos and YARN - functions in spark-shell and spark submit local but not cluster mode

2016-12-05 Thread Jorge Sánchez
Hi Gerard,

have you tried running in yarn-client mode? If so, do you still get that
same error?

Regards.

2016-12-05 12:49 GMT+00:00 Gerard Casey :

> Edit. From here
> 
>  I
> read that you can pass a `keytab` option to spark-submit. I thus tried
>
> *spark-submit --class "graphx_sp" --master yarn  *--keytab
> /path/to/keytab  *--deploy-mode cluster --executor-memory 13G
> --total-executor-cores 32 target/scala-2.10/graphx_sp_2.10-1.0.jar*
>
> However, the error persists
>
> Any ideas?
>
> Thanks
>
> Geroid
>
> On 5 Dec 2016, at 13:35, Gerard Casey  wrote:
>
> Hello all,
>
> I am using Spark with Kerberos authentication.
>
> I can run my code using `spark-shell` fine and I can also use
> `spark-submit` in local mode (e.g. —master local[16]). Both function as
> expected.
>
> local mode -
>
> *spark-submit --class "graphx_sp" --master local[16] --driver-memory 20G
> target/scala-2.10/graphx_sp_2.10-1.0.jar*
>
> I am now progressing to run in cluster mode using YARN.
>
> cluster mode with YARN -
>
> *spark-submit --class "graphx_sp" --master yarn --deploy-mode cluster
> --executor-memory 13G --total-executor-cores 32
> target/scala-2.10/graphx_sp_2.10-1.0.jar*
>
> However, this returns:
>
> *diagnostics: User class threw exception:
> org.apache.hadoop.security.AccessControlException: Authentication required*
>
> Before I run using spark-shell or on local mode in spark-submit I do the
> following kerberos setup:
>
> kinit -k -t ~/keytab -r 7d `whoami`
>
> Clearly, this setup is not extending to the YARN setup. How do I fix the
> Kerberos issue with YARN in cluster mode? Is this something which must be
> in my /src/main/scala/graphx_sp.scala file?
>
> Many thanks
>
> Geroid
>
>
>


Re: Would spark dataframe/rdd read from external source on every action?

2016-12-05 Thread neil90
Yes it would.
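
If re-reading the source on every action is a concern, caching the DataFrame
after the first read avoids it. A minimal sketch in Scala, assuming a
SparkSession named spark (the path and column name are placeholders):

val df = spark.read.parquet("hdfs:///path/to/data")
df.cache()                          // or df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()                          // first action reads the source and fills the cache
df.filter(df("value") > 0).count()  // later actions are served from the cache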



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Would-spark-dataframe-rdd-read-from-external-source-on-every-action-tp28157p28158.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



streaming deployment on yarn -emr

2016-12-05 Thread Saurabh Malviya (samalviy)
Hi,

We are using EMR and currently deploy our streaming job (workflow) with Oozie. 
I just want to know the best practice for deploying a streaming job. (On Mesos we 
deploy using Marathon, but what is the best approach on YARN to enforce that only 
one instance runs and to restart it if it fails for any reason?)

I have noticed that YARN sometimes launches two instances of the streaming job. It 
seems that if the driver is unhealthy, YARN starts a second attempt, and in the 
meantime the old driver recovers. (That is why we need to check how to enforce that 
only one instance of the streaming job is running on YARN.)

-Saurabh




How could one specify a Docker image for each job to be used by executors?

2016-12-05 Thread Enno Shioji
Hi,

Suppose I have a job that uses some native libraries. I can launch
executors using a Docker container and everything is fine.

Now suppose I have some other job that uses some other native libraries
(and let's assume they just can't co-exist in the same docker image), but I
want to execute those on the same cluster.

Is there a way to have executors run in a specified environment (e.g.
docker container), on per-job basis? E.g. such that some executors for one
job are launched with some docker image and other executors belonging to
other jobs are using a different docker image?


/Enno


Re: Livy with Spark

2016-12-05 Thread Mich Talebzadeh
Thanks Richard for the link. Its interaction with Zeppelin is great, too.

I believe it is at a very early stage for now

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 5 December 2016 at 14:50, Richard Startin 
wrote:

> There is a great write up on Livy at
> http://henning.kropponline.de/2016/11/06/
>
> On 5 Dec 2016, at 14:34, Mich Talebzadeh 
> wrote:
>
> Hi,
>
> Has there been any experience using Livy with Spark to share multiple
> Spark contexts?
>
> thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>


Spark streaming completed batches statistics

2016-12-05 Thread Richard Startin
Is there any way to get a more computer-friendly version of the completed 
batches section of the streaming page of the application master? I am very 
interested in the statistics and am currently screen-scraping... 
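
For reference, much the same statistics can be collected programmatically with a
StreamingListener instead of scraping the UI. A minimal sketch, assuming a
StreamingContext named ssc:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    // ship these to a log, CSV or metrics backend instead of screen-scraping
    println(s"batch=${info.batchTime}, records=${info.numRecords}, " +
      s"schedulingDelay=${info.schedulingDelay.getOrElse(-1L)}, " +
      s"processingDelay=${info.processingDelay.getOrElse(-1L)}, " +
      s"totalDelay=${info.totalDelay.getOrElse(-1L)}")
  }
})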

https://richardstartin.com
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Livy with Spark

2016-12-05 Thread Richard Startin
There is a great write up on Livy at
http://henning.kropponline.de/2016/11/06/

On 5 Dec 2016, at 14:34, Mich Talebzadeh 
> wrote:

Hi,

Has there been any experience using Livy with Spark to share multiple Spark 
contexts?

thanks



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




Livy with Spark

2016-12-05 Thread Mich Talebzadeh
Hi,

Has there been any experience using Livy with Spark to share multiple Spark
contexts?

thanks


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Kerberos and YARN - functions in spark-shell and spark submit local but not cluster mode

2016-12-05 Thread Gerard Casey
Edit. From here 

 I read that you can pass a `keytab` option to spark-submit. I thus tried

spark-submit --class "graphx_sp" --master yarn  --keytab /path/to/keytab  
--deploy-mode cluster --executor-memory 13G --total-executor-cores 32 
target/scala-2.10/graphx_sp_2.10-1.0.jar

However, the error persists

Any ideas?

Thanks

Geroid 

> On 5 Dec 2016, at 13:35, Gerard Casey  wrote:
> 
> Hello all,
> 
> I am using Spark with Kerberos authentication.
> 
> I can run my code using `spark-shell` fine and I can also use `spark-submit` 
> in local mode (e.g. —master local[16]). Both function as expected.
> 
> local mode -
> 
>   spark-submit --class "graphx_sp" --master local[16] --driver-memory 20G 
> target/scala-2.10/graphx_sp_2.10-1.0.jar
> 
> I am now progressing to run in cluster mode using YARN.
> 
> cluster mode with YARN - 
> 
>   spark-submit --class "graphx_sp" --master yarn --deploy-mode cluster 
> --executor-memory 13G --total-executor-cores 32 
> target/scala-2.10/graphx_sp_2.10-1.0.jar
> 
> However, this returns:
> 
>   diagnostics: User class threw exception: 
> org.apache.hadoop.security.AccessControlException: Authentication required
> 
> Before I run using spark-shell or on local mode in spark-submit I do the 
> following kerberos setup:
> 
>   kinit -k -t ~/keytab -r 7d `whoami`
> 
> Clearly, this setup is not extending to the YARN setup. How do I fix the 
> Kerberos issue with YARN in cluster mode? Is this something which must be in 
> my /src/main/scala/graphx_sp.scala file? 
> 
> Many thanks
> 
> Geroid 



Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-12-05 Thread Hyukjin Kwon
Hi Kant,

How about doing something like this?

import org.apache.spark.sql.functions._
import spark.implicits._  // needed for toDF and .as[String] outside the shell

// val df2 = df.select(df("body").cast(StringType).as("body"))
val df2 = Seq("""{"a": 1}""").toDF("body")
val schema = spark.read.json(df2.as[String].rdd).schema
df2.select(from_json(col("body"), schema)).show()
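
For the hardcode-the-schema approach Michael describes in the quoted thread
below, a rough sketch could look like this (the field names and types are just
placeholders for whatever the sampled data shows):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("a", LongType),
  StructField("b", StringType)))
df2.select(from_json(col("body"), schema).as("parsed")).show()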

​

2016-12-05 19:51 GMT+09:00 kant kodali :

> Hi Michael,
>
> " Personally, I usually take a small sample of data and use schema
> inference on that.  I then hardcode that schema into my program.  This
> makes your spark jobs much faster and removes the possibility of the schema
> changing underneath the covers."
>
> This may or may not work for us. Not all rows have the same schema. The
> number of distinct schemas we have now may be small but going forward this
> can go to any number moreover a distinct call can lead to a table scan
> which can be billions of rows for us.
>
> I also would agree to keep the API consistent than making an exception
> however I wonder if it make sense to provide an action call to infer the
> schema which would return a new dataframe after the action call finishes
> (after schema inference)? For example, something like below ?
>
> val inferedDF = df.inferSchema(col1);
>
> Thanks,
>
>
>
>
> On Mon, Nov 28, 2016 at 6:12 PM, Michael Armbrust 
> wrote:
>
>> You could open up a JIRA to add a version of from_json that supports
>> schema inference, but unfortunately that would not be super easy to
>> implement.  In particular, it would introduce a weird case where only this
>> specific function would block for a long time while we infer the schema
>> (instead of waiting for an action).  This blocking would be kind of odd for
>> a call like df.select(...).  If there is enough interest, though, we
>> should still do it.
>>
>> To give a little more detail, your version of the code is actually doing
>> two passes over the data: one to infer the schema and a second for whatever
>> processing you are asking it to do.  We have to know the schema at each
>> step of DataFrame construction, so we'd have to do this even before you
>> called an action.
>>
>> Personally, I usually take a small sample of data and use schema
>> inference on that.  I then hardcode that schema into my program.  This
>> makes your spark jobs much faster and removes the possibility of the schema
>> changing underneath the covers.
>>
>> Here's some code I use to build the static schema code automatically
>> 
>> .
>>
>> Would that work for you? If not, why not?
>>
>> On Wed, Nov 23, 2016 at 2:48 AM, kant kodali  wrote:
>>
>>> Hi Michael,
>>>
>>> Looks like all from_json functions will require me to pass schema and
>>> that can be little tricky for us but the code below doesn't require me to
>>> pass schema at all.
>>>
>>> import org.apache.spark.sql._
>>> val rdd = df2.rdd.map { case Row(j: String) => j }
>>> spark.read.json(rdd).show()
>>>
>>>
>>> On Tue, Nov 22, 2016 at 2:42 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 The first release candidate should be coming out this week. You can
 subscribe to the dev list if you want to follow the release schedule.

 On Mon, Nov 21, 2016 at 9:34 PM, kant kodali 
 wrote:

> Hi Michael,
>
> I only see spark 2.0.2 which is what I am using currently. Any idea on
> when 2.1 will be released?
>
> Thanks,
> kant
>
> On Mon, Nov 21, 2016 at 5:12 PM, Michael Armbrust <
> mich...@databricks.com> wrote:
>
>> In Spark 2.1 we've added a from_json
>> 
>> function that I think will do what you want.
>>
>> On Fri, Nov 18, 2016 at 2:29 AM, kant kodali 
>> wrote:
>>
>>> This seem to work
>>>
>>> import org.apache.spark.sql._
>>> val rdd = df2.rdd.map { case Row(j: String) => j }
>>> spark.read.json(rdd).show()
>>>
>>> However I wonder if this any inefficiency here ? since I have to
>>> apply this function for billion rows.
>>>
>>>
>>
>

>>>
>>
>


Kerberos and YARN - functions in spark-shell and spark submit local but not cluster mode

2016-12-05 Thread Gerard Casey
Hello all,

I am using Spark with Kerberos authentication.

I can run my code using `spark-shell` fine and I can also use `spark-submit` in 
local mode (e.g. —master local[16]). Both function as expected.

local mode -

spark-submit --class "graphx_sp" --master local[16] --driver-memory 20G 
target/scala-2.10/graphx_sp_2.10-1.0.jar

I am now progressing to run in cluster mode using YARN.

cluster mode with YARN - 

spark-submit --class "graphx_sp" --master yarn --deploy-mode cluster 
--executor-memory 13G --total-executor-cores 32 
target/scala-2.10/graphx_sp_2.10-1.0.jar

However, this returns:

diagnostics: User class threw exception: 
org.apache.hadoop.security.AccessControlException: Authentication required

Before I run using spark-shell or on local mode in spark-submit I do the 
following kerberos setup:

kinit -k -t ~/keytab -r 7d `whoami`

Clearly, this setup is not extending to the YARN setup. How do I fix the 
Kerberos issue with YARN in cluster mode? Is this something which must be in my 
/src/main/scala/graphx_sp.scala file? 

Many thanks

Geroid 

Re: Access multiple cluster

2016-12-05 Thread Steve Loughran
If the remote filesystem is visible from the other, then a different HDFS 
URI, e.g. hdfs://analytics:8000/historical/, can be used for reads & writes, 
even if your defaultFS (the one where you get max performance) is, say, 
hdfs://processing:8000/

- performance will be slower, in both directions
- if you have a fast pipe between the two clusters, then a job with many 
executors may unintentionally saturate the network, leading to unhappy people 
elsewhere.
- you'd better have mutual trust at the Kerberos layer. There's a configuration 
option (I forget its name) to give spark-submit a list of HDFS namenodes it 
will need to get tokens from. Unless your Spark cluster is being launched with 
keytabs, you will need to list upfront all the HDFS clusters your job intends to 
work with.
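
As a minimal sketch (paths, ports and layout are placeholders; assumes a
SparkSession named spark and Parquet data on both clusters):

val events    = spark.read.parquet("hdfs://processing:8000/warehouse/events")
val customers = spark.read.parquet("hdfs://analytics:8000/historical/customers")

events.join(customers, "customer_id")
  .write.mode("overwrite")
  .parquet("hdfs://analytics:8000/historical/events_enriched")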

On 4 Dec 2016, at 21:45, ayan guha 
> wrote:


Hi

Is it possible to access hive tables sitting on multiple clusters in a single 
spark application?

We have a data processing cluster and analytics cluster. I want to join a table 
from analytics cluster with another table in processing cluster and finally 
write back in analytics cluster.

Best
Ayan



Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-12-05 Thread kant kodali
Hi Michael,

" Personally, I usually take a small sample of data and use schema
inference on that.  I then hardcode that schema into my program.  This
makes your spark jobs much faster and removes the possibility of the schema
changing underneath the covers."

This may or may not work for us. Not all rows have the same schema. The
number of distinct schemas we have now may be small, but going forward it
can grow to any number; moreover, a distinct call can lead to a table scan,
which can mean billions of rows for us.

I would also agree with keeping the API consistent rather than making an
exception; however, I wonder if it makes sense to provide an action call to
infer the schema, which would return a new dataframe once the action
(the schema inference) finishes? For example, something like below?

val inferedDF = df.inferSchema(col1);

Thanks,




On Mon, Nov 28, 2016 at 6:12 PM, Michael Armbrust 
wrote:

> You could open up a JIRA to add a version of from_json that supports
> schema inference, but unfortunately that would not be super easy to
> implement.  In particular, it would introduce a weird case where only this
> specific function would block for a long time while we infer the schema
> (instead of waiting for an action).  This blocking would be kind of odd for
> a call like df.select(...).  If there is enough interest, though, we
> should still do it.
>
> To give a little more detail, your version of the code is actually doing
> two passes over the data: one to infer the schema and a second for whatever
> processing you are asking it to do.  We have to know the schema at each
> step of DataFrame construction, so we'd have to do this even before you
> called an action.
>
> Personally, I usually take a small sample of data and use schema inference
> on that.  I then hardcode that schema into my program.  This makes your
> spark jobs much faster and removes the possibility of the schema changing
> underneath the covers.
>
> Here's some code I use to build the static schema code automatically
> 
> .
>
> Would that work for you? If not, why not?
>
> On Wed, Nov 23, 2016 at 2:48 AM, kant kodali  wrote:
>
>> Hi Michael,
>>
>> Looks like all from_json functions will require me to pass schema and
>> that can be little tricky for us but the code below doesn't require me to
>> pass schema at all.
>>
>> import org.apache.spark.sql._
>> val rdd = df2.rdd.map { case Row(j: String) => j }
>> spark.read.json(rdd).show()
>>
>>
>> On Tue, Nov 22, 2016 at 2:42 PM, Michael Armbrust > > wrote:
>>
>>> The first release candidate should be coming out this week. You can
>>> subscribe to the dev list if you want to follow the release schedule.
>>>
>>> On Mon, Nov 21, 2016 at 9:34 PM, kant kodali  wrote:
>>>
 Hi Michael,

 I only see spark 2.0.2 which is what I am using currently. Any idea on
 when 2.1 will be released?

 Thanks,
 kant

 On Mon, Nov 21, 2016 at 5:12 PM, Michael Armbrust <
 mich...@databricks.com> wrote:

> In Spark 2.1 we've added a from_json
> 
> function that I think will do what you want.
>
> On Fri, Nov 18, 2016 at 2:29 AM, kant kodali 
> wrote:
>
>> This seem to work
>>
>> import org.apache.spark.sql._
>> val rdd = df2.rdd.map { case Row(j: String) => j }
>> spark.read.json(rdd).show()
>>
>> However I wonder if this any inefficiency here ? since I have to
>> apply this function for billion rows.
>>
>>
>

>>>
>>
>


Re: Can spark support exactly once based kafka ? Due to these following question?

2016-12-05 Thread Jörn Franke
You need to do the bookkeeping of what has been processed yourself. This may 
mean roughly the following (of course the devil is in the details):
Write down in ZooKeeper which part of the processing job has been done and for 
which dataset all the data has been created (do not keep the data itself in 
ZooKeeper).
Once you start a processing job, check in ZooKeeper whether it has already been 
processed: if not, remove all staging data and run it; if yes, terminate. 

As I said the details depend on your job and require some careful thinking, but 
exactly once can be achieved with Spark (and potentially zookeeper or similar, 
such as Redis).
Of course at the same time think if you need delivery in order etc.
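
A very rough sketch of that pattern in Scala (the store below is an in-memory
stand-in; in practice it would be ZooKeeper, e.g. via Curator, or Redis, and the
cleanStaging/runJob blocks are placeholders for your own logic):

import scala.collection.mutable

object ProcessedLog {
  private val done = mutable.Set[String]()          // stand-in for ZooKeeper/Redis
  def isProcessed(id: String): Boolean = done.contains(id)
  def markProcessed(id: String): Unit = done += id  // record only after the output is complete
}

def processOnce(datasetId: String)(cleanStaging: => Unit)(runJob: => Unit): Unit = {
  if (ProcessedLog.isProcessed(datasetId)) return   // already done: terminate
  cleanStaging                                      // remove partial output from earlier attempts
  runJob                                            // the actual Spark processing for this dataset
  ProcessedLog.markProcessed(datasetId)
}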

> On 5 Dec 2016, at 08:59, Michal Šenkýř  wrote:
> 
> Hello John,
> 
>> 1. If a task complete the operation, it will notify driver.   
>> The driver may not receive the message due to the network, and think the 
>> task is still running. Then the child stage won't be scheduled ?
> Spark's fault tolerance policy is, if there is a problem in processing a task 
> or an executor is lost, run the task (and any dependent tasks) again. Spark 
> attempts to minimize the number of tasks it has to recompute, so usually only 
> a small part of the data is recomputed.
> 
> So in your case, the driver simply schedules the task on another executor and 
> continues to the next stage when it receives the data.
>> 2. how do spark guarantee the downstream-task can receive the shuffle-data 
>> completely. As fact, I can't find the checksum for blocks in spark. For 
>> example, the upstream-task may shuffle 100Mb data, but the downstream-task 
>> may receive 99Mb data due to network. Can spark verify the data is received 
>> completely based size ?
> Spark uses compression with checksuming for shuffle data so it should know 
> when the data is corrupt and initiate a recomputation.
> 
> As for your question in the subject:
> All of this means that Spark supports at-least-once processing. There is no 
> way that I know of to ensure exactly-once. You can try to minimize 
> more-than-once situations by updating your offsets as soon as possible but 
> that does not eliminate the problem entirely.
> 
> Hope this helps,
> Michal Senkyr


Re: Can spark support exactly once based kafka ? Due to these following question?

2016-12-05 Thread Piotr Smoliński
The boundary is a bit flexible. In terms of the observed effective DStream
state, the direct stream semantics is exactly-once.
In terms of external-system observations (like message emission), Spark
Streaming semantics is at-least-once.
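
In practice this is often tightened by tracking the Kafka offsets explicitly
and recording them only after the batch output has been written idempotently.
A minimal sketch with the direct stream (imports are for the 0.8 integration;
directStream is assumed to come from KafkaUtils.createDirectStream, and the
println stands in for an external offset store):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

def saveOffsets(ranges: Array[OffsetRange]): Unit =
  ranges.foreach(r => println(s"${r.topic}/${r.partition} -> ${r.untilOffset}"))

directStream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 1. write this batch's output idempotently (e.g. keyed by topic/partition/offset range)
  // 2. only then record the offsets, so a replay after failure overwrites instead of duplicating
  saveOffsets(ranges)
}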

Regards,
Piotr

On Mon, Dec 5, 2016 at 8:59 AM, Michal Šenkýř  wrote:

> Hello John,
>
> 1. If a task complete the operation, it will notify driver. The driver may
> not receive the message due to the network, and think the task is still
> running. Then the child stage won't be scheduled ?
>
> Spark's fault tolerance policy is, if there is a problem in processing a
> task or an executor is lost, run the task (and any dependent tasks) again.
> Spark attempts to minimize the number of tasks it has to recompute, so
> usually only a small part of the data is recomputed.
>
> So in your case, the driver simply schedules the task on another executor
> and continues to the next stage when it receives the data.
>
> 2. how do spark guarantee the downstream-task can receive the shuffle-data
> completely. As fact, I can't find the checksum for blocks in spark. For
> example, the upstream-task may shuffle 100Mb data, but the downstream-task
> may receive 99Mb data due to network. Can spark verify the data is received
> completely based size ?
>
> Spark uses compression with checksuming for shuffle data so it should know
> when the data is corrupt and initiate a recomputation.
>
> As for your question in the subject:
> All of this means that Spark supports at-least-once processing. There is
> no way that I know of to ensure exactly-once. You can try to minimize
> more-than-once situations by updating your offsets as soon as possible but
> that does not eliminate the problem entirely.
>
> Hope this helps,
>
> Michal Senkyr
>


Re: How to convert a unix timestamp column into date format(yyyy-MM-dd) ?

2016-12-05 Thread Devi P.V
Thanks. It works.

On Mon, Dec 5, 2016 at 2:03 PM, Michal Šenkýř  wrote:

> Yet another approach:
> scala> val df1 = df.selectExpr("client_id", 
> "from_unixtime(ts/1000,'-MM-dd')
> as ts")
>
> Mgr. Michal Šenkýř
> mike.sen...@gmail.com
> +420 605 071 818
>
> On 5.12.2016 09:22, Deepak Sharma wrote:
>
> Another simpler approach will be:
> scala> val findf = sqlContext.sql("select 
> client_id,from_unixtime(ts/1000,'-MM-dd')
> ts from ts")
> findf: org.apache.spark.sql.DataFrame = [client_id: string, ts: string]
>
> scala> findf.show
> ++--+
> |   client_id|ts|
> ++--+
> |cd646551-fceb-416...|2016-11-01|
> |3bc61951-0f49-43b...|2016-11-01|
> |688acc61-753f-4a3...|2016-11-23|
> |5ff1eb6c-14ec-471...|2016-11-23|
> ++--+
>
> I registered temp table out of the original DF
> Thanks
> Deepak
>
> On Mon, Dec 5, 2016 at 1:49 PM, Deepak Sharma 
> wrote:
>
>> This is the correct way to do it.The timestamp that you mentioned was not
>> correct:
>>
>> scala> val ts1 = from_unixtime($"ts"/1000, "-MM-dd")
>> ts1: org.apache.spark.sql.Column = fromunixtime((ts / 1000),-MM-dd)
>>
>> scala> val finaldf = df.withColumn("ts1",ts1)
>> finaldf: org.apache.spark.sql.DataFrame = [client_id: string, ts: string,
>> ts1: string]
>>
>> scala> finaldf.show
>> ++-+--+
>> |   client_id|   ts|   ts1|
>> ++-+--+
>> |cd646551-fceb-416...|1477989416803|2016-11-01|
>> |3bc61951-0f49-43b...|1477983725292|2016-11-01|
>> |688acc61-753f-4a3...|1479899459947|2016-11-23|
>> |5ff1eb6c-14ec-471...|1479901374026|2016-11-23|
>> ++-+--+
>>
>>
>> Thanks
>> Deepak
>>
>> On Mon, Dec 5, 2016 at 1:46 PM, Deepak Sharma 
>> wrote:
>>
>>> This is how you can do it in scala:
>>> scala> val ts1 = from_unixtime($"ts", "-MM-dd")
>>> ts1: org.apache.spark.sql.Column = fromunixtime(ts,-MM-dd)
>>>
>>> scala> val finaldf = df.withColumn("ts1",ts1)
>>> finaldf: org.apache.spark.sql.DataFrame = [client_id: string, ts:
>>> string, ts1: string]
>>>
>>> scala> finaldf.show
>>> ++-+---+
>>> |   client_id|   ts|ts1|
>>> ++-+---+
>>> |cd646551-fceb-416...|1477989416803|48805-08-14|
>>> |3bc61951-0f49-43b...|1477983725292|48805-06-09|
>>> |688acc61-753f-4a3...|1479899459947|48866-02-22|
>>> |5ff1eb6c-14ec-471...|1479901374026|48866-03-16|
>>> ++-+---+
>>>
>>> The year is returning wrong here.May be the input timestamp is not
>>> correct .Not sure.
>>>
>>> Thanks
>>> Deepak
>>>
>>> On Mon, Dec 5, 2016 at 1:34 PM, Devi P.V  wrote:
>>>
 Hi,

 Thanks for replying to my question.
 I am using scala

 On Mon, Dec 5, 2016 at 1:20 PM, Marco Mistroni 
 wrote:

> Hi
>  In python you can use date time.fromtimestamp(..).str
> ftime('%Y%m%d')
> Which spark API are you using?
> Kr
>
> On 5 Dec 2016 7:38 am, "Devi P.V"  wrote:
>
>> Hi all,
>>
>> I have a dataframe like following,
>>
>> ++---+
>> |client_id   |timestamp|
>> ++---+
>> |cd646551-fceb-4166-acbc-b9|1477989416803  |
>> |3bc61951-0f49-43bf-9848-b2|1477983725292  |
>> |688acc61-753f-4a33-a034-bc|1479899459947  |
>> |5ff1eb6c-14ec-4716-9798-00|1479901374026  |
>> ++---+
>>
>>  I want to convert timestamp column into -MM-dd format.
>> How to do this?
>>
>>
>> Thanks
>>
>

>>>
>>>
>>> --
>>> Thanks
>>> Deepak
>>> www.bigdatabig.com
>>> www.keosha.net
>>>
>>
>>
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
>>
>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>
>
>


Re: How to convert a unix timestamp column into date format(yyyy-MM-dd) ?

2016-12-05 Thread Michal Šenkýř

Yet another approach:

scala> val df1 = df.selectExpr("client_id", "from_unixtime(ts/1000,'yyyy-MM-dd') as ts")


Mgr. Michal Šenkýř
mike.sen...@gmail.com
+420 605 071 818

On 5.12.2016 09:22, Deepak Sharma wrote:

Another simpler approach will be:
scala> val findf = sqlContext.sql("select 
client_id,from_unixtime(ts/1000,'-MM-dd') ts from ts")

findf: org.apache.spark.sql.DataFrame = [client_id: string, ts: string]

scala> findf.show
++--+
|   client_id|ts|
++--+
|cd646551-fceb-416...|2016-11-01|
|3bc61951-0f49-43b...|2016-11-01|
|688acc61-753f-4a3...|2016-11-23|
|5ff1eb6c-14ec-471...|2016-11-23|
++--+

I registered temp table out of the original DF
Thanks
Deepak

On Mon, Dec 5, 2016 at 1:49 PM, Deepak Sharma > wrote:


This is the correct way to do it.The timestamp that you mentioned
was not correct:

scala> val ts1 = from_unixtime($"ts"/1000, "-MM-dd")
ts1: org.apache.spark.sql.Column = fromunixtime((ts /
1000),-MM-dd)

scala> val finaldf = df.withColumn("ts1",ts1)
finaldf: org.apache.spark.sql.DataFrame = [client_id: string, ts:
string, ts1: string]

scala> finaldf.show
++-+--+
|   client_id|   ts|   ts1|
++-+--+
|cd646551-fceb-416...|1477989416803|2016-11-01|
|3bc61951-0f49-43b...|1477983725292|2016-11-01|
|688acc61-753f-4a3...|1479899459947|2016-11-23|
|5ff1eb6c-14ec-471...|1479901374026|2016-11-23|
++-+--+


Thanks
Deepak

On Mon, Dec 5, 2016 at 1:46 PM, Deepak Sharma
> wrote:

This is how you can do it in scala:
scala> val ts1 = from_unixtime($"ts", "-MM-dd")
ts1: org.apache.spark.sql.Column = fromunixtime(ts,-MM-dd)

scala> val finaldf = df.withColumn("ts1",ts1)
finaldf: org.apache.spark.sql.DataFrame = [client_id: string,
ts: string, ts1: string]

scala> finaldf.show
++-+---+
|   client_id|   ts|ts1|
++-+---+
|cd646551-fceb-416...|1477989416803|48805-08-14|
|3bc61951-0f49-43b...|1477983725292|48805-06-09|
|688acc61-753f-4a3...|1479899459947|48866-02-22|
|5ff1eb6c-14ec-471...|1479901374026|48866-03-16|
++-+---+

The year is returning wrong here.May be the input timestamp is
not correct .Not sure.

Thanks
Deepak

On Mon, Dec 5, 2016 at 1:34 PM, Devi P.V > wrote:

Hi,

Thanks for replying to my question.
I am using scala

On Mon, Dec 5, 2016 at 1:20 PM, Marco Mistroni
> wrote:

Hi
 In python you can use date
time.fromtimestamp(..).strftime('%Y%m%d')
Which spark API are you using?
Kr

On 5 Dec 2016 7:38 am, "Devi P.V" > wrote:

Hi all,

I have a dataframe like following,

++---+
|client_id|timestamp|
++---+
|cd646551-fceb-4166-acbc-b9|1477989416803  |
|3bc61951-0f49-43bf-9848-b2|1477983725292  |
|688acc61-753f-4a33-a034-bc|1479899459947  |
|5ff1eb6c-14ec-4716-9798-00|1479901374026  |
++---+

 I want to convert timestamp column into
-MM-dd format.
How to do this?


Thanks





-- 
Thanks

Deepak
www.bigdatabig.com 
www.keosha.net 




-- 
Thanks

Deepak
www.bigdatabig.com 
www.keosha.net 




--
Thanks
Deepak
www.bigdatabig.com 
www.keosha.net 




Re: How to convert a unix timestamp column into date format(yyyy-MM-dd) ?

2016-12-05 Thread Deepak Sharma
Another simpler approach will be:
scala> val findf = sqlContext.sql("select client_id, from_unixtime(ts/1000,'yyyy-MM-dd') ts from ts")
findf: org.apache.spark.sql.DataFrame = [client_id: string, ts: string]

scala> findf.show
+--------------------+----------+
|           client_id|        ts|
+--------------------+----------+
|cd646551-fceb-416...|2016-11-01|
|3bc61951-0f49-43b...|2016-11-01|
|688acc61-753f-4a3...|2016-11-23|
|5ff1eb6c-14ec-471...|2016-11-23|
+--------------------+----------+

I registered a temp table out of the original DF.
Thanks
Deepak

On Mon, Dec 5, 2016 at 1:49 PM, Deepak Sharma  wrote:

> This is the correct way to do it.The timestamp that you mentioned was not
> correct:
>
> scala> val ts1 = from_unixtime($"ts"/1000, "-MM-dd")
> ts1: org.apache.spark.sql.Column = fromunixtime((ts / 1000),-MM-dd)
>
> scala> val finaldf = df.withColumn("ts1",ts1)
> finaldf: org.apache.spark.sql.DataFrame = [client_id: string, ts: string,
> ts1: string]
>
> scala> finaldf.show
> ++-+--+
> |   client_id|   ts|   ts1|
> ++-+--+
> |cd646551-fceb-416...|1477989416803|2016-11-01|
> |3bc61951-0f49-43b...|1477983725292|2016-11-01|
> |688acc61-753f-4a3...|1479899459947|2016-11-23|
> |5ff1eb6c-14ec-471...|1479901374026|2016-11-23|
> ++-+--+
>
>
> Thanks
> Deepak
>
> On Mon, Dec 5, 2016 at 1:46 PM, Deepak Sharma 
> wrote:
>
>> This is how you can do it in scala:
>> scala> val ts1 = from_unixtime($"ts", "-MM-dd")
>> ts1: org.apache.spark.sql.Column = fromunixtime(ts,-MM-dd)
>>
>> scala> val finaldf = df.withColumn("ts1",ts1)
>> finaldf: org.apache.spark.sql.DataFrame = [client_id: string, ts: string,
>> ts1: string]
>>
>> scala> finaldf.show
>> ++-+---+
>> |   client_id|   ts|ts1|
>> ++-+---+
>> |cd646551-fceb-416...|1477989416803|48805-08-14|
>> |3bc61951-0f49-43b...|1477983725292|48805-06-09|
>> |688acc61-753f-4a3...|1479899459947|48866-02-22|
>> |5ff1eb6c-14ec-471...|1479901374026|48866-03-16|
>> ++-+---+
>>
>> The year is returning wrong here.May be the input timestamp is not
>> correct .Not sure.
>>
>> Thanks
>> Deepak
>>
>> On Mon, Dec 5, 2016 at 1:34 PM, Devi P.V  wrote:
>>
>>> Hi,
>>>
>>> Thanks for replying to my question.
>>> I am using scala
>>>
>>> On Mon, Dec 5, 2016 at 1:20 PM, Marco Mistroni 
>>> wrote:
>>>
 Hi
  In python you can use date time.fromtimestamp(..).str
 ftime('%Y%m%d')
 Which spark API are you using?
 Kr

 On 5 Dec 2016 7:38 am, "Devi P.V"  wrote:

> Hi all,
>
> I have a dataframe like following,
>
> ++---+
> |client_id   |timestamp|
> ++---+
> |cd646551-fceb-4166-acbc-b9|1477989416803  |
> |3bc61951-0f49-43bf-9848-b2|1477983725292  |
> |688acc61-753f-4a33-a034-bc|1479899459947  |
> |5ff1eb6c-14ec-4716-9798-00|1479901374026  |
> ++---+
>
>  I want to convert timestamp column into -MM-dd format.
> How to do this?
>
>
> Thanks
>

>>>
>>
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
>>
>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>



-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: How to convert a unix timestamp column into date format(yyyy-MM-dd) ?

2016-12-05 Thread Deepak Sharma
This is the correct way to do it. The timestamp you mentioned is in
milliseconds, while from_unixtime expects seconds, hence the division by 1000:

scala> val ts1 = from_unixtime($"ts"/1000, "yyyy-MM-dd")
ts1: org.apache.spark.sql.Column = fromunixtime((ts / 1000),yyyy-MM-dd)

scala> val finaldf = df.withColumn("ts1",ts1)
finaldf: org.apache.spark.sql.DataFrame = [client_id: string, ts: string,
ts1: string]

scala> finaldf.show
+--------------------+-------------+----------+
|           client_id|           ts|       ts1|
+--------------------+-------------+----------+
|cd646551-fceb-416...|1477989416803|2016-11-01|
|3bc61951-0f49-43b...|1477983725292|2016-11-01|
|688acc61-753f-4a3...|1479899459947|2016-11-23|
|5ff1eb6c-14ec-471...|1479901374026|2016-11-23|
+--------------------+-------------+----------+


Thanks
Deepak

On Mon, Dec 5, 2016 at 1:46 PM, Deepak Sharma  wrote:

> This is how you can do it in scala:
> scala> val ts1 = from_unixtime($"ts", "-MM-dd")
> ts1: org.apache.spark.sql.Column = fromunixtime(ts,-MM-dd)
>
> scala> val finaldf = df.withColumn("ts1",ts1)
> finaldf: org.apache.spark.sql.DataFrame = [client_id: string, ts: string,
> ts1: string]
>
> scala> finaldf.show
> ++-+---+
> |   client_id|   ts|ts1|
> ++-+---+
> |cd646551-fceb-416...|1477989416803|48805-08-14|
> |3bc61951-0f49-43b...|1477983725292|48805-06-09|
> |688acc61-753f-4a3...|1479899459947|48866-02-22|
> |5ff1eb6c-14ec-471...|1479901374026|48866-03-16|
> ++-+---+
>
> The year is returning wrong here.May be the input timestamp is not correct
> .Not sure.
>
> Thanks
> Deepak
>
> On Mon, Dec 5, 2016 at 1:34 PM, Devi P.V  wrote:
>
>> Hi,
>>
>> Thanks for replying to my question.
>> I am using scala
>>
>> On Mon, Dec 5, 2016 at 1:20 PM, Marco Mistroni 
>> wrote:
>>
>>> Hi
>>>  In python you can use date time.fromtimestamp(..).str
>>> ftime('%Y%m%d')
>>> Which spark API are you using?
>>> Kr
>>>
>>> On 5 Dec 2016 7:38 am, "Devi P.V"  wrote:
>>>
 Hi all,

 I have a dataframe like following,

 ++---+
 |client_id   |timestamp|
 ++---+
 |cd646551-fceb-4166-acbc-b9|1477989416803  |
 |3bc61951-0f49-43bf-9848-b2|1477983725292  |
 |688acc61-753f-4a33-a034-bc|1479899459947  |
 |5ff1eb6c-14ec-4716-9798-00|1479901374026  |
 ++---+

  I want to convert timestamp column into -MM-dd format.
 How to do this?


 Thanks

>>>
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>



-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: How to convert a unix timestamp column into date format(yyyy-MM-dd) ?

2016-12-05 Thread Deepak Sharma
This is how you can do it in Scala:
scala> val ts1 = from_unixtime($"ts", "yyyy-MM-dd")
ts1: org.apache.spark.sql.Column = fromunixtime(ts,yyyy-MM-dd)

scala> val finaldf = df.withColumn("ts1",ts1)
finaldf: org.apache.spark.sql.DataFrame = [client_id: string, ts: string,
ts1: string]

scala> finaldf.show
+--------------------+-------------+-----------+
|           client_id|           ts|        ts1|
+--------------------+-------------+-----------+
|cd646551-fceb-416...|1477989416803|48805-08-14|
|3bc61951-0f49-43b...|1477983725292|48805-06-09|
|688acc61-753f-4a3...|1479899459947|48866-02-22|
|5ff1eb6c-14ec-471...|1479901374026|48866-03-16|
+--------------------+-------------+-----------+

The year comes back wrong here. Maybe the input timestamp is not correct.
Not sure.

Thanks
Deepak

On Mon, Dec 5, 2016 at 1:34 PM, Devi P.V  wrote:

> Hi,
>
> Thanks for replying to my question.
> I am using scala
>
> On Mon, Dec 5, 2016 at 1:20 PM, Marco Mistroni 
> wrote:
>
>> Hi
>>  In python you can use date time.fromtimestamp(..).str
>> ftime('%Y%m%d')
>> Which spark API are you using?
>> Kr
>>
>> On 5 Dec 2016 7:38 am, "Devi P.V"  wrote:
>>
>>> Hi all,
>>>
>>> I have a dataframe like following,
>>>
>>> ++---+
>>> |client_id   |timestamp|
>>> ++---+
>>> |cd646551-fceb-4166-acbc-b9|1477989416803  |
>>> |3bc61951-0f49-43bf-9848-b2|1477983725292  |
>>> |688acc61-753f-4a33-a034-bc|1479899459947  |
>>> |5ff1eb6c-14ec-4716-9798-00|1479901374026  |
>>> ++---+
>>>
>>>  I want to convert timestamp column into -MM-dd format.
>>> How to do this?
>>>
>>>
>>> Thanks
>>>
>>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: How to convert a unix timestamp column into date format(yyyy-MM-dd) ?

2016-12-05 Thread Devi P.V
Hi,

Thanks for replying to my question.
I am using scala

On Mon, Dec 5, 2016 at 1:20 PM, Marco Mistroni  wrote:

> Hi
>  In Python you can use datetime.fromtimestamp(..).strftime('%Y%m%d')
> Which spark API are you using?
> Kr
>
> On 5 Dec 2016 7:38 am, "Devi P.V"  wrote:
>
>> Hi all,
>>
>> I have a dataframe like following,
>>
>> ++---+
>> |client_id   |timestamp|
>> ++---+
>> |cd646551-fceb-4166-acbc-b9|1477989416803  |
>> |3bc61951-0f49-43bf-9848-b2|1477983725292  |
>> |688acc61-753f-4a33-a034-bc|1479899459947  |
>> |5ff1eb6c-14ec-4716-9798-00|1479901374026  |
>> ++---+
>>
>>  I want to convert timestamp column into -MM-dd format.
>> How to do this?
>>
>>
>> Thanks
>>
>