Re: REST Structured Streaming Sink

2020-07-01 Thread Burak Yavuz
Well, the difference is that a technical user writes the UDF, while a
non-technical user may use this built-in thing, misconfigure it, and shoot
themselves in the foot.

On Wed, Jul 1, 2020, 6:40 PM Andrew Melo  wrote:

> On Wed, Jul 1, 2020 at 8:13 PM Burak Yavuz  wrote:
> >
> > I'm not sure having a built-in sink that allows you to DDOS servers is
> the best idea either. foreachWriter is typically used for such use cases,
> not foreachBatch. It's also pretty hard to guarantee exactly-once, rate
> limiting, etc.
>
> If you control the machines and can run arbitrary code, you can DDOS
> whatever you want. What's the difference between this proposal and
> writing a UDF that opens 1,000 connections to a target machine?
>
> > Best,
> > Burak
> >
> > On Wed, Jul 1, 2020 at 5:54 PM Holden Karau 
> wrote:
> >>
> >> I think adding something like this (if it doesn't already exist) could
> help make structured streaming easier to use, foreachBatch is not the best
> API.
> >>
> >> On Wed, Jul 1, 2020 at 2:21 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
> >>>
> >>> I guess the method, query parameter, header, and the payload would be
> all different for almost every use case - that makes it hard to generalize
> and requires the implementation to be quite complicated to be flexible
> enough.
> >>>
> >>> I'm not aware of any custom sink implementing REST, so your best bet
> would be simply implementing your own with foreachBatch, but someone
> might jump in and provide a pointer if there is something in the Spark
> ecosystem.
> >>>
> >>> Thanks,
> >>> Jungtaek Lim (HeartSaVioR)
> >>>
> >>> On Thu, Jul 2, 2020 at 3:21 AM Sam Elamin 
> wrote:
> >>>>
> >>>> Hi All,
> >>>>
> >>>>
> >>>> We ingest a lot of RESTful APIs into our lake and I'm wondering if it
> is at all possible to create a REST sink in structured streaming?
> >>>>
> >>>> For now I'm only focusing on restful services that have an
> incremental ID so my sink can just poll for new data then ingest.
> >>>>
> >>>> I can't seem to find a connector that does this and my gut instinct
> tells me it's probably because it isn't possible due to something
> completely obvious that I am missing
> >>>>
> >>>> I know some RESTful APIs obfuscate the IDs to a hash of strings and
> that could be a problem, but since I'm planning on focusing on just
> numerical IDs that get incremented I think I won't be facing that issue.
> >>>>
> >>>>
> >>>> Can anyone let me know if this sounds like a daft idea? Will I need
> something like Kafka or kinesis as a buffer and redundancy or am I
> overthinking this?
> >>>>
> >>>>
> >>>> I would love to bounce ideas with people who run structured
> streaming jobs in production.
> >>>>
> >>>>
> >>>> Kind regards
> >>>> San
> >>>>
> >>>>
> >>
> >>
> >> --
> >> Twitter: https://twitter.com/holdenkarau
> >> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9
> >> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>
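
A minimal sketch of the foreachBatch approach suggested above, assuming a
spark-shell-style SparkSession; the endpoint URL, topic, checkpoint path, and
the postRecord() helper are placeholders, and a real implementation would add
retries, batching, and rate limiting around the HTTP calls:

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("rest-sink-sketch").getOrCreate()
import spark.implicits._

// Hypothetical helper that POSTs one JSON payload to a REST endpoint.
def postRecord(endpoint: String, payload: String): Unit = {
  val conn = new java.net.URL(endpoint).openConnection()
    .asInstanceOf[java.net.HttpURLConnection]
  conn.setRequestMethod("POST")
  conn.setDoOutput(true)
  conn.setRequestProperty("Content-Type", "application/json")
  conn.getOutputStream.write(payload.getBytes("UTF-8"))
  conn.getResponseCode  // force the call; a real client would check the status
  conn.disconnect()
}

val input = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")   // placeholder
  .option("subscribe", "some_topic")                // placeholder
  .load()
  .selectExpr("CAST(value AS STRING) AS payload")

// Runs once per micro-batch; the HTTP calls happen on the executors, and
// there is no exactly-once guarantee for them.
val postBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
  batch.select("payload").as[String]
    .foreachPartition { rows: Iterator[String] =>
      rows.foreach(p => postRecord("https://example.com/api/ingest", p))
    }
}

input.writeStream
  .foreachBatch(postBatch)
  .option("checkpointLocation", "/tmp/rest-sink-checkpoint")  // placeholder
  .start()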


Re: REST Structured Streaming Sink

2020-07-01 Thread Burak Yavuz
I'm not sure having a built-in sink that allows you to DDOS servers is the
best idea either. foreachWriter is typically used for such use cases, not
foreachBatch. It's also pretty hard to guarantee exactly-once, rate
limiting, etc.

Best,
Burak

On Wed, Jul 1, 2020 at 5:54 PM Holden Karau  wrote:

> I think adding something like this (if it doesn't already exist) could
> help make structured streaming easier to use, foreachBatch is not the best
> API.
>
> On Wed, Jul 1, 2020 at 2:21 PM Jungtaek Lim 
> wrote:
>
>> I guess the method, query parameter, header, and the payload would be all
>> different for almost every use case - that makes it hard to generalize and
>> requires the implementation to be quite complicated to be flexible enough.
>>
>> I'm not aware of any custom sink implementing REST, so your best bet would
>> be simply implementing your own with foreachBatch, but someone might
>> jump in and provide a pointer if there is something in the Spark ecosystem.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Thu, Jul 2, 2020 at 3:21 AM Sam Elamin 
>> wrote:
>>
>>> Hi All,
>>>
>>>
>>> We ingest a lot of RESTful APIs into our lake and I'm wondering if it is
>>> at all possible to create a REST sink in structured streaming?
>>>
>>> For now I'm only focusing on restful services that have an incremental
>>> ID so my sink can just poll for new data then ingest.
>>>
>>> I can't seem to find a connector that does this and my gut instinct
>>> tells me it's probably because it isn't possible due to something
>>> completely obvious that I am missing
>>>
>>> I know some RESTful APIs obfuscate the IDs to a hash of strings and that
>>> could be a problem, but since I'm planning on focusing on just numerical IDs
>>> that get incremented I think I won't be facing that issue.
>>>
>>>
>>> Can anyone let me know if this sounds like a daft idea? Will I need
>>> something like Kafka or kinesis as a buffer and redundancy or am I
>>> overthinking this?
>>>
>>>
>>> I would love to bounce ideas with people who run structured streaming
>>> jobs in production.
>>>
>>>
>>> Kind regards
>>> San
>>>
>>>
>>>
>
> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9  
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Burak Yavuz
Hi Rishi,

That is exactly why Trigger.Once was created for Structured Streaming. The
way we look at streaming is that it doesn't have to be always real time, or
24-7 always on. We see streaming as a workflow that you have to repeat
indefinitely. See this blog post for more details!
https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html

Best,
Burak

On Fri, May 1, 2020 at 2:55 PM Rishi Shah  wrote:

> Hi All,
>
> I recently started playing with spark streaming, and checkpoint location
> feature looks very promising. I wonder if anyone has an opinion about using
> spark streaming with checkpoint location option as a slow batch processing
> solution. What would be the pros and cons of utilizing streaming with
> checkpoint location feature to achieve fault tolerance in batch processing
> application?
>
> --
> Regards,
>
> Rishi Shah
>
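
A minimal sketch of the Trigger.Once pattern described above, assuming a
spark-shell-style SparkSession; the paths and schema are placeholders. Each
run processes only the new data since the previous run and then stops, with
the checkpoint directory providing the fault tolerance:

import org.apache.spark.sql.streaming.Trigger

val df = spark.readStream
  .format("json")
  .schema("id LONG, payload STRING")        // file sources need an explicit schema
  .load("s3a://bucket/incoming/")           // placeholder input path

val query = df.writeStream
  .format("parquet")
  .option("path", "s3a://bucket/output/")             // placeholder
  .option("checkpointLocation", "s3a://bucket/chk/")  // placeholder
  .trigger(Trigger.Once())
  .start()

query.awaitTermination()   // returns once the single micro-batch finishes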


Re: Spark structured streaming - Fallback to earliest offset

2020-04-14 Thread Burak Yavuz
Just set `failOnDataLoss=false` as an option in readStream?

On Tue, Apr 14, 2020 at 4:33 PM Ruijing Li  wrote:

> Hi all,
>
> I have a spark structured streaming app that is consuming from a kafka
> topic with retention set up. Sometimes I face an issue where my query has
> not finished processing a message but the retention kicks in and deletes
> the offset, which since I use the default setting of “failOnDataLoss=true”
> causes my query to fail. The solution I currently have is manual, deleting
> the offsets directory and rerunning.
>
> I instead like to have spark automatically fall back to the earliest
> offset available. The solutions I saw recommend setting auto.offset =
> earliest, but for structured streaming, you cannot set that. How do I do
> this for structured streaming?
>
> Thanks!
> --
> Cheers,
> Ruijing Li
>
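
A minimal sketch of the option mentioned above, assuming a spark-shell-style
SparkSession and placeholder broker/topic names; note that with
failOnDataLoss=false the query silently skips whatever data aged out of Kafka
retention instead of failing:

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")   // placeholder
  .option("subscribe", "my_topic")                  // placeholder
  // When checkpointed offsets are no longer available in Kafka, fall back to
  // the earliest offsets that still exist rather than failing the query.
  .option("failOnDataLoss", "false")
  .load()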


Re: ForEachBatch collecting batch to driver

2020-03-11 Thread Burak Yavuz
foreachBatch gives you the micro-batch as a DataFrame, which is
distributed. If you don't call collect on that DataFrame, it shouldn't have
any memory implications on the Driver.

On Tue, Mar 10, 2020 at 3:46 PM Ruijing Li  wrote:

> Hi all,
>
> I’m curious on how foreachbatch works in spark structured streaming. So
> since it is taking in a micro batch dataframe, that means the code in
> foreachbatch is executing on spark driver? Does this mean for large
> batches, you could potentially have OOM issues from collecting each
> partition into the driver?
> --
> Cheers,
> Ruijing Li
>
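
A minimal sketch of the point above, assuming streamingDF is an existing
streaming DataFrame and the output/checkpoint paths are placeholders; the
micro-batch is written out with ordinary DataFrame APIs, so nothing is pulled
to the driver unless collect() is called explicitly:

import org.apache.spark.sql.DataFrame

val writeBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
  // 'batch' is a normal distributed DataFrame scoped to this micro-batch.
  batch.write
    .mode("append")
    .parquet(s"/data/output/batch_id=$batchId")   // placeholder path
}

streamingDF.writeStream
  .foreachBatch(writeBatch)
  .option("checkpointLocation", "/data/chk")      // placeholder
  .start()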


Re: Best way to read batch from Kafka and Offsets

2020-02-04 Thread Burak Yavuz
Do you really want to build all of that and open yourself to bugs when you
can just use foreachBatch? Here are your options:

1. Build it yourself

// Read offsets from some store
prevOffsets = readOffsets()
latestOffsets = getOffsets()

df = spark.read.format("kafka").option("startingOffsets",
prevOffsets).option("endingOffsets", latestOffsets).load()
batchLogic(df)

saveOffsets(latestOffsets)

2. Structured Streaming + Trigger.Once + foreachBatch

spark.readStream.format("kafka").load().writeStream.foreachBatch((df,
batchId) => batchLogic(df)).trigger(Trigger.Once()).start()

With Option (1), you're going to have to (re)solve:
 a) Tracking and consistency of offsets
 b) Potential topic partition mismatches
 c) Offsets that may have aged out due to retention
 d) Re-execution of jobs and data consistency. What if your job fails as
you're committing the offsets in the end, but the data was already stored?
Will your getOffsets method return the same offsets?

I'd rather not solve problems that other people have solved for me, but
ultimately the decision is yours to make.

Best,
Burak




On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li  wrote:

> Thanks Anil, I think that’s the approach I will take.
>
> Hi Burak,
>
> That was a possibility to think about, but my team has custom dataframe
> writer functions we would like to use; unfortunately, they were written with
> static dataframes in mind. I do see there is a foreachBatch write mode, but
> my thinking was that at that point it was easier to read from Kafka through
> batch mode.
>
> Thanks,
> RJ
>
> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz  wrote:
>
>> Hi Ruijing,
>>
>> Why do you not want to use structured streaming here? This is exactly why
>> structured streaming + Trigger.Once was built, just so that you don't build
>> that solution yourself.
>> You also get exactly once semantics if you use the built in sinks.
>>
>> Best,
>> Burak
>>
>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni  wrote:
>>
>>> Hi Ruijing,
>>>
>>> We did the below things to read Kafka in batch from spark:
>>>
>>> 1) Maintain the start offset (could be db, file etc)
>>> 2) Get the end offset dynamically when the job executes.
>>> 3) Pass the start and end offsets
>>> 4) Overwrite the start offset with the end offset. (Should be done post
>>> processing the data)
>>>
>>> Currently to make it work in batch mode, you need to maintain the state
>>> information of the offsets externally.
>>>
>>>
>>> Thanks
>>> Anil
>>>
>>> -Sent from my mobile
>>> http://anilkulkarni.com/
>>>
>>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li  wrote:
>>>
>>>> Hi all,
>>>>
>>>> My use case is to read from single kafka topic using a batch spark sql
>>>> job (not structured streaming ideally). I want this batch job, every time it
>>>> starts, to get the last offset it stopped at, and start reading from there
>>>> until it catches up to the latest offset, store the result, and stop the job.
>>>> Given the dataframe has a partition and offset column, my first thought for
>>>> offset management is to groupBy partition and agg the max offset, then
>>>> store it in HDFS. Next time the job runs, it will read and start from this
>>>> max offset using startingOffsets
>>>>
>>>> However, I was wondering if this will work. If the kafka producer
>>>> failed an offset and later decides to resend it, I will have skipped it
>>>> since I’m starting from the max offset sent. How does spark structured
>>>> streaming know to continue onwards - does it keep a state of all offsets
>>>> seen? If so, how can I replicate this for batch without missing data? Any
>>>> help would be appreciated.
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> Ruijing Li
>>>>
>>> --
> Cheers,
> Ruijing Li
>


Re: Best way to read batch from Kafka and Offsets

2020-02-04 Thread Burak Yavuz
Hi Ruijing,

Why do you not want to use structured streaming here? This is exactly why
structured streaming + Trigger.Once was built, just so that you don't build
that solution yourself.
You also get exactly once semantics if you use the built in sinks.

Best,
Burak

On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni  wrote:

> Hi Ruijing,
>
> We did the below things to read Kafka in batch from spark:
>
> 1) Maintain the start offset (could be db, file etc)
> 2) Get the end offset dynamically when the job executes.
> 3) Pass the start and end offsets
> 4) Overwrite the start offset with the end offset. (Should be done post
> processing the data)
>
> Currently to make it work in batch mode, you need to maintain the state
> information of the offsets externally.
>
>
> Thanks
> Anil
>
> -Sent from my mobile
> http://anilkulkarni.com/
>
> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li  wrote:
>
>> Hi all,
>>
>> My use case is to read from single kafka topic using a batch spark sql
>> job (not structured streaming ideally). I want this batch job, every time it
>> starts, to get the last offset it stopped at, and start reading from there
>> until it catches up to the latest offset, store the result, and stop the job.
>> Given the dataframe has a partition and offset column, my first thought for
>> offset management is to groupBy partition and agg the max offset, then
>> store it in HDFS. Next time the job runs, it will read and start from this
>> max offset using startingOffsets
>>
>> However, I was wondering if this will work. If the kafka producer failed
>> an offset and later decides to resend it, I will have skipped it since I’m
>> starting from the max offset sent. How does spark structured streaming know
>> to continue onwards - does it keep a state of all offsets seen? If so, how
>> can I replicate this for batch without missing data? Any help would be
>> appreciated.
>>
>>
>> --
>> Cheers,
>> Ruijing Li
>>
>


Re: Structured Streaming & Enrichment Broadcasts

2019-11-18 Thread Burak Yavuz
If you store the data that you're going to broadcast as a Delta table (see
delta.io) and perform a stream-batch (where your Delta table is the batch)
join, it will auto-update once the table receives any updates.

Best,
Burak

On Mon, Nov 18, 2019, 6:21 AM Bryan Jeffrey  wrote:

> Hello.
>
> We're running applications using Spark Streaming.  We're going to begin
> work to move to using Structured Streaming.  One of our key scenarios is to
> lookup values from an external data source for each record in an incoming
> stream.  In Spark Streaming we currently read the external data, broadcast
> it and then lookup the value from the broadcast.  The broadcast value is
> refreshed on a periodic basis - with the need to refresh evaluated on each
> batch (in a foreachRDD).  The broadcasts are somewhat large (~1M records).
> Each stream we're doing the lookup(s) for is ~6M records / second.
>
> While we could conceivably continue this pattern in Structured Streaming
> with Spark 2.4.x and the 'foreachBatch', based on my read of documentation
> this seems like a bit of an anti-pattern in Structured Streaming.
>
> So I am looking for advice: What mechanism would you suggest to on a
> periodic basis read an external data source and do a fast lookup for a
> streaming input.  One option appears to be to do a broadcast left outer
> join?  In the past this mechanism has been less easy to performance tune
> than doing an explicit broadcast and lookup.
>
> Regards,
>
> Bryan Jeffrey
>
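
A minimal sketch of the stream-batch join described above, assuming a
spark-shell-style SparkSession with the Delta Lake dependency on the
classpath; the paths, topic, and join key are placeholders:

// Batch side: a Delta table holding the enrichment data.
val enrichment = spark.read.format("delta").load("/delta/enrichment")  // placeholder

// Streaming side: placeholder Kafka source exposing a string join key.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

// Stream-static join: the batch side is re-resolved for each micro-batch, so
// updates to the Delta table become visible without restarting the query.
val joined = events.join(enrichment, Seq("key"), "left_outer")

joined.writeStream
  .format("parquet")
  .option("path", "/data/enriched")                    // placeholder
  .option("checkpointLocation", "/data/enriched_chk")  // placeholder
  .start()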


Re: Delta with intelligent upsert

2019-11-02 Thread Burak Yavuz
You can just add the target partitioning filter to your MERGE or UPDATE
condition, e.g.

MERGE INTO target
USING source
ON target.key = source.key AND target.year = year(current_date())
...

Best,
Burak

On Thu, Oct 31, 2019, 10:15 PM ayan guha  wrote:

>
> Hi
>
> We have a scenario where we have a large table, i.e. 5-6B records. The table
> is a repository of data from the past N years. It is possible that some
> updates take place on the data, and thus we are using a Delta table.
>
> As part of the business process we know updates can happen only within M
> years of past records, where M is much smaller than N. E.g. the table can
> hold 20 yrs of data but we know updates can happen only for the last year,
> not before that.
>
> Is there some way to indicate this additional intelligence to Delta so it
> can look at only the last year's data while running a merge or update? It
> seems to be an obvious performance booster.
>
> Any thoughts?
> --
> Best Regards,
> Ayan Guha
>
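
The same partition-pruning condition, sketched with the Delta Lake Scala API;
the table path, source DataFrame, and column names are placeholders, and a
spark-shell-style SparkSession plus the io.delta:delta-core dependency are
assumed:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.expr

val target = DeltaTable.forPath(spark, "/delta/big_table")   // placeholder

target.as("target")
  .merge(
    sourceDF.as("source"),                               // placeholder source
    // The extra year predicate restricts the merge to recent data, as above.
    expr("target.key = source.key AND target.year = year(current_date())"))
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()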


Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Burak Yavuz
Hey Charles,
If you are using maxOffsetsPerTrigger, you will likely reset the offsets
every micro-batch, because:
 1. Spark will figure out a range of offsets to process (let's call them x
and y)
 2. If these offsets have fallen out of the retention period, Spark will
try to start from x, which is below the earliest offset still available in
Kafka (call it z, so z > y > x).
 3. Since z > y, Spark will not process any of the data
 4. Goto 1

On Wed, Sep 11, 2019, 6:09 PM Charles vinodh  wrote:

> Hi Sandish,
>
> as I have said if the offset reset happens only once that would make
> sense. But I am not sure how to explain why the offset reset is happening
> for every micro-batch...
> ideally once the offset reset happens the app should move to a valid
> offset and start consuming data. but in my case for every batch the offset
> is getting reset and no data is ever getting generated.
>
> Thanks,
> Charles
>
> On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN 
> wrote:
>
>> You can see this kind of error, if there is consumer lag more than Kafka
>> retention period.
>> You will not see any failures if below option is not set.
>>
>> Set failOnDataLoss=true option to see failures.
>>
>> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh 
>> wrote:
>>
>>> The only form of rate limiting I have set is *maxOffsetsPerTrigger *and
>>> *fetch.message.max.bytes. *
>>>
>>> *"*may be that you are trying to process records that have passed the
>>> retention period within Kafka.*"*
>>> If the above is true then I should have my offsets reset only once
>>> ideally when my application starts. But mu offsets are resetting for every
>>> batch. if my application is using offsets that are no longer available in
>>> Kafka it will reset to earliest or latest offset available in Kafka and the
>>> next request made to Kafka should provide proper data. But in case for all
>>> micro-batches the offsets are getting reseted and the batch is producing no
>>> data.
>>>
>>>
>>>
>>> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz  wrote:
>>>
>>>> Do you have rate limiting set on your stream? It may be that you are
>>>> trying to process records that have passed the retention period within
>>>> Kafka.
>>>>
>>>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
>>>> wrote:
>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to run a spark application ingesting data from Kafka using
>>>>> the Spark structured streaming and the spark library
>>>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
>>>>> issue where during execution of all my micro-batches the Kafka consumer is
>>>>> not able to fetch the offsets and it's having its offsets reset as shown
>>>>> below in this log
>>>>>
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>  Resetting offset for partition my-topic-1 to offset 1218619371.
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>  Resetting offset for partition my-topic-8 to offset 1157205346.
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>  Resetting offset for partition my-topic-21 to offset 1255403059.
>>>>>
>>>>>
>>>>> It is reasonable if this resetting happens once in the application due to
>>>>> the fact that the offsets stored in my checkpoint are no longer valid and
>>>>> we will have to reset our offsets to a new value. But I am seeing this reset
>>>>> happening for every micro-batch execution in my streaming job. In the
>>>>> end, the streaming query progress emits the following
>>>>>
>>>>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made 
>>>>> progress: {
>>>>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Burak Yavuz
Do you have rate limiting set on your stream? It may be that you are trying
to process records that have passed the retention period within Kafka.

On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
wrote:

>
> Hi,
>
> I am trying to run a spark application ingesting data from Kafka using the
> Spark structured streaming and the spark library
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
> issue where during execution of all my micro-batches the Kafka consumer is
> not able to fetch the offsets and it's having its offsets reset as shown
> below in this log
>
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-5 to offset 1168959116.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-1 to offset 1218619371.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-8 to offset 1157205346.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-21 to offset 1255403059.
>
>
> It is reasonable if this resetting happens once in the application due to the
> fact that the offsets stored in my checkpoint are no longer valid and we will
> have to reset our offsets to a new value. But I am seeing this reset
> happening for every micro-batch execution in my streaming job. In the
> end, the streaming query progress emits the following
>
> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>   "name" : null,
>   "timestamp" : "2019-09-10T15:55:00.000Z",
>   "batchId" : 189,
>   "numInputRows" : 0,
>   "inputRowsPerSecond" : 0.0,
>   "processedRowsPerSecond" : 0.0,
>   "durationMs" : {
> "addBatch" : 127,
> "getBatch" : 0,
> "getEndOffset" : 0,
> "queryPlanning" : 24,
> "setOffsetRange" : 36,
> "triggerExecution" : 1859,
> "walCommit" : 1032
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
> "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
> "startOffset" : {
>   "my_kafka_topic" : {
> "23" : 1206926686,
> "8" : 1158514946,
> "17" : 1258387219,
> "11" : 1263091642,
> "2" : 1226741128,
> "20" : 1229560889,
> "5" : 1170304913,
> "14" : 1207333901,
> "4" : 1274242728,
> "13" : 1336386658,
> "22" : 1260210993,
> "7" : 1288639296,
> "16" : 1247462229,
> "10" : 1093157103,
> "1" : 1219904858,
> "19" : 1116269615,
> "9" : 1238935018,
> "18" : 1069224544,
> "12" : 1256018541,
> "3" : 1251150202,
> "21" : 1256774117,
> "15" : 1170591375,
> "6" : 1185108169,
> "24" : 1202342095,
> "0" : 1165356330
>   }
> },
> "endOffset" : {
>   "my_kafka_topic" : {
> "23" : 1206928043,
> "8" : 1158516721,
> "17" : 1258389219,
> "11" : 1263093490,
> "2" : 1226743225,
> "20" : 1229562962,
> "5" : 1170307882,
> "14" : 1207335736,
> "4" : 1274245585,
> "13" : 1336388570,
> "22" : 1260213582,
> "7" : 1288641384,
> "16" : 1247464311,
> "10" : 1093159186,
> "1" : 1219906407,
> "19" : 1116271435,
> "9" : 1238936994,
> "18" : 1069226913,
> "12" : 1256020926,
> "3" : 1251152579,
> "21" : 1256776910,
> "15" : 1170593216,
> "6" : 1185110032,
> "24" : 1202344538,
> "0" : 1165358262
>   }
> },
> "numInputRows" : 0,
> "inputRowsPerSecond" : 0.0,
> "processedRowsPerSecond" : 0.0
>   } ],
>   "sink" : {
> "description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
>   }
> }
>
>
> In the above StreamingQueryProgress event the numInputRows field is zero
> and this is the case for all micro-batch executions, and no data is being
> produced whatsoever. So basically for each batch my offsets are being reset
> and each batch is producing zero rows. Since there is no work being done
> and since dynamic allocation is enabled, all my executors are killed... I have
> tried deleting my checkpoint and starting my application from scratch and I
> am still facing the same issue. What could possibly be wrong with this? What
> lines of investigation should I take? If you are interested in getting
> Stack Overflow points you can answer my question in SO here
> 

Re: Static partitioning in partitionBy()

2019-05-07 Thread Burak Yavuz
It depends on the data source. Delta Lake (https://delta.io) allows you to
do it with the .option("replaceWhere", "c = c1"). With other file formats,
you can write directly into the partition directory (tablePath/c=c1), but
you lose atomicity.

On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia 
wrote:

> Hi All,
>
> Is there a way I can provide static partitions in partitionBy()?
>
> Like:
> df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save
>
> Above code gives following error as it tries to find column `c=c1` in df.
>
> org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
> in schema struct;
>
> Thanks,
> Shubham
>
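
A minimal sketch of the replaceWhere option mentioned above; the path and
partition value are placeholders, and the Delta Lake dependency is assumed:

df.write
  .format("delta")
  .mode("overwrite")
  // Only rows matching this predicate are replaced; the write fails if df
  // contains rows that fall outside of it.
  .option("replaceWhere", "c = 'c1'")
  .save("/delta/my_table")              // placeholder path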


Re: Spark Structured Streaming using S3 as data source

2018-08-26 Thread Burak Yavuz
Yes, the checkpoint makes sure that you start off from where you left off.

On Sun, Aug 26, 2018 at 2:22 AM sherif98 
wrote:

> I have data that is continuously pushed to multiple S3 buckets. I want to
> set up a structured streaming application that uses the S3 buckets as the
> data source and do stream-stream joins.
>
> My question is: if the application is down for some reason, will restarting
> the application continue processing data from S3 where it left
> off?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
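
A minimal sketch of the restart behavior discussed above, assuming a
spark-shell-style SparkSession; the schema and S3 paths are placeholders. The
checkpoint location is what lets a restarted query resume from the last
committed batch instead of reprocessing everything:

val files = spark.readStream
  .format("json")
  .schema("id LONG, body STRING")          // file sources require a schema
  .load("s3a://bucket-a/input/")           // placeholder

files.writeStream
  .format("parquet")
  .option("path", "s3a://bucket-b/output/")            // placeholder
  .option("checkpointLocation", "s3a://bucket-b/chk/") // placeholder
  .start()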


Re: Kafka backlog - spark structured streaming

2018-07-30 Thread Burak Yavuz
If you don't set rate limiting through `maxOffsetsPerTrigger`, Structured
Streaming will always process until the end of the stream. So number of
records waiting to be processed should be 0 at the start of each trigger.

On Mon, Jul 30, 2018 at 8:03 AM, Kailash Kalahasti <
kailash.kalaha...@gmail.com> wrote:

> Is there any way to find out the backlog on a Kafka topic while using Spark
> Structured Streaming? I checked a few consumer APIs but they require
> enabling a group id for streaming, which seems not to be allowed.
>
> Basically I want to know the number of records waiting to be processed.
>
> Any suggestions ?
>


Re: Structured Streaming: distinct (Spark 2.2)

2018-03-19 Thread Burak Yavuz
I believe the docs are out of date regarding distinct. The behavior should
be as follows:

 - Distinct should be applied across triggers
 - In order to prevent the state from growing indefinitely, you need to add
a watermark
 - If you don't have a watermark, but your key space is small, that's also
fine
 - If a record arrives and is not in the state, it will be outputted
 - If a record arrives and is in the state, it will be ignored
 - Once the watermark passes for a key, it will be dropped from state
 - If a record arrives late, i.e. after the watermark, it will be ignored

HTH!
Burak


On Mon, Mar 19, 2018 at 12:04 PM, Geoff Von Allmen 
wrote:

> I see in the documentation that the distinct operation is not supported
> 
> in Structured Streaming. That being said, I have noticed that you are able
> to successfully call distinct() on a data frame and it seems to perform
> the desired operation and doesn’t fail with the AnalysisException as
> expected. If I call it with a column name specified, then it will fail with
> AnalysisException.
>
> I am using Structured Streaming to read from a Kafka stream and my
> question (and concern) is that:
>
>- The distinct operation is properly applied across the *current*
>batch as read from Kafka, however, the distinct operation would not
>apply across batches.
>
> I have tried the following:
>
>- Started the streaming job to see my baseline data and left the job
>streaming
>- Created events in kafka that would increment my counts if distinct
>was not performing as expected
>- Results:
>   - Distinct still seems to be working over the entire data set even
>   as I add new data.
>   - As I add new data, I see spark process the data (I’m doing output
>   mode = update) but there are no new results indicating the distinct
>   function is in fact still working across batches as spark pulls in the 
> new
>   data from kafka.
>
> Does anyone know more about the intended behavior of distinct in
> Structured Streaming?
>
> If this is working as intended, does this mean I could have a dataset that
> is growing without bound being held in memory/disk or something to that
> effect (so it has some way to make that distinct operation against previous
> data)?
> ​
>


Re: Infer JSON schema in structured streaming Kafka.

2017-12-11 Thread Burak Yavuz
In Spark 2.2, you can read from Kafka in batch mode, and then use the json
reader to infer schema:

val df = spark.read.format("kafka")...
  .select($"value".cast("string"))
val json = spark.read.json(df.as[String])
val schema = json.schema

While the above will be slow (since you're reading almost all data in
Kafka in batch), it would work.

My question to you is, do you think it's worth it? Why do you have a random
json schema being inputted to your Kafka stream? Can this randomness not
mess up everything in the future if someone messes up? Not having fixed,
known schemas with streaming data (or any data for that matter) is
dangerous for most purposes.
Just food for thought.

Best,
Burak



On Mon, Dec 11, 2017 at 4:01 AM, Jacek Laskowski  wrote:

> Hi,
>
> What about a custom streaming Sink that would stop the query after
> addBatch has been called?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Mon, Dec 11, 2017 at 9:15 AM, satyajit vegesna <
> satyajit.apas...@gmail.com> wrote:
>
>> Hi Jacek,
>>
>> For now , i am using Thread.sleep() on driver, to make sure my streaming
>> query receives some data and and stop it, before the control reaches
>> querying memory table.
>> Let me know if there is any better way of handling it.
>>
>> Regards,
>> Satyajit.
>>
>> On Sun, Dec 10, 2017 at 10:43 PM, satyajit vegesna <
>> satyajit.apas...@gmail.com> wrote:
>>
>>> Hi Jacek,
>>>
>>> Thank you for responding back,
>>>
>>> i have tried memory sink, and below is what i did
>>>
>>>  val fetchValue = 
>>> debeziumRecords.selectExpr("value").withColumn("tableName",
>>> functions.get_json_object($"value".cast(StringType), "$.schema.name"))
>>> .withColumn("operation", 
>>> functions.get_json_object($"value".cast(StringType),
>>> "$.payload.op"))
>>> .withColumn("payloadAfterValue", 
>>> split(substring_index(debeziumRecords("value"),
>>> "\"after\":" ,-1),",\"source\"").getItem(0))
>>> 
>>> .drop("tableName").drop("operation").drop("value").as[String].writeStream
>>>
>>> .outputMode(OutputMode.Append())
>>> .queryName("record")
>>> .format("memory")
>>> .start()
>>>
>>> spark.sql("select * from record").show(truncate = false) //i was
>>> expecting to be able to use the record table to read the JSON string, but
>>> the table is empty for the first call. And i do not see any dataframe
>>> output after the first one
>>>
>>> *But yeah the above steps work good and i can do things that i need to,
>>> in spark-shell, the problem is when i try to code in Intellij, because the
>>> streaming query keeps running and i am not sure how to identify and stop
>>> the streaming query and use record memory table.*
>>>
>>> So i would like to stop the streaming query once i know i have some data
>>> in my record memory table(is there a way to do that), so i can stop the
>>> streaming query and use the memory table, fetch my record.
>>> Any help on how to approach the situation programmatically/any examples
>>> pointed would highly be appreciated.
>>>
>>> Regards,
>>> Satyajit.
>>>
>>>
>>>
>>> On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski 
>>> wrote:
>>>
 Hi,

 What about memory sink? That could work.

 Pozdrawiam,
 Jacek Laskowski
 
 https://about.me/JacekLaskowski
 Spark Structured Streaming https://bit.ly/spark-structured-streaming
 Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
 Follow me at https://twitter.com/jaceklaskowski

 On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
 satyajit.apas...@gmail.com> wrote:

> Hi All,
>
> I would like to infer JSON schema from a sample of data that i receive
> from, Kafka Streams(specific topic), and i have to infer the schema as i 
> am
> going to receive random JSON string with different schema for each topic,
> so i chose to go ahead with below steps,
>
> a. readStream from Kafka(latest offset), from a single Kafka topic.
> b. Some how to store the JSON string into val and infer the schema.
> c. stop the stream.
> d.Create new readStream(smallest offset) and use the above inferred
> schema to process the JSON using spark provided JSON support, like
> from_json, json_object and others and run my actuall business logic.
>
> Now i am not sure how to be successful with step(b). Any help would be
> appreciated.
> And would also like to know if there is any better approach.
>
> Regards,
> Satyajit.
>


>>>
>>
>


Re: Reload some static data during struct streaming

2017-11-13 Thread Burak Yavuz
I think if you don't cache the jdbc table, then it should auto-refresh.

On Mon, Nov 13, 2017 at 1:21 PM, spark receiver 
wrote:

> Hi
>
> I'm using Structured Streaming (Spark 2.2) to receive Kafka messages, and it
> works great. The thing is, I need to join the Kafka messages with a relatively
> static table stored in a MySQL database (let's call it metadata here).
>
> So is it possible to reload the metadata table after some time
> interval (like daily) without restarting the running Structured Streaming query?
>
> Snippet code as following :
>
> // df_meta contains important information to join with the dataframe read 
> from kafka
>
> val df_meta = spark.read.format("jdbc").option("url", mysql_url).option(
> "dbtable", "v_entity_ap_rel").load()
>
> df_meta.cache()
>
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", 
> "x.x.x.x:9092").option("fetch.message.max.bytes", 
> "5000").option("kafka.max.partition.fetch.bytes", "5000")
>   .option("subscribe", "rawdb.raw_data")
>   .option("failOnDataLoss", true)
>   .option("startingOffsets", "latest")
>   .load()
>   .select($"value".as[Array[Byte]])
>   .map(avroDeserialize(_))
>   .as[ApRawData].select("APMAC", "RSSI", "sourceMacAddress", "updatingTime")
>   .join(df_meta.as("b"), $"a.apmac" === $"b.apmac")
>
>
> df.selectExpr("ENTITYID", "CLIENTMAC", "STIME", "case when a.rrssi>=b.rssi 
> then '1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG", 
> "substring(stime,1,13) STIME_HOUR")
>   .distinct().writeStream.format("parquet").partitionBy("STIME_HOUR")
>   .option("checkpointLocation", 
> "/user/root/t_cf_table_chpt").trigger(ProcessingTime("5 minutes"))
>   .start("T_CF_TABLE")
>   .awaitTermination()
>
>
> Mason
>


Re: Getting Message From Structured Streaming Format Kafka

2017-11-02 Thread Burak Yavuz
Hi Daniel,

Several things:
 1) Your error seems to suggest you're using a different version of Spark
and a different version of the sql-kafka connector. Could you make sure
they are on the same Spark version?
 2) With Structured Streaming, you may remove everything related to a
StreamingContext.

val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(sparkConf, batchDuration)
ssc.checkpoint(checkpointDir)
ssc.remember(Minutes(1))

These lines are not doing anything for Structured Streaming.


Best,
Burak

On Thu, Nov 2, 2017 at 11:36 AM, Daniel de Oliveira Mantovani <
daniel.oliveira.mantov...@gmail.com> wrote:

> Hello, I'm trying to run the following code,
>
> var newContextCreated = false // Flag to detect whether new context was 
> created or not
> val kafkaBrokers = "localhost:9092" // comma separated list of broker:host
>
> private val batchDuration: Duration = Seconds(3)
> private val master: String = "local[2]"
> private val appName: String = this.getClass().getSimpleName()
> private val checkpointDir: String = "/tmp/spark-streaming-amqp-tests"
>
> // Create a Spark configuration
>
> val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>
> val ssc = new StreamingContext(sparkConf, batchDuration)
> ssc.checkpoint(checkpointDir)
> ssc.remember(Minutes(1)) // To make sure data is not deleted by the time we 
> query it interactively
>
> val spark = SparkSession
>   .builder
>   .config(sparkConf)
>   .getOrCreate()
>
> val lines = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "evil_queue")
>   .load()
>
> lines.printSchema()
>
> import spark.implicits._
> val noAggDF = lines.select("key")
>
> noAggDF
>   .writeStream
>   .format("console")
>   .start()
>
>
> But I'm having the error:
>
> http://paste.scsys.co.uk/565658
>
>
> How do I get my messages using kafka as format from Structured Streaming ?
>
>
> Thank you
>
>
> --
>
> --
> Daniel de Oliveira Mantovani
> Perl Evangelist/Data Hacker
> +1 786 459 1341 <(786)%20459-1341>
>
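
A minimal version of the read without any StreamingContext, with the binary
key/value columns cast to strings so the messages are visible on the console;
broker and topic follow the thread, the rest is a sketch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("kafka-console-sketch")
  .getOrCreate()

val lines = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "evil_queue")
  .load()

// The Kafka source exposes key and value as binary; cast them to strings.
val messages = lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

messages.writeStream
  .format("console")
  .start()
  .awaitTermination()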


Re: Spark Structured Streaming not connecting to Kafka using kerberos

2017-10-16 Thread Burak Yavuz
Hi Darshan,

How are you creating your kafka stream? Can you please share the options
you provide?

spark.readStream.format("kafka")
  .option(...) // all these please
  .load()


On Sat, Oct 14, 2017 at 1:55 AM, Darshan Pandya 
wrote:

> Hello,
>
> I'm using Spark 2.1.0 on CDH 5.8 with kafka 0.10.0.1 + kerberos
>
> I am unable to connect to the kafka broker with the following message
>
>
> 17/10/14 14:29:10 WARN clients.NetworkClient: Bootstrap broker
> 10.197.19.25:9092 disconnected
>
> and is unable to consume any messages.
>
> And am using it as follows
>
> jaas.conf
>
> KafkaClient {
> com.sun.security.auth.module.Krb5LoginModule required
> useKeyTab=true
> keyTab="./gandalf.keytab"
> storeKey=true
> useTicketCache=false
> serviceName="kafka"
> principal="gand...@domain.com";
> };
>
> $SPARK_HOME/bin/spark-submit \
> --master yarn \
> --files jaas.conf,gandalf.keytab \
> --driver-java-options "-Djava.security.auth.login.config=./jaas.conf 
> -Dhdp.version=2.4.2.0-258" \
> --conf 
> "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf"
>  \
> --class com.example.ClassName uber-jar-with-deps-and-hive-site.jar
>
> Thanks in advance.
>
>
> --
> Sincerely,
> Darshan
>
>
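
For a Kerberized Kafka 0.10 broker, the stream options usually also need the
consumer's security settings passed through with the "kafka." prefix; the
exact values below are assumptions that depend on the cluster setup (listener
protocol, service principal name), not something confirmed in this thread:

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder
  .option("subscribe", "my_topic")                     // placeholder
  // Forwarded verbatim to the Kafka consumer because of the "kafka." prefix.
  .option("kafka.security.protocol", "SASL_PLAINTEXT") // assumption
  .option("kafka.sasl.kerberos.service.name", "kafka") // assumption
  .load()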


Re: Structured streaming coding question

2017-09-20 Thread Burak Yavuz
Please remove

query1.awaitTermination();
query2.awaitTermination();

once

query1.awaitTermination();

is called, you don't even get to query2.awaitTermination().


On Tue, Sep 19, 2017 at 11:59 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi Burak,
>
> Thanks much! had no clue that existed. Now, I changed it to this.
>
> StreamingQuery query1 = 
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new 
> KafkaSink("hello1")).start();
> StreamingQuery query2 = 
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new 
> KafkaSink("hello2")).start();
>
> query1.awaitTermination();
> query2.awaitTermination();
> sparkSession.streams().awaitAnyTermination();
>
>
>
>
>
> On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hey Kant,
>>
>> That won't work either. Your second query may fail, and as long as your
>> first query is running, you will not know. Put this as the last line
>> instead:
>>
>> spark.streams.awaitAnyTermination()
>>
>> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Looks like my problem was the order of awaitTermination() for some
>>> reason.
>>>
>>> *Doesn't work *
>>>
>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello1")).start().awaitTermination()
>>>
>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello2")).start().awaitTermination()
>>>
>>> *Works*
>>>
>>> StreamingQuery query1 = outputDS1.writeStream().trigge
>>> r(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello1")).start();
>>>
>>> query1.awaitTermination()
>>>
>>> StreamingQuery query2 =outputDS2.writeStream().trigg
>>> er(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello2")).start();
>>>
>>> query2.awaitTermination()
>>>
>>>
>>>
>>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com>
>>> wrote:
>>>
>>>> Looks like my problem was the order of awaitTermination() for some
>>>> reason.
>>>>
>>>> Doesn't work
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
> >>>>> I have the following pseudo code (I could paste the real code however
>>>>> it is pretty long and involves Database calls inside dataset.map operation
>>>>> and so on) so I am just trying to simplify my question. would like to know
>>>>> if there is something wrong with the following pseudo code?
>>>>>
> >>>>> DataSet inputDS = readFromKafka(topicName)
>>>>>
>>>>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
>>>>> Since I can see data getting populated
>>>>>
>>>>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Works as
>>>>> well
>>>>>
>>>>> DataSet outputDS2 = mongoDS.map( readFromDatabase); // Doesn't
>>>>> work
>>>>>
>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>>
>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>>
>>>>>
>>>>> *So what's happening with above code is that I can see data coming out
>>>>> of hello1 topic but not from hello2 topic.* I thought there is
>>>>> something wrong with "outputDS2" so I switched the order  so now the code
>>>>> looks like this
>>>>>
> >>>>> DataSet inputDS = readFromKafka(topicName)
>>>>>
>>>>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
>>>>> Since I can see data getting populated
>>>>>
>>>>> DataSet outputDS2 = mongoDS.map( readFromDatabase); // This
>>>>> Works
>>>>>
> >>>>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Doesn't
>>>>> work
>>>>>
>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>>
>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>>
>>>>> *Now I can see data coming out from hello2 kafka topic but not from
>>>>> hello1 topic*. *In  short, I can only see data from outputDS1 or
>>>>> outputDS2 but not both. * At this point I am not sure what is going
>>>>> on?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
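
The shape Burak recommends, sketched in Scala with the names from the thread
(outputDS1/outputDS2 and the KafkaSink ForeachWriter are from the original
code): start every query first, then block on awaitAnyTermination so a
failure in either query surfaces.

import org.apache.spark.sql.streaming.Trigger

val query1 = outputDS1.writeStream
  .trigger(Trigger.ProcessingTime(1000))
  .foreach(new KafkaSink("hello1"))
  .start()

val query2 = outputDS2.writeStream
  .trigger(Trigger.ProcessingTime(1000))
  .foreach(new KafkaSink("hello2"))
  .start()

// Blocks until any active query terminates or fails, instead of blocking on
// query1 alone and never noticing a problem in query2.
spark.streams.awaitAnyTermination()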


Re: Structured streaming coding question

2017-09-20 Thread Burak Yavuz
Hey Kant,

That won't work either. Your second query may fail, and as long as your
first query is running, you will not know. Put this as the last line
instead:

spark.streams.awaitAnyTermination()

On Tue, Sep 19, 2017 at 10:11 PM, kant kodali  wrote:

> Looks like my problem was the order of awaitTermination() for some reason.
>
> *Doesn't work *
>
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello1")).start().awaitTermination()
>
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello2")).start().awaitTermination()
>
> *Works*
>
> StreamingQuery query1 = outputDS1.writeStream().trigge
> r(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start();
>
> query1.awaitTermination()
>
> StreamingQuery query2 =outputDS2.writeStream().trigg
> er(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start();
>
> query2.awaitTermination()
>
>
>
> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali  wrote:
>
>> Looks like my problem was the order of awaitTermination() for some reason.
>>
>> Doesn't work
>>
>>
>>
>>
>>
>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I have the following pseudo code (I could paste the real code however it
>>> is pretty long and involves Database calls inside dataset.map operation and
>>> so on) so I am just trying to simplify my question. would like to know if
>>> there is something wrong with the following pseudo code?
>>>
>>> DataSet inputDS = readFromKafka(topicName)
>>>
>>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
>>> Since I can see data getting populated
>>>
>>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Works as
>>> well
>>>
>>> DataSet outputDS2 = mongoDS.map( readFromDatabase); // Doesn't
>>> work
>>>
>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello1")).start().awaitTermination()
>>>
>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello2")).start().awaitTermination()
>>>
>>>
>>> *So what's happening with above code is that I can see data coming out
>>> of hello1 topic but not from hello2 topic.* I thought there is
>>> something wrong with "outputDS2" so I switched the order  so now the code
>>> looks like this
>>>
>>> DataSet inputDS = readFromKafka(topicName)
>>>
>>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
>>> Since I can see data getting populated
>>>
>>> DataSet outputDS2 = mongoDS.map( readFromDatabase); // This Works
>>>
>>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>>>
>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello1")).start().awaitTermination()
>>>
>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello2")).start().awaitTermination()
>>>
>>> *Now I can see data coming out from hello2 kafka topic but not from
>>> hello1 topic*. *In  short, I can only see data from outputDS1 or
>>> outputDS2 but not both. * At this point I am not sure what is going on?
>>>
>>> Thanks!
>>>
>>>
>>>
>>
>


Re: [Structured Streaming] Trying to use Spark structured streaming

2017-09-11 Thread Burak Yavuz
Hi Eduardo,

What you have written out is to output counts "as fast as possible" for
windows of 5 minute length and with a sliding window of 1 minute. So for a
record at 10:13, you would get that record included in the count for
10:09-10:14, 10:10-10:15, 10:11-10:16, 10:12-10:17, and 10:13-10:18.

Please take a look at the following blog post for more details:
https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html
Also this talk can be helpful:
https://www.youtube.com/watch?v=JAb4FIheP28&t=942s (especially after the 19th
minute)

What you seem to be looking for is "Update" output mode (you may need Spark
2.2 for this IIRC), with a window duration of 5 minutes and no sliding
interval, and a processing time trigger of 1 minute. Note that this still
doesn't guarantee 1 output row every trigger as late data may arrive
(unless you set the watermark accordingly).


Best,
Burak


On Mon, Sep 11, 2017 at 8:04 AM, Eduardo D'Avila <
eduardo.dav...@corp.globo.com> wrote:

> Hi,
>
> I'm trying to use Spark 2.1.1 structured streaming to *count the number
> of records* from Kafka *for each time window* with the code in this
> GitHub gist
> .
>
> I expected that, *once each minute* (the slide duration), it would *output
> a single record* (since the only aggregation key is the window) with the 
> *record
> count for the last 5 minutes* (the window duration). However, it outputs
> several records 2-3 times per minute, like in the sample output included in
> the gist.
>
> Changing the output mode to "append" seems to change the behavior, but
> still far from what I expected.
>
> What is wrong with my assumptions on the way it should work? Given the
> code, how should the sample output be interpreted or used?
>
> Thanks,
>
> Eduardo
>
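
A minimal sketch matching the suggestion above (Update output mode, a 5-minute
tumbling window with no slide, and a 1-minute processing-time trigger);
'events', the column name, the watermark duration, and the console sink are
placeholders:

import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

val counts = events                                   // a streaming DataFrame
  .withWatermark("timestamp", "10 minutes")           // bounds state and late data
  .groupBy(window(col("timestamp"), "5 minutes"))     // tumbling window, no slide
  .count()

counts.writeStream
  .outputMode("update")                               // emit only windows that changed
  .trigger(Trigger.ProcessingTime("1 minute"))
  .format("console")
  .start()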


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Burak Yavuz
That just gives you the max time for each train. If I understood the
question correctly, OP wants the whole row with the max time. That's
generally solved through joins or subqueries, which would be hard to do in
a streaming setting

On Aug 29, 2017 7:29 PM, "ayan guha" <guha.a...@gmail.com> wrote:

> Why removing the destination from the window wont work? Like this:
>
>  *trainTimesDataset*
> *  .withWatermark("**activity_timestamp", "5 days")*
> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train")*
> *  .max("time")*
>
> On Wed, Aug 30, 2017 at 10:38 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> @Burak so how would the transformation or query look for the
>> above example? I don't see flatMapGroupsWithState in the DataSet API
>> Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier.
>>
>>
>>
>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Hey TD,
>>>
>>> If I understood the question correctly, your solution wouldn't return
>>> the exact solution, since it also groups by on destination. I would say the
>>> easiest solution would be to use flatMapGroupsWithState, where you:
>>> .groupByKey(_.train)
>>>
>>> and keep in state the row with the maximum time.
>>>
>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> Yes. And in that case, if you just care about only the last few days of
>>>> max, then you should set watermark on the timestamp column.
>>>>
>>>>  *trainTimesDataset*
>>>> *  .withWatermark("**activity_timestamp", "5 days")*
>>>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"),
>>>> "train", "dest")*
>>>> *  .max("time")*
>>>>
>>>> Any counts which are more than 5 days old will be dropped from the
>>>> streaming state.
>>>>
>>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks for the response. Since this is a streaming based query and in
>>>>> my case I need to hold state for 24 hours which I forgot to mention in my
>>>>> previous email. can I do ?
>>>>>
>>>>>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours",
>>>>> "24 hours"), "train", "dest").max("time")*
>>>>>
>>>>>
>>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>
>>>>>> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
>>>>>> Int, dest: String, time: Timestamp] *
>>>>>>
>>>>>>
>>>>>> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>>>>>>
>>>>>>
>>>>>> *SQL*: *"select train, dest, max(time) from trainTimesView group by
>>>>>> train, dest"*// after calling
>>>>>> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I am wondering what is the easiest and concise way to express the
>>>>>>> computation below in Spark Structured streaming given that it supports 
>>>>>>> both
>>>>>>> imperative and declarative styles?
>>>>>>> I am just trying to select rows that has max timestamp for each
>>>>>>> train? Instead of doing some sort of nested queries like we normally do 
>>>>>>> in
>>>>>>> any relational database I am trying to see if I can leverage both
>>>>>>> imperative and declarative at the same time. If nested queries or join 
>>>>>>> are
>>>>>>> not required then I would like to see how this can be possible? I am 
>>>>>>> using
>>>>>>> spark 2.1.1.
>>>>>>>
>>>>>>> Dataset
>>>>>>>
>>>>>>> Train  Dest  Time
>>>>>>> 1      HK    10:00
>>>>>>> 1      SH    12:00
>>>>>>> 1      SZ    14:00
>>>>>>> 2      HK    13:00
>>>>>>> 2      SH    09:00
>>>>>>> 2      SZ    07:00
>>>>>>>
>>>>>>> The desired result should be:
>>>>>>>
>>>>>>> Train  Dest  Time
>>>>>>> 1      SZ    14:00
>>>>>>> 2      HK    13:00
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
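
A sketch of the flatMapGroupsWithState approach suggested above: keep, per
train, the whole row with the latest time in state and emit it whenever it
changes. The case class and columns follow the example in the thread; the
rest (a spark-shell-style SparkSession with its implicits in scope, and
trainTimesDataset as a streaming Dataset[TrainTime]) is assumed:

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._

case class TrainTime(train: Int, dest: String, time: Timestamp)

val latestPerTrain = trainTimesDataset
  .groupByKey(_.train)
  .flatMapGroupsWithState[TrainTime, TrainTime](
      OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
    (train: Int, rows: Iterator[TrainTime], state: GroupState[TrainTime]) =>
      // Compare the incoming rows (plus any previously stored row) and keep
      // the entire row with the maximum timestamp for this train.
      val newest = (rows ++ state.getOption.iterator).maxBy(_.time.getTime)
      if (state.getOption.forall(_.time.getTime < newest.time.getTime)) {
        state.update(newest)
        Iterator(newest)      // emit the new per-train maximum
      } else {
        Iterator.empty        // nothing newer arrived in this batch
      }
  }

latestPerTrain.writeStream
  .outputMode("update")
  .format("console")
  .start()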


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Burak Yavuz
Hey TD,

If I understood the question correctly, your solution wouldn't return the
exact solution, since it also groups by on destination. I would say the
easiest solution would be to use flatMapGroupsWithState, where you:
.groupByKey(_.train)

and keep in state the row with the maximum time.

On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das 
wrote:

> Yes. And in that case, if you just care about only the last few days of
> max, then you should set watermark on the timestamp column.
>
>  *trainTimesDataset*
> *  .withWatermark("**activity_timestamp", "5 days")*
> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train",
> "dest")*
> *  .max("time")*
>
> Any counts which are more than 5 days old will be dropped from the
> streaming state.
>
> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali  wrote:
>
>> Hi,
>>
>> Thanks for the response. Since this is a streaming based query and in my
>> case I need to hold state for 24 hours which I forgot to mention in my
>> previous email. can I do ?
>>
>>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24
>> hours"), "train", "dest").max("time")*
>>
>>
>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
>>> Int, dest: String, time: Timestamp] *
>>>
>>>
>>> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>>>
>>>
>>> *SQL*: *"select train, dest, max(time) from trainTimesView group by
>>> train, dest"*// after calling
>>> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>>>
>>>
>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali 
>>> wrote:
>>>
 Hi All,

 I am wondering what is the easiest and concise way to express the
 computation below in Spark Structured streaming given that it supports both
 imperative and declarative styles?
 I am just trying to select rows that has max timestamp for each train?
 Instead of doing some sort of nested queries like we normally do in any
 relational database I am trying to see if I can leverage both imperative
 and declarative at the same time. If nested queries or join are not
 required then I would like to see how this can be possible? I am using
 spark 2.1.1.

 Dataset

 Train  Dest  Time
 1      HK    10:00
 1      SH    12:00
 1      SZ    14:00
 2      HK    13:00
 2      SH    09:00
 2      SZ    07:00

 The desired result should be:

 Train  Dest  Time
 1      SZ    14:00
 2      HK    13:00


>>>
>>
>


Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Burak Yavuz
Hi Jacek,

The way the memory sink is architected at the moment is that it either
appends a row (append/update mode) or replaces all rows (complete mode).
When a user specifies a checkpoint location, the guarantee Structured
Streaming provides is that output sinks will not lose data and will be able
to serve the results according to the specified output mode.

Now, what makes Complete mode so special for the memory sink? With
aggregations, and complete mode, all the results are provided from the
StateStores, therefore we can accept a checkpoint location (where the
StateStores save the data), and we can recreate the memory table at each
trigger.

Why doesn't append mode work for a Memory Sink? The memory sink keeps data
from previous triggers in-memory. It doesn't persist it anywhere. If you
were to query the table after restarting your stream, all the data would've
been lost, and in order to retrieve the existing state of the memory table,
you would need to process all the data again from scratch.

Does all this make sense? Happy to elaborate.

Best,
Burak





On Fri, Aug 18, 2017 at 12:52 PM, Jacek Laskowski  wrote:

> Hi,
>
> This is what I could find in Spark's source code about the
> `recoverFromCheckpointLocation` flag (that led me to explore the
> complete output mode for dropDuplicates operator).
>
> `recoverFromCheckpointLocation` flag is enabled by default and varies
> per sink (memory, console and others).
>
> * `memory` sink has the flag enabled for Complete output mode only
>
> * `foreach` sink has the flag always enabled
>
> * `console` sink has the flag always disabled
>
> * all other sinks have the flag always enabled
>
> As agreed with Michael
> (https://issues.apache.org/jira/browse/SPARK-21667), the plan is to make the
> console sink accept the flag as enabled, which would leave the memory sink
> as the only one with the flag enabled for Complete output only.
>
> And I thought I've been close to understand Structured Streaming :)
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Fri, Aug 18, 2017 at 9:21 PM, Holden Karau 
> wrote:
> > My assumption is it would be similar though: an in-memory sink of all of
> > your records would quickly overwhelm your cluster, but with an aggregation
> > it could be reasonable. But there might be additional reasons on top of that.
> >
> > On Fri, Aug 18, 2017 at 11:44 AM Holden Karau 
> wrote:
> >>
> >> Ah yes I'm not sure about the workings of the memory sink.
> >>
> >> On Fri, Aug 18, 2017 at 11:36 AM, Jacek Laskowski 
> wrote:
> >>>
> >>> Hi Holden,
> >>>
> >>> Thanks a lot for a bit more light on the topic. That however does not
> >>> explain why memory sink requires Complete for a checkpoint location to
> >>> work. The only reason I used Complete output mode was to meet the
> >>> requirements of memory sink and that got me thinking why would the
> >>> already-memory-hungry memory sink require yet another thing to get the
> >>> query working.
> >>>
> >>> On to exploring the bits...
> >>>
> >>> Pozdrawiam,
> >>> Jacek Laskowski
> >>> 
> >>> https://medium.com/@jaceklaskowski/
> >>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> >>> Follow me at https://twitter.com/jaceklaskowski
> >>>
> >>>
> >>> On Fri, Aug 18, 2017 at 6:35 PM, Holden Karau 
> >>> wrote:
> >>> > So performing complete output without an aggregation would require
> >>> > building
> >>> > up a table of the entire input to write out at each micro batch. This
> >>> > would
> >>> > get prohibitively expensive quickly. With an aggregation we just need
> >>> > to
> >>> > keep track of the aggregates and update them every batch, so the
> memory
> >>> > requirement is more reasonable.
> >>> >
> >>> > (Note: I don't do a lot of work in streaming so there may be
> additional
> >>> > reasons, but these are the ones I remember from when I was working on
> >>> > looking at integrating ML with SS).
> >>> >
> >>> > On Fri, Aug 18, 2017 at 5:25 AM Jacek Laskowski 
> >>> > wrote:
> >>> >>
> >>> >> Hi,
> >>> >>
> >>> >> Why is the requirement for a streaming aggregation in a streaming
> >>> >> query? What would happen if Spark allowed Complete without a single
> >>> >> aggregation? This is the latest master.
> >>> >>
> >>> >> scala> val q = ids.
> >>> >>  |   writeStream.
> >>> >>  |   format("memory").
> >>> >>  |   queryName("dups").
> >>> >>  |   outputMode(OutputMode.Complete).  // <-- memory sink
> supports
> >>> >> checkpointing for Complete output mode only
> >>> >>  |   trigger(Trigger.ProcessingTime(30.seconds)).
> >>> >>  |   option("checkpointLocation", "checkpoint-dir"). // <-- use
> >>> >> checkpointing to save state between restarts
> >>> >>  |   start
> >>> >> 

Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-28 Thread Burak Yavuz
Hi Priyank,

You may register them as temporary tables to use across language boundaries.

Python:
df = spark.readStream...
# Python logic
df.createOrReplaceTempView("tmp1")

Scala:
val df = spark.table("tmp1")
df.writeStream
  .foreach(...)
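
where the ForeachWriter could look roughly like this (untested sketch; the
Redis calls are placeholders, not a real client API):

import org.apache.spark.sql.{ForeachWriter, Row}

val writer = new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = {
    // open your Redis connection here (called once per partition per epoch)
    true
  }
  def process(value: Row): Unit = {
    // replace with the actual Redis write for this row
    println(value)
  }
  def close(errorCause: Throwable): Unit = {
    // close the connection here
  }
}

df.writeStream.foreach(writer).start()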


On Fri, Jul 28, 2017 at 3:06 PM, Priyank Shrivastava  wrote:

> TD,
>
> For a hybrid python-scala approach, what's the recommended way of handing
> off a dataframe from python to scala.  I would like to know especially in a
> streaming context.
>
> I am not using notebooks/databricks.  We are running it on our own spark
> 2.1 cluster.
>
> Priyank
>
> On Wed, Jul 26, 2017 at 12:49 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> We see that all the time. For example, in SQL, people can write their
>> user-defined function in Scala/Java and use it from SQL/python/anywhere.
>> That is the recommended way to get the best combo of performance and
>> ease-of-use from non-jvm languages.
>>
>> On Wed, Jul 26, 2017 at 11:49 AM, Priyank Shrivastava <
>> priy...@asperasoft.com> wrote:
>>
>>> Thanks TD.  I am going to try the python-scala hybrid approach by using
>>> scala only for custom redis sink and python for the rest of the app .  I
>>> understand it might not be as efficient as purely writing the app in scala
>>> but unfortunately I am constrained on scala resources.  Have you come
>>> across other use cases where people have resorted to such a python-scala
>>> hybrid approach?
>>>
>>> Regards,
>>> Priyank
>>>
>>>
>>>
>>> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Hello Priyank

 Writing something purely in Scala/Java would be the most efficient.
 Even if we expose python APIs that allow writing custom sinks in pure
 Python, it won't be as efficient as Scala/Java foreach, as the data would
 have to go through the JVM / PVM boundary, which has significant overheads. So
 Scala/Java foreach is always going to be the best option.

 TD

 On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
 priy...@asperasoft.com> wrote:

> I am trying to write key-values to redis using a DataStreamWriter
> object using pyspark structured streaming APIs. I am using Spark 2.2
>
> Since the Foreach Sink is not supported for python, I am trying to find
> out some alternatives.
>
> One alternative is to write a separate Scala module only to push data
> into redis using foreach; ForeachWriter is supported in Scala. BUT this
> doesn't seem like an efficient approach and
> adds deployment overhead because now I will have to support Scala in my 
> app.
>
> Another approach is obviously to use Scala instead of python, which is
> fine but I want to make sure that I absolutely cannot use python for this
> problem before I take this path.
>
> Would appreciate some feedback and alternative design approaches for
> this problem.
>
> Thanks.
>
>
>
>

>>>
>>
>


Re: What are some disadvantages of issuing a raw sql query to spark?

2017-07-25 Thread Burak Yavuz
I think Kant meant time windowing functions. You can use

`window(TIMESTAMP, '24 hours', '24 hours')`
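
For example, the query below could be expressed roughly as (untested; assumes
the DataFrame is registered as a temp view called "events"):

df1.createOrReplaceTempView("events")

val result = spark.sql("""
  SELECT window(`TIMESTAMP`, '24 hours', '24 hours') AS time_window,
         hourlyPay,
         sum(hourlyPay) AS total
  FROM events
  WHERE name = 'john'
  GROUP BY window(`TIMESTAMP`, '24 hours', '24 hours'), hourlyPay
""")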

On Tue, Jul 25, 2017 at 9:26 AM, Keith Chapman 
wrote:

> Here is an example of a window lead function,
>
> select *, lead(someColumn1) over ( partition by someColumn2 order by
> someColumn13 asc nulls first) as someName  from someTable
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>
> On Tue, Jul 25, 2017 at 9:15 AM, kant kodali  wrote:
>
>> How do I Specify windowInterval and slideInteval using raw sql string?
>>
>> On Tue, Jul 25, 2017 at 8:52 AM, Keith Chapman 
>> wrote:
>>
>>> You could issue a raw sql query to spark, there is no particular
>>> advantage or disadvantage of doing so. Spark would build a logical plan
>>> from the raw sql (or DSL) and optimize on that. Ideally you would end up
>>> with the same physical plan, irrespective of it been written in raw sql /
>>> DSL.
>>>
>>> Regards,
>>> Keith.
>>>
>>> http://keith-chapman.com
>>>
>>> On Tue, Jul 25, 2017 at 12:50 AM, kant kodali 
>>> wrote:
>>>
 HI All,

 I just want to run some spark structured streaming Job similar to this

 DS.filter(col("name").equalTo("john"))
   .groupBy(functions.window(df1.col("TIMESTAMP"), "24 hours", "24 hours"),
            df1.col("hourlyPay"))
   .agg(sum("hourlyPay").as("total"));


 I am wondering if I can express the above query in raw sql string?

 If so, how would that look, and what are some of the disadvantages of
 using a raw sql query vs the spark DSL?


 Thanks!


>>>
>>
>


Re: to_json not working with selectExpr

2017-07-16 Thread Burak Yavuz
Hi Matthew,

Which Spark version are you using? The expression `to_json` was added in
2.2 with this commit:
https://github.com/apache/spark/commit/0cdcf9114527a2c359c25e46fd6556b3855bfb28

Best,
Burak

On Sun, Jul 16, 2017 at 6:24 PM, Matthew cao  wrote:

> Hi all,
> I just read the databricks blog here: https://docs.databricks.
> com/_static/notebooks/complex-nested-structured.html
>
> When I try to follow the example about the to_json and selectExpr part, it
> gave an error: “org.apache.spark.sql.AnalysisException: Undefined function:
> 'to_json'. This function is neither a registered temporary function nor a
> permanent function registered in the database 'default'.; line 1 pos 0”.
> Also this error shows in the original databricks notebook. I know that the
> to_json function works great with select. Am I missing something when using
> selectExpr? THX.
>
> Best,
> Matthew
>
>


Re: Querying on Deeply Nested JSON Structures

2017-07-16 Thread Burak Yavuz
Have you checked out this blog post?
https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

Shows tools and tips on how to work with nested data. You can access data
through `field1.field2.field3` and such with JSON.
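
A tiny illustration (untested; Spark 2.2+, the field names are made up):

import spark.implicits._

val ds = Seq("""{"stats": {"summary": {"mean": 1.5, "median": 2.0}}}""").toDS()
val df = spark.read.json(ds)   // infers the nested schema

// query a single nested field directly, no explode needed
df.select($"stats.summary.mean", $"stats.summary.median").show()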

Best,
Burak

On Sat, Jul 15, 2017 at 10:45 AM, Matt Deaver  wrote:

> I would love to be told otherwise, but I believe your options are to
> either 1) use the explode function or 2) pre-process the data so you don't
> have to explode it.
>
> On Jul 15, 2017 11:41 AM, "Patrick"  wrote:
>
>> Hi,
>>
>> We need to query a deeply nested Json structure. However, the query is on a
>> single field at a nested level, such as mean, median, or mode.
>>
>> I am aware of the sql explode function.
>>
>> df = df_nested.withColumn('exploded', explode(top))
>>
>> But this is too slow.
>>
>> Is there any other strategy that could give us the best performance in 
>> querying nested json in Spark Dataset.
>>
>>
>> Thanks
>>
>>
>>


Re: How save streaming aggregations on 'Structured Streams' in parquet format ?

2017-06-19 Thread Burak Yavuz
Hi Kaniska,

In order to use append mode with aggregations, you need to set an event
time watermark (using `withWatermark`). Otherwise, Spark doesn't know when
to output an aggregation result as "final".
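
For example, roughly (untested Scala sketch; "events", "eventTime" and "tag"
are made-up names for your streaming DataFrame and columns):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.ProcessingTime

val tagAgg = events                                        // streaming DataFrame, assumed
  .withWatermark("eventTime", "10 minutes")                // event time watermark
  .groupBy(window(col("eventTime"), "5 minutes"), col("tag"))
  .count()

val query = tagAgg.writeStream
  .format("parquet")
  .outputMode("append")                                    // emits only finalized windows
  .option("checkpointLocation", "/tmp/summary/checkpoints/")
  .option("path", "/data/summary/tags/")
  .trigger(ProcessingTime("10 seconds"))
  .start()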

Best,
Burak

On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal 
wrote:

> Hi,
>
> My goal is to ~
> (1) either chain streaming aggregations in a single query OR
> (2) run multiple streaming aggregations and save data in some meaningful
> format to execute low latency / failsafe OLAP queries
>
> So my first choice is parquet format , but I failed to make it work !
>
> I am using spark-streaming_2.11-2.1.1
>
> I am facing the following error -
> org.apache.spark.sql.AnalysisException: Append output mode not supported
> when there are streaming aggregations on streaming DataFrames/DataSets;
>
> - for the following syntax
>
>  StreamingQuery streamingQry = tagBasicAgg.writeStream()
>   .format("parquet")
>   .trigger(ProcessingTime.create("10 seconds"))
>   .queryName("tagAggSummary")
>   .outputMode("append")
>   .option("checkpointLocation", "/tmp/summary/checkpoints/")
>   .option("path", "/data/summary/tags/")
>   .start();
> But, parquet doesn't support 'complete' outputMode.
>
> So is parquet supported only for batch queries , NOT for streaming queries
> ?
>
> - note that console outputmode working fine !
>
> Any help will be much appreciated.
>
> Thanks
> Kaniska
>
>


Re: Spark SQL within a DStream map function

2017-06-16 Thread Burak Yavuz
Do you really need to create a DStream from the original messaging queue?
Can't you just read them in a while loop or something on the driver?

On Fri, Jun 16, 2017 at 1:01 PM, Mike Hugo  wrote:

> Hello,
>
> I have a web application that publishes JSON messages on to a messaging
> queue that contain metadata and a link to a CSV document on S3.  I'd like
> to iterate over these JSON messages, and for each one pull the CSV document
> into spark SQL to transform it (based on the metadata in the JSON message)
> and output the results to a search index.  Each file on S3 has different
> headers, potentially different delimiters, and differing numbers of rows.
>
> Basically what I'm trying to do is something like this:
>
> JavaDStream parsedMetadataAndRows =
> queueStream.map(new Function() {
> @Override
> ParsedDocument call(String metadata) throws Exception {
> Map gson = new Gson().fromJson(metadata, Map.class)
>
> // get metadata from gson
> String s3Url = gson.url
> String delimiter = gson.delimiter
> // etc...
>
> // read s3Url
> Dataset dataFrame = sqlContext.read()
> .format("com.databricks.spark.csv")
> .option("delimiter", delimiter)
> .option("header", true)
> .option("inferSchema", true)
> .load(url)
>
> // process document,
> ParsedDocument docPlusRows = //...
>
> return docPlusRows
> })
>
> JavaEsSparkStreaming.saveToEs(parsedMetadataAndRows,
> "index/type" // ...
>
>
> But it appears I cannot get access to the sqlContext when I run this on
> the spark cluster because that code is executing in the executor not in the
> driver.
>
> Is there a way I can access or create a SqlContext to be able to pull the
> file down from S3 in my map function?  Or do you have any recommendations
> as to how I could set up a streaming job in a different way that would
> allow me to accept metadata on the stream of records coming in and pull
> each file down from s3 for processing?
>
> Thanks in advance for your help!
>
> Mike
>


Re: Structured Streaming from Parquet

2017-05-25 Thread Burak Yavuz
Hi Paul,

From what you're describing, it seems that stream1 is possibly generating
tons of small files and stream2 is OOMing because it tries to maintain an
in-memory list of files. Some notes/questions:

 1. Parquet files are splittable, therefore having large parquet files
shouldn't be a problem. The larger a parquet file is, the longer the write
process will take, but the read path shouldn't be adversely affected.
 2. How many partitions are you writing out to?
 3. In order to reduce the number of files, you may call:
`repartition(partitionColumns).writeStream.partitionBy(partitionColumns)`
so that every trigger, you output only 1 file per partition. After some
time, you may want to compact files if you don't partition by date.
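
A rough sketch of (3), reusing the names from your query below (untested):

import org.apache.spark.sql.functions.col

val query = output
  .repartition(col("expireDate"))        // 1 output file per partition value per trigger
  .writeStream
  .queryName("bk")
  .format("parquet")
  .partitionBy("expireDate")
  .option("checkpointLocation", config.getString("spark.app.checkpoint_dir") + "/bk")
  .option("path", config.getString("spark.app.s3.output"))
  .start()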

Best,
Burak



On Thu, May 25, 2017 at 7:13 AM, Paul Corley 
wrote:

> I have a Spark Structured Streaming process that is implemented in 2
> separate streaming apps.
>
>
>
> The first app reads .gz files, which range in size from 1GB to 9GB
> compressed, in from s3, filters out invalid records, repartitions the data,
> and outputs to parquet on s3, partitioned the same as the stream is partitioned.
> This process produces thousands of files which other processes consume.
> The thought on this approach was to:
>
> 1)   Break the file down to smaller more easily consumed sizes
>
> 2)   Allow a more parallelism in the processes that consume the data.
>
> 3)   Allow multiple downstream processes to consume data that has
> already
>
> a.   Had bad records filtered out
>
> b.   Not have to fully read in such large files
>
>
>
> Second application reads in the files produced by the first app.  This
> process then reformats the data from a row that is:
>
>
>
> 12NDSIN|20170101:123313, 5467;20170115:987
>
>
>
> into:
>
> 12NDSIN, 20170101, 123313
>
> 12NDSIN, 20170101, 5467
>
> 12NDSIN, 20170115, 987
>
>
>
> App 1 runs no problems and churns through files in its source directory on
> s3.  Total process time for a file is < 10min.  App2 is the one having
> issues.
>
>
>
> The source is defined as
>
> val rawReader = sparkSession
>   .readStream
>   .option("latestFirst", "true")
>   .option("maxFilesPerTrigger", batchSize)
>   .schema(rawSchema)
>   .parquet(config.getString("aws.s3.sourcepath"))   // <=== Line 85
>
>
>
> output is defined as
>
> val query = output
>   .writeStream
>   .queryName("bk")
>   .format("parquet")
>   .partitionBy("expireDate")
>   .trigger(ProcessingTime("10 seconds"))
>   .option("checkpointLocation", config.getString("spark.app.checkpoint_dir") + "/bk")
>   .option("path", config.getString("spark.app.s3.output"))
>   .start()
>   .awaitTermination()
>
>
>
> If files exist from app 1, app 2 enters a cycle of just cycling through
> "parquet at ProcessFromSource.scala:85" (3999/3999)
>
>
>
> If there are a few files output from app1 eventually it will enter the
> stage where it actually processes the data and begins to output, but the
> more files produced by app1 the longer it takes if it ever completes these
> steps.  With an extremely large number of files the app eventually throws a
> java OOM error. Additionally each cycle through this step takes
> successively longer.
>
> Hopefully someone can lend some insight as to what is actually taking
> place in this step and how to alleviate it
>
>
>
>
>
>
>
> Thanks,
>
>
>
> Paul Corley | Principal Data Engineer
>
>


Re: couple naive questions on Spark Structured Streaming

2017-05-22 Thread Burak Yavuz
Hi Kant,

>
>
> 1. Can we use Spark Structured Streaming for stateless transformations
> just like we would do with DStreams or Spark Structured Streaming is only
> meant for stateful computations?
>

Of course you can do stateless transformations. Any map, filter, or select
type of transformation is stateless. Aggregations are generally stateful.
You could also perform arbitrary stateless aggregations with "flatMapGroups"
or make them stateful with "flatMapGroupsWithState".



> 2. When we use groupBy and Window operations for event time processing and
> specify a watermark does this mean the timestamp field in each message is
> compared to the processing time of that machine/node and discard the
> events that are later than the specified threshold? If we don't specify a
> watermark I am assuming the processing time won't come into the picture. Is
> that right? Just trying to understand the interplay between processing time
> and event time when we do event time processing.
>
Watermarks are tracked with respect to the event time of your data, not
the processing time of the machine. Please take a look at the blog below
for more details
https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html

Best,
Burak


Re: Why does dataset.union fails but dataset.rdd.union execute correctly?

2017-05-08 Thread Burak Yavuz
Yes, unfortunately. This should actually be fixed, and the union's schema
should use the less restrictive of the two DataFrames' schemas.
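
In the meantime, a quick way to see exactly where the two schemas disagree,
including nullability (untested sketch):

val fields1 = ds.schema.fields.map(f => (f.name, f.dataType, f.nullable))
val fields2 = ds1.schema.fields.map(f => (f.name, f.dataType, f.nullable))
// print only the fields that differ between the two Datasets
fields1.zip(fields2).filter { case (a, b) => a != b }.foreach(println)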

On Mon, May 8, 2017 at 12:46 PM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> HI Burak,
> By nullability you mean that if I have exactly the same schema, but
> one side supports null and the other doesn't, this exception (in union
> dataset) will be thrown?
>
>
>
> 2017-05-08 16:41 GMT-03:00 Burak Yavuz <brk...@gmail.com>:
>
>> I also want to add that generally these may be caused by the
>> `nullability` field in the schema.
>>
>> On Mon, May 8, 2017 at 12:25 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> This is because RDD.union doesn't check the schema, so you won't see the
>>> problem unless you run RDD and hit the incompatible column problem. For
>>> RDD, You may not see any error if you don't use the incompatible column.
>>>
>>> Dataset.union requires compatible schema. You can print ds.schema and
>>> ds1.schema and check if they are same.
>>>
>>> On Mon, May 8, 2017 at 11:07 AM, Dirceu Semighini Filho <
>>> dirceu.semigh...@gmail.com> wrote:
>>>
>>>> Hello,
>>>> I've a very complex case class structure, with a lot of fields.
>>>> When I try to union two datasets of this class, it doesn't work with
>>>> the following error :
>>>> ds.union(ds1)
>>>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>>>> Union can only be performed on tables with the compatible column types
>>>>
>>>> But when use it's rdd, the union goes right:
>>>> ds.rdd.union(ds1.rdd)
>>>> res8: org.apache.spark.rdd.RDD[
>>>>
>>>> Is there any reason for this to happen (besides a bug ;) )
>>>>
>>>>
>>>>
>>>
>>
>


Re: Why does dataset.union fails but dataset.rdd.union execute correctly?

2017-05-08 Thread Burak Yavuz
I also want to add that generally these may be caused by the `nullability`
field in the schema.

On Mon, May 8, 2017 at 12:25 PM, Shixiong(Ryan) Zhu  wrote:

> This is because RDD.union doesn't check the schema, so you won't see the
> problem unless you run RDD and hit the incompatible column problem. For
> RDD, You may not see any error if you don't use the incompatible column.
>
> Dataset.union requires compatible schema. You can print ds.schema and
> ds1.schema and check if they are same.
>
> On Mon, May 8, 2017 at 11:07 AM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
>> Hello,
>> I've a very complex case class structure, with a lot of fields.
>> When I try to union two datasets of this class, it doesn't work with the
>> following error :
>> ds.union(ds1)
>> Exception in thread "main" org.apache.spark.sql.AnalysisException: Union
>> can only be performed on tables with the compatible column types
>>
>> But when use it's rdd, the union goes right:
>> ds.rdd.union(ds1.rdd)
>> res8: org.apache.spark.rdd.RDD[
>>
>> Is there any reason for this to happen (besides a bug ;) )
>>
>>
>>
>


Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

2017-03-16 Thread Burak Yavuz
Hi Everett,

IIRC we added unionAll in Spark 2.0 which is the same implementation as rdd
union. The union in DataFrames with Spark 2.0 does deduplication, and
that's why you should be seeing the slowdown.

Best,
Burak

On Thu, Mar 16, 2017 at 4:14 PM, Everett Anderson 
wrote:

> Looks like the Dataset version of union may also fail with the following
> on larger data sets, which again seems like it might be drawing everything
> into the driver for some reason --
>
> 7/03/16 22:28:21 WARN TaskSetManager: Lost task 1.0 in stage 91.0 (TID
> 5760, ip-10-8-52-198.us-west-2.compute.internal): 
> java.lang.IllegalArgumentException:
> bound must be positive
> at java.util.Random.nextInt(Random.java:388)
> at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.
> confChanged(LocalDirAllocator.java:305)
> at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.
> getLocalPathForWrite(LocalDirAllocator.java:344)
> at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.
> createTmpFileForWrite(LocalDirAllocator.java:416)
> at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(
> LocalDirAllocator.java:198)
> at org.apache.hadoop.fs.s3a.S3AOutputStream.(
> S3AOutputStream.java:87)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
> at org.apache.parquet.hadoop.ParquetFileWriter.(
> ParquetFileWriter.java:176)
> at org.apache.parquet.hadoop.ParquetFileWriter.(
> ParquetFileWriter.java:160)
> at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(
> ParquetOutputFormat.java:289)
> at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(
> ParquetOutputFormat.java:262)
> at org.apache.spark.sql.execution.datasources.parquet.
> ParquetOutputWriter.(ParquetFileFormat.scala:562)
> at org.apache.spark.sql.execution.datasources.parquet.
> ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:139)
> at org.apache.spark.sql.execution.datasources.BaseWriterContainer.
> newOutputWriter(WriterContainer.scala:131)
> at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.
> writeRows(WriterContainer.scala:247)
> at org.apache.spark.sql.execution.datasources.
> InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$
> apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at org.apache.spark.sql.execution.datasources.
> InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$
> apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> On Thu, Mar 16, 2017 at 2:55 PM, Everett Anderson 
> wrote:
>
>> Hi,
>>
>> We're using Dataset union() in Spark 2.0.2 to concatenate a bunch of
>> tables together and save as Parquet to S3, but it seems to take a long
>> time. We're using the S3A FileSystem implementation under the covers, too,
>> if that helps.
>>
>> Watching the Spark UI, the executors all eventually stop (we're using
>> dynamic allocation) but under the SQL tab we can see a "save at
>> NativeMethodAccessorImpl.java:-2" in Running Queries. The driver is
>> still running of course, but it may take tens of minutes to finish. It
>> makes me wonder if our data is all being collected through the driver.
>>
>> If we instead convert the Datasets to RDDs and call SparkContext.union()
>> it works quickly.
>>
>> Anyone know if this is a known issue?
>>
>>
>


Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-06 Thread Burak Yavuz
I presume you may be able to implement a custom sink and use
df.saveAsTable. The problem is that you will have to handle idempotence /
garbage collection yourself, in case your job fails while writing, etc.
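
A very rough sketch of that idea (untested; it relies on Spark's internal Sink
API, which can change between versions, and it does not handle retried batches):

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

class HiveTableSink(table: String) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // re-wrap the micro-batch before using the regular batch writer
    val batchDf = data.sparkSession.createDataFrame(data.rdd, data.schema)
    batchDf.write.mode(SaveMode.Append).saveAsTable(table)
  }
}

class HiveSinkProvider extends StreamSinkProvider {
  override def createSink(sqlContext: SQLContext, parameters: Map[String, String],
                          partitionColumns: Seq[String], outputMode: OutputMode): Sink =
    new HiveTableSink(parameters("table"))   // pass the table name via .option("table", ...)
}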

On Mon, Feb 6, 2017 at 5:53 PM, Egor Pahomov <pahomov.e...@gmail.com> wrote:

> I have stream of files on HDFS with JSON events. I need to convert it to
> pq in realtime, process some fields and store in simple Hive table so
> people can query it. People even might want to query it with Impala, so
> it's important, that it would be real Hive metastore based table. How can I
> do that?
>
> 2017-02-06 14:25 GMT-08:00 Burak Yavuz <brk...@gmail.com>:
>
>> Hi Egor,
>>
>> Structured Streaming handles all of its metadata itself, which files are
>> actually valid, etc. You may use the "create table" syntax in SQL to treat
>> it like a hive table, but it will handle all partitioning information in
>> its own metadata log. Is there a specific reason that you want to store the
>> information in the Hive Metastore?
>>
>> Best,
>> Burak
>>
>> On Mon, Feb 6, 2017 at 11:39 AM, Egor Pahomov <pahomov.e...@gmail.com>
>> wrote:
>>
>>> Hi, I'm thinking of using Structured Streaming instead of old streaming,
>>> but I need to be able to save results to Hive table. Documentation for file
>>> sink says(http://spark.apache.org/docs/latest/structured-streamin
>>> g-programming-guide.html#output-sinks): "Supports writes to partitioned
>>> tables. ". But being able to write to partitioned directories is not
>>> enough to write to the table: someone needs to write to Hive metastore. How
>>> can I use Structured Streaming and write to Hive table?
>>>
>>> --
>>>
>>>
>>> Sincerely yours, Egor Pakhomov
>>>
>>
>>
>
>
> --
>
>
> Sincerely yours, Egor Pakhomov
>


Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-06 Thread Burak Yavuz
Hi Egor,

Structured Streaming handles all of its metadata itself, which files are
actually valid, etc. You may use the "create table" syntax in SQL to treat
it like a hive table, but it will handle all partitioning information in
its own metadata log. Is there a specific reason that you want to store the
information in the Hive Metastore?

Best,
Burak

On Mon, Feb 6, 2017 at 11:39 AM, Egor Pahomov 
wrote:

> Hi, I'm thinking of using Structured Streaming instead of old streaming,
> but I need to be able to save results to Hive table. Documentation for file
> sink says(http://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html#output-sinks): "Supports writes to
> partitioned tables. ". But being able to write to partitioned directories
> is not enough to write to the table: someone needs to write to Hive
> metastore. How can I use Structured Streaming and write to Hive table?
>
> --
>
>
> Sincerely yours, Egor Pakhomov
>


Re: eager? in dataframe's checkpoint

2017-01-31 Thread Burak Yavuz
Hi Koert,

When eager is true, we return you a new DataFrame that depends on the files
written out to the checkpoint directory.
All previous operations on the checkpointed DataFrame are gone forever. You
basically start fresh. AFAIK, when eager is true, the method will not
return until the DataFrame is completely checkpointed. If you look at the
RDD.checkpoint implementation, the checkpoint location is updated
synchronously, therefore during the count, `isCheckpointed` will be true.
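
For example (sketch; df is any DataFrame):

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")   // required once

val df2 = df.checkpoint(eager = true)   // materializes df and cuts its lineage now
// df2's plan now starts from the checkpointed files rather than df's full lineage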

Best,
Burak

On Tue, Jan 31, 2017 at 12:52 PM, Koert Kuipers <ko...@tresata.com> wrote:

> i understand that checkpoint cuts the lineage, but i am not fully sure i
> understand the role of eager.
>
> eager simply seems to materialize the rdd early with a count, right after
> the rdd has been checkpointed. but why is that useful? rdd.checkpoint is
> asynchronous, so when the rdd.count happens most likely rdd.isCheckpointed
> will be false, and the count will be on the rdd before it was checkpointed.
> what is the benefit of that?
>
>
> On Thu, Jan 26, 2017 at 11:19 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hi,
>>
>> One of the goals of checkpointing is to cut the RDD lineage. Otherwise
>> you run into StackOverflowExceptions. If you eagerly checkpoint, you
>> basically cut the lineage there, and the next operations all depend on the
>> checkpointed DataFrame. If you don't checkpoint, you continue to build the
>> lineage, therefore while that lineage is being resolved, you may hit the
>> StackOverflowException.
>>
>> HTH,
>> Burak
>>
>> On Thu, Jan 26, 2017 at 10:36 AM, Jean Georges Perrin <j...@jgp.net>
>> wrote:
>>
>>> Hey Sparkers,
>>>
>>> Trying to understand the Dataframe's checkpoint (*not* in the context
>>> of streaming) https://spark.apache.org/docs/latest/api/java/org
>>> /apache/spark/sql/Dataset.html#checkpoint(boolean)
>>>
>>> What is the goal of the *eager* flag?
>>>
>>> Thanks!
>>>
>>> jg
>>>
>>
>>
>


Re: eager? in dataframe's checkpoint

2017-01-26 Thread Burak Yavuz
Hi,

One of the goals of checkpointing is to cut the RDD lineage. Otherwise you
run into StackOverflowExceptions. If you eagerly checkpoint, you basically
cut the lineage there, and the next operations all depend on the
checkpointed DataFrame. If you don't checkpoint, you continue to build the
lineage, therefore while that lineage is being resolved, you may hit the
StackOverflowException.

HTH,
Burak

On Thu, Jan 26, 2017 at 10:36 AM, Jean Georges Perrin  wrote:

> Hey Sparkers,
>
> Trying to understand the Dataframe's checkpoint (*not* in the context of
> streaming) https://spark.apache.org/docs/latest/api/
> java/org/apache/spark/sql/Dataset.html#checkpoint(boolean)
>
> What is the goal of the *eager* flag?
>
> Thanks!
>
> jg
>


Re: Java heap error during matrix multiplication

2017-01-26 Thread Burak Yavuz
Hi,

Have you tried creating more column blocks?

BlockMatrix matrix = cmatrix.toBlockMatrix(100, 100);

for example.


Is your data randomly spread out, or do you generally have clusters of
data points together?


On Wed, Jan 25, 2017 at 4:23 AM, Petr Shestov  wrote:

> Hi all!
>
> I'm using Spark 2.0.1 with two workers (one executor each) with 20Gb each.
> And run following code:
>
> JavaRDD<MatrixEntry> entries = ...; // filling the data
> CoordinateMatrix cmatrix = new CoordinateMatrix(entries.rdd());
> BlockMatrix matrix = cmatrix.toBlockMatrix(100, 1000);
> BlockMatrix cooc = matrix.transpose().multiply(matrix);
>
> My matrix is approx 8 000 000 x 3000, but only 10 000 000 cells have
> meaningful value. During multiplication I always get:
>
> 17/01/24 08:03:10 WARN TaskMemoryManager: leak 1322.6 MB memory from 
> org.apache.spark.util.collection.ExternalAppendOnlyMap@649e701917/01/24 
> 08:03:10 ERROR Executor: Exception in task 1.0 in stage 57.0 (TID 83664)
> java.lang.OutOfMemoryError: Java heap space
> at 
> org.apache.spark.mllib.linalg.DenseMatrix$.zeros(Matrices.scala:453)
> at 
> org.apache.spark.mllib.linalg.Matrix$class.multiply(Matrices.scala:101)
> at 
> org.apache.spark.mllib.linalg.SparseMatrix.multiply(Matrices.scala:565)
> at 
> org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9$$anonfun$apply$11.apply(BlockMatrix.scala:483)
> at 
> org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9$$anonfun$apply$11.apply(BlockMatrix.scala:480)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at 
> org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9.apply(BlockMatrix.scala:480)
> at 
> org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9.apply(BlockMatrix.scala:479)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at 
> org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:115)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at 
> org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:30)
> at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at 
> org.apache.spark.util.collection.CompactBuffer.flatMap(CompactBuffer.scala:30)
> at 
> org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23.apply(BlockMatrix.scala:479)
> at 
> org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23.apply(BlockMatrix.scala:478)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
> at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> Now I'm even trying to use only one core per executor. What can be the
> problem? And how can I debug it and find root cause? What could I miss in
> spark configuration?
>
> I've already tried increasing spark.default.parallelism and decreasing
> blocks size for BlockMatrix.
>
> Thanks.
>


Re: How to make the state in a streaming application idempotent?

2017-01-25 Thread Burak Yavuz
Yes you may. Depends on if you want exact values or if you're okay with
approximations. With Big Data, generally you would be okay with
approximations. Try both out, see what scales/works with your dataset.
Maybe you can handle the second implementation.

On Wed, Jan 25, 2017 at 12:23 PM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> Thanks Burak. But with BloomFilter, won't I be getting a false positive?
>
> On Wed, Jan 25, 2017 at 11:28 AM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> I noticed that 1 wouldn't be a problem, because you'll save the
>> BloomFilter in the state.
>>
>> For 2, you would keep a Map of UUID's to the timestamp of when you saw
>> them. If the UUID exists in the map, then you wouldn't increase the count.
>> If the timestamp of a UUID expires, you would remove it from the map. The
>> reason we remove from the map is to keep a bounded amount of space. It'll
>> probably take a lot more space than the BloomFilter though depending on
>> your data volume.
>>
>> On Wed, Jan 25, 2017 at 11:24 AM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> In the previous email you gave me 2 solutions
>>> 1. Bloom filter --> problem in repopulating the bloom filter on restarts
>>> 2. keeping the state of the unique ids
>>>
>>> Please elaborate on 2.
>>>
>>>
>>>
>>> On Wed, Jan 25, 2017 at 10:53 AM, Burak Yavuz <brk...@gmail.com> wrote:
>>>
>>>> I don't have any sample code, but on a high level:
>>>>
>>>> My state would be: (Long, BloomFilter[UUID])
>>>> In the update function, my value will be the UUID of the record, since
>>>> the word itself is the key.
>>>> I'll ask my BloomFilter if I've seen this UUID before. If not increase
>>>> count, also add to Filter.
>>>>
>>>> Does that make sense?
>>>>
>>>>
>>>> On Wed, Jan 25, 2017 at 9:28 AM, shyla deshpande <
>>>> deshpandesh...@gmail.com> wrote:
>>>>
>>>>> Hi Burak,
>>>>> Thanks for the response. Can you please elaborate on your idea of
>>>>> storing the state of the unique ids.
>>>>> Do you have any sample code or links I can refer to.
>>>>> Thanks
>>>>>
>>>>> On Wed, Jan 25, 2017 at 9:13 AM, Burak Yavuz <brk...@gmail.com> wrote:
>>>>>
>>>>>> Off the top of my head... (Each may have it's own issues)
>>>>>>
>>>>>> If upstream you add a uniqueId to all your records, then you may use
>>>>>> a BloomFilter to approximate if you've seen a row before.
>>>>>> The problem I can see with that approach is how to repopulate the
>>>>>> bloom filter on restarts.
>>>>>>
>>>>>> If you are certain that you're not going to reprocess some data after
>>>>>> a certain time, i.e. there is no way I'm going to get the same data in 2
>>>>>> hours, it may only happen in the last 2 hours, then you may also keep the
>>>>>> state of uniqueId's as well, and then age them out after a certain time.
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Burak
>>>>>>
>>>>>> On Tue, Jan 24, 2017 at 9:53 PM, shyla deshpande <
>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>
>>>>>>> Please share your thoughts.
>>>>>>>
>>>>>>> On Tue, Jan 24, 2017 at 4:01 PM, shyla deshpande <
>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jan 24, 2017 at 9:44 AM, shyla deshpande <
>>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> My streaming application stores lot of aggregations using
>>>>>>>>> mapWithState.
>>>>>>>>>
>>>>>>>>> I want to know what are all the possible ways I can make it
>>>>>>>>> idempotent.
>>>>>>>>>
>>>>>>>>> Please share your views.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> On Mon, Jan 23, 2017 at 5:41 PM, shyla deshpande <
>>>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> In a Wordcount application which  stores the count of all the
>>>>>>>>>> words input so far using mapWithState.  How do I make sure my counts 
>>>>>>>>>> are
>>>>>>>>>> not messed up if I happen to read a line more than once?
>>>>>>>>>>
>>>>>>>>>> Appreciate your response.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to make the state in a streaming application idempotent?

2017-01-25 Thread Burak Yavuz
I noticed that 1 wouldn't be a problem, because you'll save the BloomFilter
in the state.

For 2, you would keep a Map of UUID's to the timestamp of when you saw
them. If the UUID exists in the map, then you wouldn't increase the count.
If the timestamp of a UUID expires, you would remove it from the map. The
reason we remove from the map is to keep a bounded amount of space. It'll
probably take a lot more space than the BloomFilter though depending on
your data volume.

On Wed, Jan 25, 2017 at 11:24 AM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> In the previous email you gave me 2 solutions
> 1. Bloom filter --> problem in repopulating the bloom filter on restarts
> 2. keeping the state of the unique ids
>
> Please elaborate on 2.
>
>
>
> On Wed, Jan 25, 2017 at 10:53 AM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> I don't have any sample code, but on a high level:
>>
>> My state would be: (Long, BloomFilter[UUID])
>> In the update function, my value will be the UUID of the record, since
>> the word itself is the key.
>> I'll ask my BloomFilter if I've seen this UUID before. If not increase
>> count, also add to Filter.
>>
>> Does that make sense?
>>
>>
>> On Wed, Jan 25, 2017 at 9:28 AM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Hi Burak,
>>> Thanks for the response. Can you please elaborate on your idea of
>>> storing the state of the unique ids.
>>> Do you have any sample code or links I can refer to.
>>> Thanks
>>>
>>> On Wed, Jan 25, 2017 at 9:13 AM, Burak Yavuz <brk...@gmail.com> wrote:
>>>
>>>> Off the top of my head... (Each may have it's own issues)
>>>>
>>>> If upstream you add a uniqueId to all your records, then you may use a
>>>> BloomFilter to approximate if you've seen a row before.
>>>> The problem I can see with that approach is how to repopulate the bloom
>>>> filter on restarts.
>>>>
>>>> If you are certain that you're not going to reprocess some data after a
>>>> certain time, i.e. there is no way I'm going to get the same data in 2
>>>> hours, it may only happen in the last 2 hours, then you may also keep the
>>>> state of uniqueId's as well, and then age them out after a certain time.
>>>>
>>>>
>>>> Best,
>>>> Burak
>>>>
>>>> On Tue, Jan 24, 2017 at 9:53 PM, shyla deshpande <
>>>> deshpandesh...@gmail.com> wrote:
>>>>
>>>>> Please share your thoughts.
>>>>>
>>>>> On Tue, Jan 24, 2017 at 4:01 PM, shyla deshpande <
>>>>> deshpandesh...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 24, 2017 at 9:44 AM, shyla deshpande <
>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>
>>>>>>> My streaming application stores lot of aggregations using
>>>>>>> mapWithState.
>>>>>>>
>>>>>>> I want to know what are all the possible ways I can make it
>>>>>>> idempotent.
>>>>>>>
>>>>>>> Please share your views.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Mon, Jan 23, 2017 at 5:41 PM, shyla deshpande <
>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>
>>>>>>>> In a Wordcount application which  stores the count of all the words
>>>>>>>> input so far using mapWithState.  How do I make sure my counts are not
>>>>>>>> messed up if I happen to read a line more than once?
>>>>>>>>
>>>>>>>> Appreciate your response.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to make the state in a streaming application idempotent?

2017-01-25 Thread Burak Yavuz
I don't have any sample code, but on a high level:

My state would be: (Long, BloomFilter[UUID])
In the update function, my value will be the UUID of the record, since the
word itself is the key.
I'll ask my BloomFilter if I've seen this UUID before. If not, increase the
count and also add it to the filter.

Does that make sense?
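
A rough sketch of that state update (untested; assumes the stream is already
(word, uuid) pairs and uses Spark's util.sketch.BloomFilter on the UUID string):

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.util.sketch.BloomFilter

val updateFunc = (word: String, uuid: Option[String], state: State[(Long, BloomFilter)]) => {
  val (count, seen) = state.getOption.getOrElse((0L, BloomFilter.create(1000000L, 0.01)))
  val isNew = uuid.exists(id => !seen.mightContain(id))
  if (isNew) uuid.foreach(id => seen.put(id))
  val newCount = if (isNew) count + 1 else count
  state.update((newCount, seen))
  (word, newCount)
}

// wordsWithIds: DStream[(String, String)] of (word, uuid), assumed
val counts = wordsWithIds.mapWithState(StateSpec.function(updateFunc))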


On Wed, Jan 25, 2017 at 9:28 AM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> Hi Burak,
> Thanks for the response. Can you please elaborate on your idea of storing
> the state of the unique ids.
> Do you have any sample code or links I can refer to.
> Thanks
>
> On Wed, Jan 25, 2017 at 9:13 AM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Off the top of my head... (Each may have it's own issues)
>>
>> If upstream you add a uniqueId to all your records, then you may use a
>> BloomFilter to approximate if you've seen a row before.
>> The problem I can see with that approach is how to repopulate the bloom
>> filter on restarts.
>>
>> If you are certain that you're not going to reprocess some data after a
>> certain time, i.e. there is no way I'm going to get the same data in 2
>> hours, it may only happen in the last 2 hours, then you may also keep the
>> state of uniqueId's as well, and then age them out after a certain time.
>>
>>
>> Best,
>> Burak
>>
>> On Tue, Jan 24, 2017 at 9:53 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Please share your thoughts.
>>>
>>> On Tue, Jan 24, 2017 at 4:01 PM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Jan 24, 2017 at 9:44 AM, shyla deshpande <
>>>> deshpandesh...@gmail.com> wrote:
>>>>
>>>>> My streaming application stores lot of aggregations using
>>>>> mapWithState.
>>>>>
>>>>> I want to know what are all the possible ways I can make it
>>>>> idempotent.
>>>>>
>>>>> Please share your views.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mon, Jan 23, 2017 at 5:41 PM, shyla deshpande <
>>>>> deshpandesh...@gmail.com> wrote:
>>>>>
>>>>>> In a Wordcount application which  stores the count of all the words
>>>>>> input so far using mapWithState.  How do I make sure my counts are not
>>>>>> messed up if I happen to read a line more than once?
>>>>>>
>>>>>> Appreciate your response.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to make the state in a streaming application idempotent?

2017-01-25 Thread Burak Yavuz
Off the top of my head... (Each may have its own issues)

If upstream you add a uniqueId to all your records, then you may use a
BloomFilter to approximate if you've seen a row before.
The problem I can see with that approach is how to repopulate the bloom
filter on restarts.

If you are certain that you're not going to reprocess some data after a
certain time (i.e. there is no way you'll see the same data again after 2
hours; it may only reappear within the last 2 hours), then you may also keep
the state of the uniqueIds themselves, and age them out after a certain time.


Best,
Burak

On Tue, Jan 24, 2017 at 9:53 PM, shyla deshpande 
wrote:

> Please share your thoughts.
>
> On Tue, Jan 24, 2017 at 4:01 PM, shyla deshpande  > wrote:
>
>>
>>
>> On Tue, Jan 24, 2017 at 9:44 AM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> My streaming application stores lot of aggregations using mapWithState.
>>>
>>> I want to know what are all the possible ways I can make it idempotent.
>>>
>>> Please share your views.
>>>
>>> Thanks
>>>
>>> On Mon, Jan 23, 2017 at 5:41 PM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
 In a Wordcount application which  stores the count of all the words
 input so far using mapWithState.  How do I make sure my counts are not
 messed up if I happen to read a line more than once?

 Appreciate your response.

 Thanks

>>>
>>>
>>
>


Re: Spark Streaming - join streaming and static data

2016-12-06 Thread Burak Yavuz
Hi Daniela,

This is trivial with Structured Streaming. If your Kafka cluster is 0.10.0
or above, you may use Spark 2.0.2 to create a Streaming DataFrame from
Kafka, and then also create a DataFrame using the JDBC connection, and you
may join those. In Spark 2.1, there's support for a function called
"from_json", which should also help you easily parse your messages incoming
from Kafka.
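
Roughly, the whole thing could look like this (untested sketch; Spark 2.1+ for
from_json, spark-sql-kafka-0-10 on the classpath, and the connection details,
topic and column names are placeholders):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val msgSchema = new StructType()
  .add("timestamp", TimestampType)
  .add("ID", IntegerType)

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "events")
  .load()
  .select(from_json(col("value").cast("string"), msgSchema).as("msg"))
  .select("msg.*")

val lookup = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host/db")
  .option("dbtable", "id_values")          // columns: ID, minute, value
  .load()

val joined = stream.join(lookup, "ID")
  .withColumn("ts", (unix_timestamp(col("timestamp")) + col("minute") * 60).cast("timestamp"))
  .select("ts", "value")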

Best,
Burak

On Tue, Dec 6, 2016 at 2:16 AM, Daniela S  wrote:

> Hi
>
> I have some questions regarding Spark Streaming.
>
> I receive a stream of JSON messages from Kafka.
> The messages consist of a timestamp and an ID.
>
> timestamp           ID
> 2016-12-06 13:00    1
> 2016-12-06 13:40    5
> ...
>
> In a database I have values for each ID:
>
> ID   minute  value
> 1    0       3
> 1    1       5
> 1    2       7
> 1    3       8
> 5    0       6
> 5    1       6
> 5    2       8
> 5    3       5
> 5    4       6
>
> So I would like to join each incoming JSON message with the corresponding
> values. It should look as follows:
>
> timestamp           ID   minute  value
> 2016-12-06 13:00    1    0       3
> 2016-12-06 13:00    1    1       5
> 2016-12-06 13:00    1    2       7
> 2016-12-06 13:00    1    3       8
> 2016-12-06 13:40    5    0       6
> 2016-12-06 13:40    5    1       6
> 2016-12-06 13:40    5    2       8
> 2016-12-06 13:40    5    3       5
> 2016-12-06 13:40    5    4       6
> ...
>
> Then I would like to add the minute values to the timestamp. I only need
> the computed timestamp and the values. So the result should look as follows:
>
> timestamp   value
> 2016-12-06 13:00  3
> 2016-12-06 13:01  5
> 2016-12-06 13:02  7
> 2016-12-06 13:03  8
> 2016-12-06 13:40  6
> 2016-12-06 13:41  6
> 2016-12-06 13:42  8
> 2016-12-06 13:43  5
> 2016-12-06 13:44  6
> ...
>
> Is this a possible use case for Spark Streaming? I thought I could join
> the streaming data with the static data but I am not sure how to add the
> minute values to the timestamp. Is this possible with Spark Streaming?
>
> Thank you in advance.
>
> Best regards,
> Daniela
>


Re: How to cause a stage to fail (using spark-shell)?

2016-06-18 Thread Burak Yavuz
Hi Jacek,

Can't you simply have a mapPartitions task throw an exception or something?
Are you trying to do something more esoteric?
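
For example, in spark-shell (sketch):

val rdd = sc.parallelize(1 to 100, 4)

rdd.mapPartitionsWithIndex { (i, iter) =>
  if (i == 0) throw new RuntimeException("boom")  // this task keeps failing
  iter
}.count()
// after spark.task.maxFailures attempts the stage is aborted and shows up as failed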

Best,
Burak

On Sat, Jun 18, 2016 at 5:35 AM, Jacek Laskowski  wrote:

> Hi,
>
> Following up on this question, is a stage considered failed only when
> there is a FetchFailed exception? Can I have a failed stage with only
> a single-stage job?
>
> Appreciate any help on this...(as my family doesn't like me spending
> the weekend with Spark :))
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sat, Jun 18, 2016 at 11:53 AM, Jacek Laskowski  wrote:
> > Hi,
> >
> > I'm trying to see some stats about failing stages in web UI and want
> > to "create" few failed stages. Is this possible using spark-shell at
> > all? Which setup of Spark/spark-shell would allow for such a scenario.
> >
> > I could write a Scala code if that's the only way to have failing stages.
> >
> > Please guide. Thanks.
> >
> > /me on to reviewing the Spark code...
> >
> > Pozdrawiam,
> > Jacek Laskowski
> > 
> > https://medium.com/@jaceklaskowski/
> > Mastering Apache Spark http://bit.ly/mastering-apache-spark
> > Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Any NLP lib could be used on spark?

2016-04-19 Thread Burak Yavuz
A quick search on spark-packages returns:
http://spark-packages.org/package/databricks/spark-corenlp.

You may need to build it locally and add it to your session by --jars.

On Tue, Apr 19, 2016 at 10:47 AM, Gavin Yue  wrote:

> Hey,
>
> Want to try the NLP on the spark. Could anyone recommend any easy to run
> NLP open source lib on spark?
>
> Also is there any recommended semantic network?
>
> Thanks a lot.
>


Re: Calculation of histogram bins and frequency in Apache spark 1.6

2016-02-23 Thread Burak Yavuz
You could use the Bucketizer transformer in Spark ML.
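
A minimal sketch (untested; df is your DataFrame and "value" is an assumed
numeric column name):

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, 0.0, 10.0, 20.0, 30.0, Double.PositiveInfinity)

val bucketizer = new Bucketizer()
  .setInputCol("value")
  .setOutputCol("bin")
  .setSplits(splits)

// frequency per bin
val histogram = bucketizer.transform(df).groupBy("bin").count()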

Best,
Burak

On Tue, Feb 23, 2016 at 9:13 AM, Arunkumar Pillai 
wrote:

> Hi
> Is there any predefined method to calculate histogram bins and frequency
> in spark. Currently I take range and find bins then count frequency using
> SQL query.
>
> Is there any better way
>


Re: Using SPARK packages in Spark Cluster

2016-02-12 Thread Burak Yavuz
Hello Gourav,

The packages need to be loaded BEFORE you start the JVM, therefore you
won't be able to add packages dynamically in code. You should use the
--packages option with pyspark before you start your application.
One option is to add a `conf` that will load some packages if you are
constantly going to use them.
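
For example (the package coordinate here is just an illustration):

  bin/pyspark --master spark://hostname:7077 --packages com.databricks:spark-csv_2.11:1.5.0

or, to load it every time, add it to conf/spark-defaults.conf:

  spark.jars.packages  com.databricks:spark-csv_2.11:1.5.0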

Best,
Burak



On Fri, Feb 12, 2016 at 4:22 AM, Gourav Sengupta 
wrote:

> Hi,
>
> I am creating sparkcontext in a SPARK standalone cluster as mentioned
> here: http://spark.apache.org/docs/latest/spark-standalone.html using the
> following code:
>
>
> --
> sc.stop()
> conf = SparkConf().set( 'spark.driver.allowMultipleContexts' , False) \
>   .setMaster("spark://hostname:7077") \
>   .set('spark.shuffle.service.enabled', True) \
>   .set('spark.dynamicAllocation.enabled','true') \
>   .set('spark.executor.memory','20g') \
>   .set('spark.driver.memory', '4g') \
>
> .set('spark.default.parallelism',(multiprocessing.cpu_count() -1 ))
> conf.getAll()
> sc = SparkContext(conf = conf)
>
> -(we should definitely be able to optimise the configuration but that
> is not the point here) ---
>
> I am not able to use packages, a list of which is mentioned here
> http://spark-packages.org, using this method.
>
> Where as if I use the standard "pyspark --packages" option then the
> packages load just fine.
>
> I will be grateful if someone could kindly let me know how to load
> packages when starting a cluster as mentioned above.
>
>
> Regards,
> Gourav Sengupta
>


Re: Redirect Spark Logs to Kafka

2016-02-01 Thread Burak Yavuz
You can use the KafkaLog4jAppender (
https://github.com/apache/kafka/blob/trunk/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
).
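
A log4j.properties sketch (untested; the broker list and topic are placeholders,
and you'd keep your existing console/file appenders alongside it):

  log4j.rootLogger=INFO, console, KAFKA

  log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
  log4j.appender.KAFKA.brokerList=localhost:9092
  log4j.appender.KAFKA.topic=spark-logs
  log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
  log4j.appender.KAFKA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n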

Best,
Burak

On Mon, Feb 1, 2016 at 12:20 PM, Ashish Soni  wrote:

> Hi All ,
>
> Please let me know how we can redirect spark logging files or tell spark
> to log to kafka queue instead of files ..
>
> Ashish
>


Re: Guidelines for writing SPARK packages

2016-02-01 Thread Burak Yavuz
Thanks for the reply David, just wanted to fix one part of your response:


> If you
> want to register a release for your package you will also need to push
> the artifacts for your package to Maven central.
>

It is NOT necessary to push to Maven Central in order to make a release.
There are many packages out there that don't publish to Maven Central, e.g.
scripts, and pure python packages.

Praveen, I would suggest taking a look at:
 - spark-package command line tool (
https://github.com/databricks/spark-package-cmd-tool), to get you set up
 - sbt-spark-package (https://github.com/databricks/sbt-spark-package) to
help with building/publishing if you plan to use Scala in your package. You
could of course use Maven as well, but we don't have a maven plugin for
Spark Packages.

Best,
Burak


Re: Optimized way to multiply two large matrices and save output using Spark and Scala

2016-01-13 Thread Burak Yavuz
BlockMatrix.multiply is the suggested method of multiplying two large
matrices. Is there a reason that you didn't use BlockMatrices?

You can load the matrices and convert to and from RowMatrix. If it's in
sparse format (i, j, v), then you can also use the CoordinateMatrix to
load, BlockMatrix to multiply, and CoordinateMatrix to save it back again.
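
Roughly (untested sketch; fileA/fileB are RDD[String] of "i,j,v" lines, and
delimiter/outputPath are your own variables):

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

def toEntries(lines: org.apache.spark.rdd.RDD[String]) = lines.map { line =>
  val Array(i, j, v) = line.split(delimiter)
  MatrixEntry(i.toLong, j.toLong, v.toDouble)
}

val blockA = new CoordinateMatrix(toEntries(fileA)).toBlockMatrix(1024, 1024).cache()
val blockB = new CoordinateMatrix(toEntries(fileB)).toBlockMatrix(1024, 1024).cache()

val product = blockA.multiply(blockB)

// back to (i, j, v) triplets for saving as text
product.toCoordinateMatrix().entries
  .map(e => s"${e.i}$delimiter${e.j}$delimiter${e.value}")
  .saveAsTextFile(outputPath)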

Thanks,
Burak

On Wed, Jan 13, 2016 at 8:16 PM, Devi P.V  wrote:

> I want to multiply two large matrices (from csv files)using Spark and
> Scala and save output.I use the following code
>
>   val rows=file1.coalesce(1,false).map(x=>{
>   val line=x.split(delimiter).map(_.toDouble)
>   Vectors.sparse(line.length,
> line.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
>
> })
>
> val rmat = new RowMatrix(rows)
>
> val dm=file2.coalesce(1,false).map(x=>{
>   val line=x.split(delimiter).map(_.toDouble)
>   Vectors.dense(line)
> })
>
> val ma = dm.map(_.toArray).take(dm.count.toInt)
> val localMat = Matrices.dense( dm.count.toInt,
>   dm.take(1)(0).size,
>
>   transpose(ma).flatten)
>
> // Multiply two matrices
> val s=rmat.multiply(localMat).rows
>
> s.map(x=>x.toArray.mkString(delimiter)).saveAsTextFile(OutputPath)
>
>   }
>
>   def transpose(m: Array[Array[Double]]): Array[Array[Double]] = {
> (for {
>   c <- m(0).indices
> } yield m.map(_(c)) ).toArray
>   }
>
> When I save the file it takes more time and the output file is very large in
> size. What is the optimized way to multiply two large files and save the
> output to a text file?
>


Re: number of blocks in ALS/recommendation API

2015-12-17 Thread Burak Yavuz
Copying the first part from the scaladoc:
"
This is a blocked implementation of the ALS factorization algorithm that
groups the two sets of factors (referred to as "users" and "products") into
blocks and reduces communication by only sending one copy of each user
vector to each product block on each iteration, and only for the product
blocks that need that user's feature vector. This is achieved by
pre-computing some information about the ratings matrix to determine the
"out-links" of each user (which blocks of products it will contribute to)
and "in-link" information for each product (which of the feature vectors it
receives from each user block it will depend on). This allows us to send
only an array of feature vectors between each user block and product block,
and have the product block find the users' ratings and update the products
based on these messages.
"

Basically, the number of blocks can be tweaked to reduce shuffling, making
the application more efficient in both run time and disk space usage. For
example, if you have a high ratio of product ratings per user (one
user rating hundreds of products), you may choose a smaller number of
product blocks so that each user's ratings are sent to fewer
partitions, which leads to less shuffling.

However, if you choose your number of blocks to be low, then you may run
into OOMEs.
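
In code, the knob is just the blocks parameter (a sketch; 50 is an arbitrary
example and ratings stands for your RDD[Rating]):
```scala
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD

// ratings: RDD[Rating] of (user, product, rating) triples
def train(ratings: RDD[Rating]) =
  new ALS()
    .setRank(10)
    .setIterations(10)
    .setBlocks(50)   // number of user/product blocks; -1 (the default) lets MLlib choose
    .run(ratings)
```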

Hope that helps.

Burak

On Thu, Dec 17, 2015 at 3:17 AM, Roberto Pagliari  wrote:

> What is the meaning of the ‘blocks’ input argument in mllib ALS
> implementation, and how does that relate to the number of executors and/or
> size of the input data?
>
>
> Thank you,
>
>


Re: Spark streaming with Kinesis broken?

2015-12-10 Thread Burak Yavuz
I've noticed this happening when there were dependency conflicts, and
it is super hard to debug.
It seems that the KinesisClientLibrary version in Spark 1.5.2 is 1.3.0, but
it is 1.2.1 in Spark 1.5.1.
I suspect that is the problem...

Brian, did you verify that it works with the 1.6.0 branch?

Thanks,
Burak

On Thu, Dec 10, 2015 at 11:45 AM, Brian London 
wrote:

> Nick's symptoms sound identical to mine.  I should mention that I just
> pulled the latest version from github and it seems to be working there.  To
> reproduce:
>
>
>1. Download spark 1.5.2 from http://spark.apache.org/downloads.html
>2. build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests
>clean package
>3. build/mvn -Pkinesis-asl -DskipTests clean package
>4. Then run simultaneously:
>1. bin/run-example streaming.KinesisWordCountASL [Kinesis app name]
>   [Kinesis stream name] [endpoint URL]
>   2.   bin/run-example streaming.KinesisWordProducerASL [Kinesis
>   stream name] [endpoint URL] 100 10
>
>
> On Thu, Dec 10, 2015 at 2:05 PM Jean-Baptiste Onofré 
> wrote:
>
>> Hi Nick,
>>
>> Just to be sure: don't you see some ClassCastException in the log ?
>>
>> Thanks,
>> Regards
>> JB
>>
>> On 12/10/2015 07:56 PM, Nick Pentreath wrote:
>> > Could you provide an example / test case and more detail on what issue
>> > you're facing?
>> >
>> > I've just tested a simple program reading from a dev Kinesis stream and
>> > using stream.print() to show the records, and it works under 1.5.1 but
>> > doesn't appear to be working under 1.5.2.
>> >
>> > UI for 1.5.2:
>> >
>> > Inline image 1
>> >
>> > UI for 1.5.1:
>> >
>> > Inline image 2
>> >
>> > On Thu, Dec 10, 2015 at 5:50 PM, Brian London > > > wrote:
>> >
>> > Has anyone managed to run the Kinesis demo in Spark 1.5.2?  The
>> > Kinesis ASL that ships with 1.5.2 appears to not work for me
>> > although 1.5.1 is fine. I spent some time with Amazon earlier in the
>> > week and the only thing we could do to make it work is to change the
>> > version to 1.5.1.  Can someone please attempt to reproduce before I
>> > open a JIRA issue for it?
>> >
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: Spark streaming with Kinesis broken?

2015-12-10 Thread Burak Yavuz
I don't think the Kinesis tests specifically ran when that was merged into
1.5.2 :(
https://github.com/apache/spark/pull/8957
https://github.com/apache/spark/commit/883bd8fccf83aae7a2a847c9a6ca129fac86e6a3

AFAIK pom changes don't trigger the Kinesis tests.

Burak

On Thu, Dec 10, 2015 at 8:09 PM, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> Yup also works for me on master branch as I've been testing DynamoDB
> Streams integration. In fact works with latest KCL 1.6.1 also which I was
> using.
>
> So theKCL version does seem like it could be the issue - somewhere along
> the line an exception must be getting swallowed. Though the tests should
> have picked this up? Will dig deeper.
>
> —
> Sent from Mailbox <https://www.dropbox.com/mailbox>
>
>
> On Thu, Dec 10, 2015 at 11:07 PM, Brian London <brianmlon...@gmail.com>
> wrote:
>
>> Yes, it worked in the 1.6 branch as of commit
>> db5165246f2888537dd0f3d4c5a515875c7358ed.  That makes it much less
>> serious of an issue, although it would be nice to know what the root cause
>> is to avoid a regression.
>>
>> On Thu, Dec 10, 2015 at 4:03 PM Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> I've noticed this happening when there was some dependency conflicts,
>>> and it is super hard to debug.
>>> It seems that the KinesisClientLibrary version in Spark 1.5.2 is 1.3.0,
>>> but it is 1.2.1 in Spark 1.5.1.
>>> I feel like that seems to be the problem...
>>>
>>> Brian, did you verify that it works with the 1.6.0 branch?
>>>
>>> Thanks,
>>> Burak
>>>
>>> On Thu, Dec 10, 2015 at 11:45 AM, Brian London <brianmlon...@gmail.com>
>>> wrote:
>>>
>>>> Nick's symptoms sound identical to mine.  I should mention that I just
>>>> pulled the latest version from github and it seems to be working there.  To
>>>> reproduce:
>>>>
>>>>
>>>>1. Download spark 1.5.2 from http://spark.apache.org/downloads.html
>>>>2. build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests
>>>>clean package
>>>>3. build/mvn -Pkinesis-asl -DskipTests clean package
>>>>4. Then run simultaneously:
>>>>1. bin/run-example streaming.KinesisWordCountASL [Kinesis app name]
>>>>   [Kinesis stream name] [endpoint URL]
>>>>   2.   bin/run-example streaming.KinesisWordProducerASL [Kinesis
>>>>   stream name] [endpoint URL] 100 10
>>>>
>>>>
>>>> On Thu, Dec 10, 2015 at 2:05 PM Jean-Baptiste Onofré <j...@nanthrax.net>
>>>> wrote:
>>>>
>>>>> Hi Nick,
>>>>>
>>>>> Just to be sure: don't you see some ClassCastException in the log ?
>>>>>
>>>>> Thanks,
>>>>> Regards
>>>>> JB
>>>>>
>>>>> On 12/10/2015 07:56 PM, Nick Pentreath wrote:
>>>>> > Could you provide an example / test case and more detail on what
>>>>> issue
>>>>> > you're facing?
>>>>> >
>>>>> > I've just tested a simple program reading from a dev Kinesis stream
>>>>> and
>>>>> > using stream.print() to show the records, and it works under 1.5.1
>>>>> but
>>>>> > doesn't appear to be working under 1.5.2.
>>>>> >
>>>>> > UI for 1.5.2:
>>>>> >
>>>>> > Inline image 1
>>>>> >
>>>>> > UI for 1.5.1:
>>>>> >
>>>>> > Inline image 2
>>>>> >
>>>>> > On Thu, Dec 10, 2015 at 5:50 PM, Brian London <
>>>>> brianmlon...@gmail.com
>>>>> > <mailto:brianmlon...@gmail.com>> wrote:
>>>>> >
>>>>> > Has anyone managed to run the Kinesis demo in Spark 1.5.2?  The
>>>>> > Kinesis ASL that ships with 1.5.2 appears to not work for me
>>>>> > although 1.5.1 is fine. I spent some time with Amazon earlier in
>>>>> the
>>>>> > week and the only thing we could do to make it work is to change
>>>>> the
>>>>> > version to 1.5.1.  Can someone please attempt to reproduce
>>>>> before I
>>>>> > open a JIRA issue for it?
>>>>> >
>>>>> >
>>>>>
>>>>> --
>>>>> Jean-Baptiste Onofré
>>>>> jbono...@apache.org
>>>>> http://blog.nanthrax.net
>>>>> Talend - http://www.talend.com
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>>
>


Re: Spark Streaming idempotent writes to HDFS

2015-11-23 Thread Burak Yavuz
Not sure if it would be the most efficient, but maybe you can think of the
filesystem as a key value store, and write each batch to a sub-directory,
where the directory name is the batch time. If the directory already
exists, then you shouldn't write it. You can then have a follow-up batch
job that coalesces the files in order to "close the day".
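
A rough Scala sketch of the exists-check (hdfsDest and logSchema are
placeholders playing the roles of the names in the snippet below):
```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Time

// parsed: DStream[Row]; hdfsDest is the output root directory
parsed.foreachRDD { (rdd, batchTime: Time) =>
  val outDir = new Path(s"$hdfsDest/batch=${batchTime.milliseconds}")
  val fs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
  if (!fs.exists(outDir)) {   // a batch that was already written is skipped after a restart
    val sqlContext = new SQLContext(rdd.sparkContext)
    sqlContext.createDataFrame(rdd, logSchema).write.parquet(outDir.toString)
  }
}
```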

Burak

On Mon, Nov 23, 2015 at 8:58 PM, Michael  wrote:

> Hi all,
>
> I'm working on project with spark streaming, the goal is to process log
> files from S3 and save them on hadoop to later analyze them with
> sparkSQL.
> Everything works well except when I kill the spark application and
> restart it: it picks up from the latest processed batch and reprocesses
> it which results in duplicate data on hdfs.
>
> How can I make the writing step on hdfs idempotent ? I couldn't find any
> way to control for example the filenames of the parquet files being
> written, the idea being to include the batch time so that the same batch
> gets written always on the same path.
> I've also tried with mode("overwrite") but looks that each batch gets
> written on the same file every time.
> Any help would be greatly appreciated.
>
> Thanks,
> Michael
>
> --
>
> def save_rdd(batch_time, rdd):
> sqlContext = SQLContext(rdd.context)
> df = sqlContext.createDataFrame(rdd, log_schema)
>
> df.write.mode("append").partitionBy("log_date").parquet(hdfs_dest_directory)
>
> def create_ssc(checkpoint_dir, spark_master):
>
> sc = SparkContext(spark_master, app_name)
> ssc = StreamingContext(sc, batch_interval)
> ssc.checkpoint(checkpoint_dir)
>
> parsed = dstream.map(lambda line: log_parser(line))
> parsed.foreachRDD(lambda batch_time, rdd: save_rdd(batch_time, rdd)
>
> return ssc
>
> ssc = StreamingContext.getOrCreate(checkpoint_dir, lambda:
> create_ssc(checkpoint_dir, spark_master)
> ssc.start()
> ssc.awaitTermination()
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: large, dense matrix multiplication

2015-11-13 Thread Burak Yavuz
Hi,

The BlockMatrix multiplication should be much more efficient on the current
master (and will be available with Spark 1.6). Could you please give that a
try if you have the chance?

Thanks,
Burak

On Fri, Nov 13, 2015 at 10:11 AM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> Hi Eilidh
>
> Because you are multiplying with the transpose you don't have  to
> necessarily build the right side of the matrix. I hope you see that. You
> can broadcast blocks of the indexed row matrix to itself and achieve the
> multiplication.
>
> But for similarity computation you might want to use some approach like
> locality sensitive hashing first to identify a bunch of similar customers
> and then apply cosine similarity on that narrowed down list. That would
> scale much better than matrix multiplication. You could try the following
> options for the same.
>
> https://github.com/soundcloud/cosine-lsh-join-spark
> http://spark-packages.org/package/tdebatty/spark-knn-graphs
> https://github.com/marufaytekin/lsh-spark
>
> Regards
> Sab
> Hi Sab,
>
> Thanks for your response. We’re thinking of trying a bigger cluster,
> because we just started with 2 nodes. What we really want to know is
> whether the code will scale up with larger matrices and more nodes. I’d be
> interested to hear how large a matrix multiplication you managed to do?
>
> Is there an alternative you’d recommend for calculating similarity over a
> large dataset?
>
> Thanks,
> Eilidh
>
> On 13 Nov 2015, at 09:55, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
> We have done this by blocking but without using BlockMatrix. We used our
> own blocking mechanism because BlockMatrix didn't exist in Spark 1.2. What
> is the size of your block? How much memory are you giving to the executors?
> I assume you are running on YARN, if so you would want to make sure your
> yarn executor memory overhead is set to a higher value than default.
>
> Just curious, could you also explain why you need matrix multiplication
> with transpose? Smells like similarity computation.
>
> Regards
> Sab
>
> On Thu, Nov 12, 2015 at 7:27 PM, Eilidh Troup 
> wrote:
>
>> Hi,
>>
>> I’m trying to multiply a large squarish matrix with its transpose.
>> Eventually I’d like to work with matrices of size 200,000 by 500,000, but
>> I’ve started off first with 100 by 100 which was fine, and then with 10,000
>> by 10,000 which failed with an out of memory exception.
>>
>> I used MLlib and BlockMatrix and tried various block sizes, and also
>> tried switching disk serialisation on.
>>
>> We are running on a small cluster, using a CSV file in HDFS as the input
>> data.
>>
>> Would anyone with experience of multiplying large, dense matrices in
>> spark be able to comment on what to try to make this work?
>>
>> Thanks,
>> Eilidh
>>
>>
>> --
>> The University of Edinburgh is a charitable body, registered in
>> Scotland, with registration number SC005336.
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
>
> Architect - Big Data
> Ph: +91 99805 99458
>
> Manthan Systems | *Company of the year - Analytics (2014 Frost and
> Sullivan India ICT)*
> +++
>
>
>
> The University of Edinburgh is a charitable body, registered in
> Scotland, with registration number SC005336.
>
>


Re: Spark Packages Configuration Not Found

2015-11-11 Thread Burak Yavuz
Hi Jakob,
> As another, general question, are spark packages the go-to way of
extending spark functionality?

Definitely. There are ~150 Spark Packages out there in spark-packages.org.
I use a lot of them in every day Spark work.
The number of released packages has been increasing at a steady rate over the
last few months.

> Since I wasn't able to find much documentation about spark packages, I
was wondering if they are still actively being developed?

I would love to work on the documentation. Some documentation exists on
spark-packages.org, but there could be a lot more. If you have any specific
questions, feel free to send them to me directly, and I'll incorporate them
into a FAQ I'm working on.

Regarding your initial problem: Unfortunately `spPublishLocal` is broken
due to ivy configuration mismatches between Spark and the sbt-spark-package
plugin. What you can do instead is:
```
$ sbt +spark-paperui-server/publishM2
$ spark-shell --packages ch.jodersky:spark-paperui-server_2.10:0.1-SNAPSHOT
```

Hopefully that should work for you.

Best,
Burak


On Wed, Nov 11, 2015 at 10:53 AM, Jakob Odersky  wrote:

> As another, general question, are spark packages the go-to way of
> extending spark functionality? In my specific use-case I would like to
> start spark (be it spark-shell or other) and hook into the listener API.
> Since I wasn't able to find much documentation about spark packages, I was
> wondering if they are still actively being developed?
>
> thanks,
> --Jakob
>
> On 10 November 2015 at 14:58, Jakob Odersky  wrote:
>
>> (accidental keyboard-shortcut sent the message)
>> ... spark-shell from the spark 1.5.2 binary distribution.
>> Also, running "spPublishLocal" has the same effect.
>>
>> thanks,
>> --Jakob
>>
>> On 10 November 2015 at 14:55, Jakob Odersky  wrote:
>>
>>> Hi,
>>> I ran into in error trying to run spark-shell with an external package
>>> that I built and published locally
>>> using the spark-package sbt plugin (
>>> https://github.com/databricks/sbt-spark-package).
>>>
>>> To my understanding, spark packages can be published simply as maven
>>> artifacts, yet after running "publishLocal" in my package project (
>>> https://github.com/jodersky/spark-paperui), the following command
>>>
>>> spark-shell --packages
>>> ch.jodersky:spark-paperui-server_2.10:0.1-SNAPSHOT
>>>
>>> gives an error:
>>>
>>> ::
>>>
>>> ::  UNRESOLVED DEPENDENCIES ::
>>>
>>> ::
>>>
>>> :: ch.jodersky#spark-paperui-server_2.10;0.1: configuration not
>>> found in ch.jodersky#spark-paperui-server_2.10;0.1: 'default'. It was
>>> required from org.apache.spark#spark-submit-parent;1.0 default
>>>
>>> ::
>>>
>>>
>>> :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
>>> Exception in thread "main" java.lang.RuntimeException: [unresolved
>>> dependency: ch.jodersky#spark-paperui-server_2.10;0.1: configuration not
>>> found in ch.jodersky#spark-paperui-server_2.10;0.1: 'default'. It was
>>> required from org.apache.spark#spark-submit-parent;1.0 default]
>>> at
>>> org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1011)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:286)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:153)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:12
>>>
>>> Do I need to include some default configuration? If so where and how
>>> should I do it? All other packages I looked at had no such thing.
>>>
>>> Btw, I am using spark-shell from a
>>>
>>>
>>
>


Re: spark-submit --packages using different resolver

2015-10-03 Thread Burak Yavuz
Hi Jerry,

The --packages feature doesn't support private repositories right now.
However, in the case of s3, maybe it might work. Could you please try using
the --repositories flag and provide the address:
`$ spark-submit --packages my:awesome:package --repositories
s3n://$aws_ak:$aws_sak@bucket/path/to/repo`

If that doesn't work, could you please file a JIRA?

Best,
Burak


On Thu, Oct 1, 2015 at 8:58 PM, Jerry Lam  wrote:

> Hi spark users and developers,
>
> I'm trying to use spark-submit --packages against private s3 repository.
> With sbt, I'm using fm-sbt-s3-resolver with proper aws s3 credentials. I
> wonder how can I add this resolver into spark-submit such that --packages
> can resolve dependencies from private repo?
>
> Thank you!
>
> Jerry
>


Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?

2015-09-09 Thread Burak Yavuz
By the way, I published http://spark-packages.org/package/brkyvz/lazy-linalg
that contains many of the arithmetic operations for use in Scala. I really
would appreciate any feedback!

On Tue, Aug 25, 2015 at 11:06 AM, Kristina Rogale Plazonic <kpl...@gmail.com
> wrote:

> YES PLEASE!
>
> :)))
>
> On Tue, Aug 25, 2015 at 1:57 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hmm. I have a lot of code on the local linear algebra operations using
>> Spark's Matrix and Vector representations
>> done for https://issues.apache.org/jira/browse/SPARK-6442.
>>
>> I can make a Spark package with that code if people are interested.
>>
>> Best,
>> Burak
>>
>> On Tue, Aug 25, 2015 at 10:54 AM, Kristina Rogale Plazonic <
>> kpl...@gmail.com> wrote:
>>
>>> However I do think it's easier than it seems to write the implicits;
>>>> it doesn't involve new classes or anything. Yes it's pretty much just
>>>> what you wrote. There is a class "Vector" in Spark. This declaration
>>>> can be in an object; you don't implement your own class. (Also you can
>>>> use "toBreeze" to get Breeze vectors.)
>>>
>>>
>>> The implicit conversion with the implicit def happens for the first
>>> vector in the sum, but not the second vector (see below).
>>>
>>> At this point I give up, because I spent way too much time.  I am so
>>> disappointed.  So many times I heard "Spark makes simple things easy and
>>> complicated things possible". Well, here is the simplest thing you can
>>> imagine in linear algebra, but heck, it is not easy or intuitive.  It was
>>> easier to run a DeepLearning algo (from another library) than add two
>>> vectors.
>>>
>>> If anybody has a workaround other than implementing your own
>>> add/substract/scalarMultiply, PLEASE let me know.
>>>
>>> Here is the code and error from (freshly started) spark-shell:
>>>
>>> scala> import breeze.linalg.{DenseVector => BDV, SparseVector => BSV,
>>> Vector => BV}
>>> import breeze.linalg.{DenseVector=>BDV, SparseVector=>BSV, Vector=>BV}
>>>
>>> scala> import org.apache.spark.mllib.linalg.Vectors
>>> import org.apache.spark.mllib.linalg.Vectors
>>>
>>> scala> val v1 = Vectors.dense(1.0, 2.0, 3.0)
>>> v1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0]
>>>
>>> scala> import org.apache.spark.mllib.linalg.{Vector =>SparkVector}
>>> import org.apache.spark.mllib.linalg.{Vector=>SparkVector}
>>>
>>> scala> object MyUtils {
>>>  |   implicit def toBreeze(v:SparkVector) = BV(v.toArray)
>>>  | }
>>> warning: there were 1 feature warning(s); re-run with -feature for
>>> details
>>> defined module MyUtils
>>>
>>> scala> import MyUtils._
>>> import MyUtils._
>>>
>>> scala> v1:BV[Double]
>>> res2: breeze.linalg.Vector[Double] = DenseVector(1.0, 2.0, 3.0)
>>>
>>> scala> v1 + v1
>>> :30: error: could not find implicit value for parameter op:
>>> breeze.linalg.operators.OpAdd.Impl2[breeze.linalg.Vector[Double],org.apache.spark.mllib.linalg.Vector,That]
>>>   v1 + v1
>>>  ^
>>>
>>>
>>>
>>
>>
>


Re: Calculating Min and Max Values using Spark Transformations?

2015-08-28 Thread Burak Yavuz
Or you can just call describe() on the DataFrame? In addition to min and max,
you'll also get the mean and the count of non-null, non-NaN elements.
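
For example (a sketch; "colA" is a placeholder column name):
```scala
import org.apache.spark.sql.functions.{max, min}

df.describe().show()                      // summary stats for every numeric column
df.agg(min("colA"), max("colA")).show()   // or target specific columns
```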

Burak

On Fri, Aug 28, 2015 at 10:09 AM, java8964 java8...@hotmail.com wrote:

 Or RDD.max() and RDD.min() won't work for you?

 Yong

 --
 Subject: Re: Calculating Min and Max Values using Spark Transformations?
 To: as...@wso2.com
 CC: user@spark.apache.org
 From: jfc...@us.ibm.com
 Date: Fri, 28 Aug 2015 09:28:43 -0700


 If you already loaded csv data into a dataframe, why not register it as a
 table, and use Spark SQL
 to find max/min or any other aggregates? SELECT MAX(column_name) FROM
 dftable_name ... seems natural.





*JESSE CHEN*
Big Data Performance | IBM Analytics

Office:  408 463 2296
Mobile: 408 828 9068
Email:   jfc...@us.ibm.com




 From: ashensw as...@wso2.com
 To: user@spark.apache.org
 Date: 08/28/2015 05:40 AM
 Subject: Calculating Min and Max Values using Spark Transformations?

 --



 Hi all,

 I have a dataset which consist of large number of features(columns). It is
 in csv format. So I loaded it into a spark dataframe. Then I converted it
 into a JavaRDD<Row>. Then using a spark transformation I converted that into
 JavaRDD<String[]>. Then again converted it into a JavaRDD<double[]>. So now
 I have a JavaRDD<double[]>. So is there any method to calculate max and min
 values of each column in this JavaRDD<double[]>?

 Or Is there any way to access the array if I store max and min values to a
 array inside the spark transformation class?

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Calculating-Min-and-Max-Values-using-Spark-Transformations-tp24491.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?

2015-08-25 Thread Burak Yavuz
Hmm. I have a lot of code on the local linear algebra operations using
Spark's Matrix and Vector representations
done for https://issues.apache.org/jira/browse/SPARK-6442.

I can make a Spark package with that code if people are interested.

Best,
Burak

On Tue, Aug 25, 2015 at 10:54 AM, Kristina Rogale Plazonic kpl...@gmail.com
 wrote:

 However I do think it's easier than it seems to write the implicits;
 it doesn't involve new classes or anything. Yes it's pretty much just
 what you wrote. There is a class Vector in Spark. This declaration
 can be in an object; you don't implement your own class. (Also you can
 use toBreeze to get Breeze vectors.)


 The implicit conversion with the implicit def happens for the first vector
 in the sum, but not the second vector (see below).

 At this point I give up, because I spent way too much time.  I am so
 disappointed.  So many times I heard Spark makes simple things easy and
 complicated things possible. Well, here is the simplest thing you can
 imagine in linear algebra, but heck, it is not easy or intuitive.  It was
 easier to run a DeepLearning algo (from another library) than add two
 vectors.

 If anybody has a workaround other than implementing your own
 add/substract/scalarMultiply, PLEASE let me know.

 Here is the code and error from (freshly started) spark-shell:

 scala> import breeze.linalg.{DenseVector => BDV, SparseVector => BSV,
 Vector => BV}
 import breeze.linalg.{DenseVector=>BDV, SparseVector=>BSV, Vector=>BV}

 scala> import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.linalg.Vectors

 scala> val v1 = Vectors.dense(1.0, 2.0, 3.0)
 v1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0]

 scala> import org.apache.spark.mllib.linalg.{Vector => SparkVector}
 import org.apache.spark.mllib.linalg.{Vector=>SparkVector}

 scala> object MyUtils {
  |   implicit def toBreeze(v:SparkVector) = BV(v.toArray)
  | }
 warning: there were 1 feature warning(s); re-run with -feature for details
 defined module MyUtils

 scala> import MyUtils._
 import MyUtils._

 scala> v1:BV[Double]
 res2: breeze.linalg.Vector[Double] = DenseVector(1.0, 2.0, 3.0)

 scala> v1 + v1
 <console>:30: error: could not find implicit value for parameter op:
 breeze.linalg.operators.OpAdd.Impl2[breeze.linalg.Vector[Double],org.apache.spark.mllib.linalg.Vector,That]
   v1 + v1
  ^





Re: Unable to catch SparkContext methods exceptions

2015-08-24 Thread Burak Yavuz
textFile is a lazy operation. It doesn't evaluate until you call an action
on it, such as .count(). Therefore, you won't catch the exception there.

Best,
Burak

On Mon, Aug 24, 2015 at 9:09 AM, Roberto Coluccio 
roberto.coluc...@gmail.com wrote:

 Hello folks,

 I'm experiencing an unexpected behaviour that suggests I'm missing some
 notions of how Spark works. Let's say I have a Spark driver that
 invokes a function like:

 - in myDriver -

 val sparkContext = new SparkContext(mySparkConf)
  val inputPath = "file://home/myUser/project/resources/date=*/*"

 val myResult = new MyResultFunction()(sparkContext, inputPath)

 - in MyResultFunctionOverRDD --

 class MyResultFunction extends Function2[SparkContext, String,
 RDD[String]] with Serializable {
   override def apply(sparkContext: SparkContext, inputPath: String):
 RDD[String] = {
 try {
   sparkContext.textFile(inputPath, 1)
 } catch {
    case t: Throwable => {
  myLogger.error(s"error: ${t.getStackTraceString}\n")
 sc.makeRDD(Seq[String]())
   }
 }
   }
 }

 What happens is that I'm *unable to catch exceptions* thrown by the
 textFile method within the try..catch clause in MyResultFunction. In
 fact, in a unit test for that function where I call it passing an invalid
 inputPath, I don't get an empty RDD as result, but the unit test exits
 (and fails) due to exception not handled.

 What am I missing here?

 Thank you.

 Best regards,
 Roberto



Re: Unable to catch SparkContext methods exceptions

2015-08-24 Thread Burak Yavuz
The laziness is hard to deal with in these situations. I would suggest
trying to handle expected cases (FileNotFound, etc.) using other methods
before even starting a Spark job. If you really want to try..catch a
specific portion of a Spark job, one way is to just follow it with an
action. You can even call persist() before the action, so that you can
re-use the rdd.
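
A minimal sketch of that pattern (the empty-RDD fallback mirrors the code below):
```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def loadOrEmpty(sc: SparkContext, inputPath: String): RDD[String] = {
  try {
    val rdd = sc.textFile(inputPath, 1).persist()
    rdd.count()   // the action forces evaluation, so a bad path fails inside the try
    rdd
  } catch {
    case t: Throwable => sc.makeRDD(Seq.empty[String])
  }
}
```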

Best,
Burak

On Mon, Aug 24, 2015 at 10:52 AM, Roberto Coluccio 
roberto.coluc...@gmail.com wrote:

 Hi Burak, thanks for your answer.

 I have a new MyResultFunction()(sparkContext, inputPath).collect in the
 unit test (so to evaluate the actual result), and there I can observe and
 catch the exception. Even considering Spark's laziness, shouldn't I catch
 the exception while occurring in the try..catch statement that encloses the
 textFile invocation?

 Best,
 Roberto


 On Mon, Aug 24, 2015 at 7:38 PM, Burak Yavuz brk...@gmail.com wrote:

 textFile is a lazy operation. It doesn't evaluate until you call an
 action on it, such as .count(). Therefore, you won't catch the exception
 there.

 Best,
 Burak

 On Mon, Aug 24, 2015 at 9:09 AM, Roberto Coluccio 
 roberto.coluc...@gmail.com wrote:

 Hello folks,

 I'm experiencing an unexpected behaviour, that suggests me thinking
 about my missing notions on how Spark works. Let's say I have a Spark
 driver that invokes a function like:

 - in myDriver -

 val sparkContext = new SparkContext(mySparkConf)
 val inputPath = file://home/myUser/project/resources/date=*/*

 val myResult = new MyResultFunction()(sparkContext, inputPath)

 - in MyResultFunctionOverRDD --

 class MyResultFunction extends Function2[SparkContext, String,
 RDD[String]] with Serializable {
   override def apply(sparkContext: SparkContext, inputPath: String):
 RDD[String] = {
 try {
   sparkContext.textFile(inputPath, 1)
 } catch {
   case t: Throwable = {
 myLogger.error(serror: ${t.getStackTraceString}\n)
 sc.makeRDD(Seq[String]())
   }
 }
   }
 }

 What happens is that I'm *unable to catch exceptions* thrown by the
 textFile method within the try..catch clause in MyResultFunction. In
 fact, in a unit test for that function where I call it passing an invalid
 inputPath, I don't get an empty RDD as result, but the unit test exits
 (and fails) due to exception not handled.

 What am I missing here?

 Thank you.

 Best regards,
 Roberto






Re: Convert mllib.linalg.Matrix to Breeze

2015-08-21 Thread Burak Yavuz
Hi Naveen,
As I mentioned before, the code is private and therefore not accessible. Just
copy and use the snippet that I sent. Copying it here again:
https://github.com/apache/spark/blob/43e0135421b2262cbb0e06aae53523f663b4f959/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L270
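
For a dense matrix it boils down to roughly this (a sketch; sparse matrices map
to Breeze's CSCMatrix instead):
```scala
import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.mllib.linalg.Matrix

// Both MLlib and Breeze dense matrices are column-major, so toArray lines up directly.
def toBreeze(m: Matrix): BDM[Double] = new BDM(m.numRows, m.numCols, m.toArray)
```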

Best,
Burak

On Thu, Aug 20, 2015 at 9:08 PM, Naveen nav...@formcept.com wrote:

 Hi,

 Thanks for the reply. I tried Matrix.toBreeze() which returns the
 following error:

 *method toBreeze in trait Matrix cannot be accessed in
 org.apache.spark.mllib.linalg.Matrix*



 On Thursday 20 August 2015 07:50 PM, Burak Yavuz wrote:

 Matrix.toBreeze is a private method. MLlib matrices have the same
 structure as Breeze Matrices. Just create a new Breeze matrix like this
 https://github.com/apache/spark/blob/43e0135421b2262cbb0e06aae53523f663b4f959/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L270.


 Best,
 Burak


 On Thu, Aug 20, 2015 at 3:28 AM, Yanbo Liang yblia...@gmail.com wrote:

 You can use Matrix.toBreeze()
 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L56
  .

 2015-08-20 18:24 GMT+08:00 Naveen nav...@formcept.com:

 Hi All,

 Is there anyway to convert a mllib matrix to a Dense Matrix of Breeze?
 Any leads are appreciated.


 Thanks,
 Naveen

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Convert mllib.linalg.Matrix to Breeze

2015-08-20 Thread Burak Yavuz
Matrix.toBreeze is a private method. MLlib matrices have the same structure
as Breeze Matrices. Just create a new Breeze matrix like this
https://github.com/apache/spark/blob/43e0135421b2262cbb0e06aae53523f663b4f959/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L270
.

Best,
Burak


On Thu, Aug 20, 2015 at 3:28 AM, Yanbo Liang yblia...@gmail.com wrote:

 You can use Matrix.toBreeze()
 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L56
  .

 2015-08-20 18:24 GMT+08:00 Naveen nav...@formcept.com:

 Hi All,

 Is there anyway to convert a mllib matrix to a Dense Matrix of Breeze?
 Any leads are appreciated.


 Thanks,
 Naveen

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Creating Spark DataFrame from large pandas DataFrame

2015-08-20 Thread Burak Yavuz
If you would like to try using spark-csv, please use
`pyspark --packages com.databricks:spark-csv_2.11:1.2.0`

You're missing a dependency.

Best,
Burak

On Thu, Aug 20, 2015 at 1:08 PM, Charlie Hack charles.t.h...@gmail.com
wrote:

 Hi,

 I'm new to spark and am trying to create a Spark df from a pandas df with
 ~5 million rows. Using Spark 1.4.1.

 When I type:

 df = sqlContext.createDataFrame(pandas_df.where(pd.notnull(didf), None))

 (the df.where is a hack I found on the Spark JIRA to avoid a problem with
 NaN values making mixed column types)

 I get:

 TypeError: cannot create an RDD from type: type 'list'

 Converting a smaller pandas dataframe (~2000 rows) works fine. Anyone had
 this issue?


 This is already a workaround-- ideally I'd like to read the spark
 dataframe from a Hive table. But this is currently not an option for my
 setup.

 I also tried reading the data into spark from a CSV using spark-csv.
 Haven't been able to make this work as yet. I launch

 $ pyspark --jars path/to/spark-csv_2.11-1.2.0.jar

 and when I attempt to read the csv I get:

 Py4JJavaError: An error occurred while calling o22.load. :
 java.lang.NoClassDefFoundError: org/apache/commons/csv/CSVFormat ...

 Other options I can think of:

 - Convert my CSV to json (use Pig?) and read into Spark
 - Read in using jdbc connect from postgres

 But want to make sure I'm not misusing Spark or missing something obvious.

 Thanks!

 Charlie



Re: Unit Testing

2015-08-13 Thread Burak Yavuz
I would recommend this spark package for your unit testing needs (
http://spark-packages.org/package/holdenk/spark-testing-base).

Best,
Burak

On Thu, Aug 13, 2015 at 5:51 AM, jay vyas jayunit100.apa...@gmail.com
wrote:

 yes there certainly is, so long as eclipse has the right plugins and so on
 to run scala programs.  You're really asking two questions: (1) Can I use a
 modern IDE to develop spark apps and (2) can we easily  unit test spark
 streaming apps.

 the answer is yes to both...

 Regarding your IDE:

 I like to use intellij with the set plugins for scala development.  It
 allows you to run everything from inside the IDE.  I've written up setup
 instructions here:
 http://jayunit100.blogspot.com/2014/07/set-up-spark-application-devleopment.html

 Now, regarding local unit testing:

 As an example, here is a unit test for confirming that spark can write to
 cassandra.


 https://github.com/jayunit100/SparkStreamingApps/blob/master/src/test/scala/TestTwitterCassandraETL.scala

 The key here is to just set your local master in the unit test, like so

  sc.setMaster("local[2]")

  "local[2]" guarantees that you'll have a producer and a consumer, so that
  you don't get a starvation scenario.


 On Wed, Aug 12, 2015 at 7:31 PM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 Is there a way to run spark streaming methods in standalone eclipse
 environment to test out the functionality?




 --
 jay vyas



Re: Cannot Import Package (spark-csv)

2015-08-03 Thread Burak Yavuz
Hi, there was this issue for Scala 2.11.
https://issues.apache.org/jira/browse/SPARK-7944
It should be fixed on master branch. You may be hitting that.

Best,
Burak

On Sun, Aug 2, 2015 at 9:06 PM, Ted Yu yuzhih...@gmail.com wrote:

 I tried the following command on master branch:
 bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3 --jars
 ../spark-csv_2.10-1.0.3.jar --master local

 I didn't reproduce the error with your command.

 FYI

 On Sun, Aug 2, 2015 at 8:57 PM, Bill Chambers 
 wchamb...@ischool.berkeley.edu wrote:

 Sure the commands are:

 scala val df =
 sqlContext.read.format(com.databricks.spark.csv).option(header,
 true).load(cars.csv)

 and get the following error:

 java.lang.RuntimeException: Failed to load class for data source:
 com.databricks.spark.csv
   at scala.sys.package$.error(package.scala:27)
   at
 org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:220)
   at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:233)
   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)
   ... 49 elided

 On Sun, Aug 2, 2015 at 8:56 PM, Ted Yu yuzhih...@gmail.com wrote:

 The command you ran and the error you got were not visible.

 Mind sending them again ?

 Cheers

 On Sun, Aug 2, 2015 at 8:33 PM, billchambers 
 wchamb...@ischool.berkeley.edu wrote:

 I am trying to import the spark csv package while using the scala spark
 shell. Spark 1.4.1, Scala 2.11

 I am starting the shell with:

 bin/spark-shell --packages com.databricks:spark-csv_2.11:1.1.0 --jars
 ../sjars/spark-csv_2.11-1.1.0.jar --master local


 I then try and run



 and get the following error:



 What am i doing wrong?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-Import-Package-spark-csv-tp24109.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Bill Chambers
 http://billchambers.me/
 Email wchamb...@ischool.berkeley.edu | LinkedIn
 http://linkedin.com/in/wachambers | Twitter
 https://twitter.com/b_a_chambers | Github
 https://github.com/anabranch





Re: Cannot Import Package (spark-csv)

2015-08-03 Thread Burak Yavuz
In addition, you do not need to use --jars with --packages. --packages will
get the jar for you.

Best,
Burak

On Mon, Aug 3, 2015 at 9:01 AM, Burak Yavuz brk...@gmail.com wrote:

 Hi, there was this issue for Scala 2.11.
 https://issues.apache.org/jira/browse/SPARK-7944
 It should be fixed on master branch. You may be hitting that.

 Best,
 Burak

 On Sun, Aug 2, 2015 at 9:06 PM, Ted Yu yuzhih...@gmail.com wrote:

 I tried the following command on master branch:
 bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3 --jars
 ../spark-csv_2.10-1.0.3.jar --master local

 I didn't reproduce the error with your command.

 FYI

 On Sun, Aug 2, 2015 at 8:57 PM, Bill Chambers 
 wchamb...@ischool.berkeley.edu wrote:

 Sure the commands are:

 scala val df =
 sqlContext.read.format(com.databricks.spark.csv).option(header,
 true).load(cars.csv)

 and get the following error:

 java.lang.RuntimeException: Failed to load class for data source:
 com.databricks.spark.csv
   at scala.sys.package$.error(package.scala:27)
   at
 org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:220)
   at
 org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:233)
   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)
   ... 49 elided

 On Sun, Aug 2, 2015 at 8:56 PM, Ted Yu yuzhih...@gmail.com wrote:

 The command you ran and the error you got were not visible.

 Mind sending them again ?

 Cheers

 On Sun, Aug 2, 2015 at 8:33 PM, billchambers 
 wchamb...@ischool.berkeley.edu wrote:

 I am trying to import the spark csv package while using the scala spark
 shell. Spark 1.4.1, Scala 2.11

 I am starting the shell with:

 bin/spark-shell --packages com.databricks:spark-csv_2.11:1.1.0 --jars
 ../sjars/spark-csv_2.11-1.1.0.jar --master local


 I then try and run



 and get the following error:



 What am i doing wrong?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-Import-Package-spark-csv-tp24109.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Bill Chambers
 http://billchambers.me/
 Email wchamb...@ischool.berkeley.edu | LinkedIn
 http://linkedin.com/in/wachambers | Twitter
 https://twitter.com/b_a_chambers | Github
 https://github.com/anabranch






Re: Which directory contains third party libraries for Spark

2015-07-28 Thread Burak Yavuz
Hey Stephen,

In case these libraries are available on the client as Maven artifacts, you
can use --packages to ship the library and all of its dependencies, without
building an uber jar.

Best,
Burak

On Tue, Jul 28, 2015 at 10:23 AM, Marcelo Vanzin van...@cloudera.com
wrote:

 Hi Stephen,

 There is no such directory currently. If you want to add an existing jar
 to every app's classpath, you need to modify two config values:
 spark.driver.extraClassPath and spark.executor.extraClassPath.

 On Mon, Jul 27, 2015 at 10:22 PM, Stephen Boesch java...@gmail.com
 wrote:

 when using spark-submit: which directory contains third party libraries
 that will be loaded on each of the slaves? I would like to scp one or more
 libraries to each of the slaves instead of shipping the contents in the
 application uber-jar.

 Note: I did try adding to $SPARK_HOME/lib_managed/jars.   But the
 spark-submit still results in a ClassNotFoundException for classes included
 in the added library.




 --
 Marcelo



Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Burak Yavuz
Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything.
That's why the unpersist has an if statement before it.
Could you give more information about your setup please? Number of cores,
memory, number of partitions of ratings_train?

Thanks,
Burak

On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan 
jonathan.stahl...@capitalone.com wrote:

 Hello again,

 In trying to understand the caching of intermediate RDDs by ALS, I looked
 into the source code and found what may be a bug.  Looking here:


 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

 you see that ALS.train() is being called with finalRDDStorageLevel =
 StorageLevel.NONE, which I would understand to mean that the intermediate
 RDDs will not be persisted.  Looking here:


 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

 unpersist() is only being called on the intermediate RDDs (all the *Blocks
 RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.


 This doesn’t make sense to me – I would expect the RDDs to be removed from
 the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way
 around.

 Jonathan


 From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com
 Date: Thursday, July 16, 2015 at 2:18 PM
 To: user@spark.apache.org user@spark.apache.org
 Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

 Hello all,

 I am running the Spark recommendation algorithm in MLlib and I have been
 studying its output with various model configurations.  Ideally I would
 like to be able to run one job that trains the recommendation model with
 many different configurations to try to optimize for performance.  A sample
 code in python is copied below.

 The issue I have is that each new model which is trained caches a set of
 RDDs and eventually the executors run out of memory.  Is there any way in
 Pyspark to unpersist() these RDDs after each iteration?  The names of the
 RDDs which I gather from the UI is:

 itemInBlocks
 itemOutBlocks
 Products
 ratingBlocks
 userInBlocks
 userOutBlocks
 users

 I am using Spark 1.3.  Thank you for any help!

 Regards,
 Jonathan




   data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
   functions = [rating] #defined elsewhere
   ranks = [10,20]
   iterations = [10,20]
   lambdas = [0.01,0.1]
   alphas  = [1.0,50.0]

   results = []
   for ratingFunction, rank, numIterations, m_lambda, m_alpha in
 itertools.product( functions, ranks, iterations, lambdas, alphas ):
 #train model
 ratings_train = data_train.map(lambda l: Rating( l.user, l.product,
 ratingFunction(l) ) )
 model   = ALS.trainImplicit( ratings_train, rank, numIterations,
 lambda_=float(m_lambda), alpha=float(m_alpha) )

 #test performance on CV data
 ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product,
 ratingFunction(l) ) )
 auc = areaUnderCurve( ratings_cv, model.predictAll )

 #save results
 result = ,.join(str(l) for l in
 [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
 results.append(result)

 --

 The information contained in this e-mail is confidential and/or
 proprietary to Capital One and/or its affiliates and may only be used
 solely in performance of work or services for Capital One. The information
 transmitted herewith is intended only for use by the individual or entity
 to which it is addressed. If the reader of this message is not the intended
 recipient, you are hereby notified that any review, retransmission,
 dissemination, distribution, copying or other use of, or taking of any
 action in reliance upon this information is strictly prohibited. If you
 have received this communication in error, please contact the sender and
 delete the material from your computer.



Re: LinearRegressionWithSGD Outputs NaN

2015-07-21 Thread Burak Yavuz
Hi,
Could you please decrease your step size to 0.1, and also try 0.01? You
could also try running L-BFGS, which doesn't have step size tuning, to get
better results.
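
Using the snippet from your mail below, that would look roughly like this
(0.1 and the other values are just starting points):
```scala
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

val algorithm = new LinearRegressionWithSGD()
algorithm.optimizer
  .setNumIterations(100)
  .setStepSize(0.1)    // try 0.1 first, then 0.01
  .setRegParam(0.01)
val model = algorithm.run(parsedTrainData)
```
Scaling the features (e.g. with StandardScaler) before training also tends to
help SGD converge.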

Best,
Burak

On Tue, Jul 21, 2015 at 2:59 AM, Naveen nav...@formcept.com wrote:

 Hi ,

 I am trying to use LinearRegressionWithSGD on Million Song Data Set and my
 model returns NaN's as weights and 0.0 as the intercept. What might be the
 issue for the error ? I am using Spark 1.40 in standalone mode.

 Below is my model:

 val numIterations = 100
  val stepSize = 1.0
  val regParam = 0.01
  val regType = L2
  val algorithm = new LinearRegressionWithSGD()

 algorithm.optimizer.setNumIterations(numIterations).setStepSize(stepSize).setRegParam(regParam)
  val model = algorithm.run(parsedTrainData)

 Regards,
 Naveen

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: RowId unique key for Dataframes

2015-07-21 Thread Burak Yavuz
Would monotonicallyIncreasingId
https://github.com/apache/spark/blob/d4c7a7a3642a74ad40093c96c4bf45a62a470605/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L637
work for you?
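
For example (a sketch; "rowId" is just an example column name, and the
generated IDs are unique and increasing but not consecutive):
```scala
import org.apache.spark.sql.functions.monotonicallyIncreasingId

val withId = df.withColumn("rowId", monotonicallyIncreasingId())
```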

Best,
Burak



On Tue, Jul 21, 2015 at 4:55 PM, Srikanth srikanth...@gmail.com wrote:

 Hello,

 I'm creating dataframes from three CSV files using spark-csv package. I
 want to add a unique ID for each row in dataframe.
 Not sure how withColumn() can be used to achieve this. I need a Long value
 not an UUID.

 One option I found was to create a RDD and use zipWithUniqueId.

 sqlContext.textFile(file).
 zipWithUniqueId().
 map { case (d, i) => i.toString + delimiter + d }.
 map(_.split(delimiter)).
 map(s => caseclass(...))

 .toDF().select("field1", "field2")


 Its a bit hacky. Is there an easier way to do this on dataframes and use
 spark-csv?

 Srikanth



Re: Strange behavoir of pyspark with --jars option

2015-07-15 Thread Burak Yavuz
Hi,
I believe the HiveContext uses a different class loader. It then falls back
to the system class loader if it can't find the classes in the context
class loader. The system class loader contains the classpath passed
through --driver-class-path
and spark.executor.extraClassPath. The JVM is already running during the
resolution of jars in --jars, therefore, they can't be added to the System
Classloader. Instead they live in a separate context classloader, which the
HiveContext doesn't use, hence the lost dependencies.

I know what I wrote may be a little complicated, so please let me know if you
have any questions. HTH.

Best,
Burak

On Tue, Jul 14, 2015 at 11:15 PM, gen tang gen.tan...@gmail.com wrote:

 Hi,

 I met some interesting problems with --jars options
 As I use the third party dependencies: elasticsearch-spark, I pass this
 jar with the following command:
 ./bin/spark-submit --jars path-to-dependencies ...
 It works well.
 However, if I use HiveContext.sql, spark will lost the dependencies that I
 passed.It seems that the execution of HiveContext will override the
 configuration.(But if we check sparkContext._conf, the configuration is
 unchanged)

 But if I passed dependencies with --driver-class-path
 and spark.executor.extraClassPath. The problem will disappear.

 Is there anyone know why this interesting problem happens?

 Thanks a lot for your help in advance.

 Cheers
 Gen



Re: MLlib LogisticRegressionWithLBFGS error

2015-07-15 Thread Burak Yavuz
Hi,

Is this in LibSVM format? If so, the indices should be sorted in increasing
order. It seems like they are not sorted.
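
If you end up building the vectors by hand, sorting the indices first avoids
this (a sketch; pairs and numFeatures are placeholders):
```scala
import org.apache.spark.mllib.linalg.Vectors

// pairs: Seq[(Int, Double)] of (index, value); indices must be strictly increasing
val (idx, vals) = pairs.sortBy(_._1).unzip
val v = Vectors.sparse(numFeatures, idx.toArray, vals.toArray)
```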

Best,
Burak

On Tue, Jul 14, 2015 at 7:31 PM, Vi Ngo Van ngovi.se@gmail.com wrote:

 Hi All,
 I've met a issue with MLlib when i use LogisticRegressionWithLBFGS

 my sample data :

 *0 863:1 40646:1 37697:1 1423:1 38648:1 4230:1 23823:1 41594:1 27614:1
 5689:1 18493:1 44187:1 5694:1 27799:1 12010:1*
 *0 863:1 40646:1 37697:1 1423:1 38648:1 4230:1 23823:1 41594:1 27614:1
 5689:1 18493:1 44187:1 5694:1 27799:1 12010:1*
 *0 29510:1 42091:1 8258:1 46813:1 37658:1 37868:1 29478:1 15872:1 13188:1
 2193:1 27614:1 50162:1 5668:1 5617:1 48841:1 36836:1 2325:1 7382:1 24668:1*
 *2 43736:1 35551:1 535:1 19816:1 28472:1 45485:1 33417:1 42875:1 235:1
 21952:1 23361:1 37697:1 42615:1 27425:1 32021:1 1423:1 38648:1 18241:1
 32213:1 11935:1 34422:1 40512:1 25988:1 4798:1 44180:1 37697:1 38968:1
 25988:1 32024:1 18455:1 14193:1 8538:1*
 *2 42015:1 33077:1 38396:1 21952:1 23361:1 50762:1 25988:1 4798:1 44180:1
 38968:1 1423:1 38648:1 41415:1 535:1 19816:1 28472:1 45485:1 33417:1
 42875:1 235:1 26256:1 9060:1*
 *0 4798:1 44180:1 4788:1 42630:1*
 ...

 I have given a error :
 *java.lang.ArrayIndexOutOfBoundsException: 52686*
 * at
 org.apache.spark.mllib.stat.MultivariateOnlineSummarizer$$anonfun$add$3.apply$mcVID$sp(MultivariateOnlineSummarizer.scala:82)*

 Hope, someone can help.
 Thanks you.

 --
 Ngo Van Vi http://www.facebook.com/ngovanvi
 Mobile : (+84)1695893851
 Skype: ngovi_htbk



Re: creating a distributed index

2015-07-15 Thread Burak Yavuz
Hi Swetha,

IndexedRDD is available as a package on Spark Packages
http://spark-packages.org/package/amplab/spark-indexedrdd.

Best,
Burak

On Tue, Jul 14, 2015 at 5:23 PM, swetha swethakasire...@gmail.com wrote:

 Hi Ankur,

 Is IndexedRDD available in Spark 1.4.0? We would like to use this in Spark
 Streaming to do lookups/updates/deletes in RDDs using keys by storing them
 as key/value pairs.

 Thanks,
 Swetha





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/creating-a-distributed-index-tp11204p23842.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Running mllib from R in Spark 1.4

2015-07-15 Thread Burak Yavuz
Hi,
There is no MLlib support in SparkR in 1.4. There will be some support in
1.5. You can check these JIRAs for progress:
https://issues.apache.org/jira/browse/SPARK-6805
https://issues.apache.org/jira/browse/SPARK-6823

Best,
Burak

On Wed, Jul 15, 2015 at 6:00 AM, madhu phatak phatak@gmail.com wrote:

 Hi,
 I have been playing with Spark R API that is introduced in Spark 1.4
 version. Can we use any mllib functionality from the R as of now?. From the
 documentation it looks like we can only use SQL/Dataframe functionality as
 of now. I know there is separate project SparkR project but it doesnot
 seems to be maintained in future.

 So if I want to run machine learning on SparkR, what are the options as of
 now?

 --
 Regards,
 Madhukara Phatak
 http://datamantra.io/



Re: To access elements of a org.apache.spark.mllib.linalg.Vector

2015-07-14 Thread Burak Yavuz
Hi Dan,

You could zip the indices with the values if you like.

```
val sVec = sparseVector(1).asInstanceOf[
org.apache.spark.mllib.linalg.SparseVector]
val map = sVec.indices.zip(sVec.values).toMap
```

Best,
Burak

On Tue, Jul 14, 2015 at 12:23 PM, Dan Dong dongda...@gmail.com wrote:

 Hi,
   I'm wondering how to access elements of a linalg.Vector, e.g:
 sparseVector: Seq[org.apache.spark.mllib.linalg.Vector] =
 List((3,[1,2],[1.0,2.0]), (3,[0,1,2],[3.0,4.0,5.0]))


 scala sparseVector(1)
 res16: org.apache.spark.mllib.linalg.Vector = (3,[0,1,2],[3.0,4.0,5.0])

 How to get the indices(0,1,2) and values(3.0,4.0,5.0) of  e.g:
  (3,[0,1,2],[3.0,4.0,5.0]) ?
 It will be useful to map them into index-value pairs like:
 0-3.0
 1-4.0
 2-5.0

 I could not find such info from:

 https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.mllib.linalg.Vector

 Thanks.

 Cheers,
 Dan




Re: [MLLib][Kmeans] KMeansModel.computeCost takes lot of time

2015-07-13 Thread Burak Yavuz
What are the other parameters? Are you just setting k=3? What about # of
runs? How many partitions do you have? How many cores does your machine
have?

Thanks,
Burak

On Mon, Jul 13, 2015 at 10:57 AM, Nirmal Fernando nir...@wso2.com wrote:

 Hi Burak,

 k = 3
 dimension = 785 features
 Spark 1.4

 On Mon, Jul 13, 2015 at 10:28 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 How are you running K-Means? What is your k? What is the dimension of
 your dataset (columns)? Which Spark version are you using?

 Thanks,
 Burak

 On Mon, Jul 13, 2015 at 2:53 AM, Nirmal Fernando nir...@wso2.com wrote:

 Hi,

 For a fairly large dataset, 30MB, KMeansModel.computeCost takes lot of
 time (16+ mints).

 It takes lot of time at this task;

 org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:33)
 org.apache.spark.mllib.clustering.KMeansModel.computeCost(KMeansModel.scala:70)

 Can this be improved?

 --

 Thanks  regards,
 Nirmal

 Associate Technical Lead - Data Technologies Team, WSO2 Inc.
 Mobile: +94715779733
 Blog: http://nirmalfdo.blogspot.com/






 --

 Thanks  regards,
 Nirmal

 Associate Technical Lead - Data Technologies Team, WSO2 Inc.
 Mobile: +94715779733
 Blog: http://nirmalfdo.blogspot.com/





Re: [MLLib][Kmeans] KMeansModel.computeCost takes lot of time

2015-07-13 Thread Burak Yavuz
Can you call repartition(8) (or 16) on data.rdd() before KMeans, and also
.cache() it?

something like, (I'm assuming you are using Java):
```
JavaRDD<Vector> input = data.repartition(8).cache();
org.apache.spark.mllib.clustering.KMeans.train(input.rdd(), 3, 20);
```

On Mon, Jul 13, 2015 at 11:10 AM, Nirmal Fernando nir...@wso2.com wrote:

 I'm using;

 org.apache.spark.mllib.clustering.KMeans.train(data.rdd(), 3, 20);

 Cpu cores: 8 (using default Spark conf thought)

 On partitions, I'm not sure how to find that.

 On Mon, Jul 13, 2015 at 11:30 PM, Burak Yavuz brk...@gmail.com wrote:

 What are the other parameters? Are you just setting k=3? What about # of
 runs? How many partitions do you have? How many cores does your machine
 have?

 Thanks,
 Burak

 On Mon, Jul 13, 2015 at 10:57 AM, Nirmal Fernando nir...@wso2.com
 wrote:

 Hi Burak,

 k = 3
 dimension = 785 features
 Spark 1.4

 On Mon, Jul 13, 2015 at 10:28 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 How are you running K-Means? What is your k? What is the dimension of
 your dataset (columns)? Which Spark version are you using?

 Thanks,
 Burak

 On Mon, Jul 13, 2015 at 2:53 AM, Nirmal Fernando nir...@wso2.com
 wrote:

 Hi,

 For a fairly large dataset, 30MB, KMeansModel.computeCost takes lot of
 time (16+ mints).

 It takes lot of time at this task;

 org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:33)
 org.apache.spark.mllib.clustering.KMeansModel.computeCost(KMeansModel.scala:70)

 Can this be improved?

 --

 Thanks  regards,
 Nirmal

 Associate Technical Lead - Data Technologies Team, WSO2 Inc.
 Mobile: +94715779733
 Blog: http://nirmalfdo.blogspot.com/






 --

 Thanks  regards,
 Nirmal

 Associate Technical Lead - Data Technologies Team, WSO2 Inc.
 Mobile: +94715779733
 Blog: http://nirmalfdo.blogspot.com/






 --

 Thanks  regards,
 Nirmal

 Associate Technical Lead - Data Technologies Team, WSO2 Inc.
 Mobile: +94715779733
 Blog: http://nirmalfdo.blogspot.com/





Re: [MLLib][Kmeans] KMeansModel.computeCost takes lot of time

2015-07-13 Thread Burak Yavuz
Hi,

How are you running K-Means? What is your k? What is the dimension of your
dataset (columns)? Which Spark version are you using?

Thanks,
Burak

On Mon, Jul 13, 2015 at 2:53 AM, Nirmal Fernando nir...@wso2.com wrote:

 Hi,

 For a fairly large dataset, 30MB, KMeansModel.computeCost takes lot of
 time (16+ mints).

 It takes lot of time at this task;

 org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:33)
 org.apache.spark.mllib.clustering.KMeansModel.computeCost(KMeansModel.scala:70)

 Can this be improved?

 --

 Thanks  regards,
 Nirmal

 Associate Technical Lead - Data Technologies Team, WSO2 Inc.
 Mobile: +94715779733
 Blog: http://nirmalfdo.blogspot.com/





Re: Unit tests of spark application

2015-07-10 Thread Burak Yavuz
I can +1 Holden's spark-testing-base package.

Burak

On Fri, Jul 10, 2015 at 12:23 PM, Holden Karau hol...@pigscanfly.ca wrote:

 Somewhat biased of course, but you can also use spark-testing-base from
 spark-packages.org as a basis for your unittests.

 On Fri, Jul 10, 2015 at 12:03 PM, Daniel Siegmann 
 daniel.siegm...@teamaol.com wrote:

 On Fri, Jul 10, 2015 at 1:41 PM, Naveen Madhire vmadh...@umail.iu.edu
 wrote:

 I want to write JUnit test cases in Scala for testing a Spark application.
 Is there any guide or link that I can refer to?


 https://spark.apache.org/docs/latest/programming-guide.html#unit-testing

 Typically I create test data using SparkContext.parallelize and then
 call RDD.collect to get the results to assert.
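
 For example, a minimal sketch of that pattern (the job logic is hypothetical, assuming ScalaTest and a local master):
```
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite

class WordLengthSuite extends FunSuite {
  test("map computes word lengths") {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("unit-test"))
    try {
      val input = sc.parallelize(Seq("spark", "rdd"))    // create test data
      val result = input.map(_.length).collect().sorted  // run the transformation, collect to the driver
      assert(result.sameElements(Array(3, 5)))           // assert on the collected result
    } finally {
      sc.stop()                                          // always stop the context
    }
  }
}
```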




 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau



Re: How to ignore features in mllib

2015-07-09 Thread Burak Yavuz
If you use the Pipelines API with DataFrames, you select which columns you
would like to train on using the VectorAssembler. When configuring the
VectorAssembler, you can simply leave out the features you want to ignore.

Best,
Burak
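
A minimal sketch of that, assuming Spark 1.4+ ML Pipelines and hypothetical column names:
```
// assuming a DataFrame `df` with hypothetical columns "f1", "f2", "label"
// and a pass-through string column "comment" that should be ignored for training
import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
  .setInputCols(Array("f1", "f2"))   // "comment" is simply not listed, so it stays out of the feature vector
  .setOutputCol("features")

val assembled = assembler.transform(df)  // keeps all original columns and adds "features"
```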

On Thu, Jul 9, 2015 at 10:38 AM, Arun Luthra arun.lut...@gmail.com wrote:

 Is it possible to ignore features in mllib? In other words, I would like
 to have some 'pass-through' data, Strings for example, attached to training
 examples and test data.

 A related stackoverflow question:
 http://stackoverflow.com/questions/30739283/spark-mllib-how-to-ignore-features-when-training-a-classifier

 Arun



Re: spark-submit can not resolve spark-hive_2.10

2015-07-07 Thread Burak Yavuz
spark-hive is excluded when using --packages, because it can be included in
the spark-assembly by adding -Phive during mvn package or sbt assembly.

Best,
Burak

On Tue, Jul 7, 2015 at 8:06 AM, Hao Ren inv...@gmail.com wrote:

 I want to add spark-hive as a dependency when submitting my job, but it
 seems that spark-submit cannot resolve it.

 $ ./bin/spark-submit \
 → --packages

 org.apache.spark:spark-hive_2.10:1.4.0,org.postgresql:postgresql:9.3-1103-jdbc3,joda-time:joda-time:2.8.1
 \
 → --class fr.leboncoin.etl.jobs.dwh.AdStateTraceDWHTransform \
 → --master spark://localhost:7077 \

 Ivy Default Cache set to: /home/invkrh/.ivy2/cache
 The jars for the packages stored in: /home/invkrh/.ivy2/jars
 https://repository.jboss.org/nexus/content/repositories/releases/ added
 as a
 remote repository with the name: repo-1
 :: loading settings :: url =

 jar:file:/home/invkrh/workspace/scala/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
 org.apache.spark#spark-hive_2.10 added as a dependency
 org.postgresql#postgresql added as a dependency
 joda-time#joda-time added as a dependency
 :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
 confs: [default]
 found org.postgresql#postgresql;9.3-1103-jdbc3 in local-m2-cache
 found joda-time#joda-time;2.8.1 in central
 :: resolution report :: resolve 139ms :: artifacts dl 3ms
 :: modules in use:
 joda-time#joda-time;2.8.1 from central in [default]
 org.postgresql#postgresql;9.3-1103-jdbc3 from local-m2-cache in
 [default]

 -
 |  |modules||   artifacts
  |
 |   conf   | number| search|dwnlded|evicted||
 number|dwnlded|

 -
 |  default |   2   |   0   |   0   |   0   ||   2   |   0
  |

 -
 :: retrieving :: org.apache.spark#spark-submit-parent
 confs: [default]
 0 artifacts copied, 2 already retrieved (0kB/6ms)
 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/sql/hive/HiveContext
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at

 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:633)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.ClassNotFoundException:
 org.apache.spark.sql.hive.HiveContext
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 ... 7 more
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/07/07 16:57:59 INFO Utils: Shutdown hook called

 Any help is appreciated. Thank you.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-can-not-resolve-spark-hive-2-10-tp23695.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark 1.4 MLLib Bug?: Multiclass Classification requirement failed: sizeInBytes was negative

2015-07-03 Thread Burak Yavuz
How many partitions do you have? It might be that one partition is too
large, and there is Integer overflow. Could you double your number of
partitions?

Burak
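
A minimal sketch of that suggestion, assuming an existing RDD[LabeledPoint] named `labeledPoints` (hypothetical name):
```
// double the partition count so individual serialized blocks stay well below Int-sized limits
val repartitioned = labeledPoints.repartition(labeledPoints.partitions.length * 2).cache()
// then train on `repartitioned` instead of the original RDD
```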

On Fri, Jul 3, 2015 at 4:41 AM, Danny kont...@dannylinden.de wrote:

 hi,

 I want to run a multiclass classification with 390 classes on 120k labeled
 points (tf-idf vectors), but I get the following exception. If I reduce the
 number of classes to ~20, everything works fine. How can I fix this?

  I use the LogisticRegressionWithLBFGS class for my classification on an
 8-node cluster with


 total-executor-cores = 30

 executor-memory = 20g

 My Exception:

 15/07/02 15:55:00 INFO DAGScheduler: Job 11 finished: count at
 LBFGS.scala:170, took 0,521823 s
 15/07/02 15:55:02 INFO MemoryStore: ensureFreeSpace(-1069858488) called
 with
 curMem=308280107, maxMem=3699737
 15/07/02 15:55:02 INFO MemoryStore: Block broadcast_22 stored as values in
 memory (estimated size -1069858488.0 B, free 11.1 GB)
 Exception in thread main java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
 at
 org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
 Caused by: java.lang.IllegalArgumentException: requirement failed:
 sizeInBytes was negative: -1069858488
 at scala.Predef$.require(Predef.scala:233)
 at org.apache.spark.storage.BlockInfo.markReady(BlockInfo.scala:55)
 at
 org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:812)
 at
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:635)
 at
 org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:993)
 at

 org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
 at

 org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:85)
 at

 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
 at

 org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
 at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
 at

 org.apache.spark.mllib.optimization.LBFGS$CostFun.calculate(LBFGS.scala:215)
 at

 org.apache.spark.mllib.optimization.LBFGS$CostFun.calculate(LBFGS.scala:204)
 at
 breeze.optimize.CachedDiffFunction.calculate(CachedDiffFunction.scala:23)
 at

 breeze.optimize.FirstOrderMinimizer.calculateObjective(FirstOrderMinimizer.scala:108)
 at

 breeze.optimize.FirstOrderMinimizer.initialState(FirstOrderMinimizer.scala:101)
 at

 breeze.optimize.FirstOrderMinimizer.iterations(FirstOrderMinimizer.scala:146)
 at
 org.apache.spark.mllib.optimization.LBFGS$.runLBFGS(LBFGS.scala:178)
 at
 org.apache.spark.mllib.optimization.LBFGS.optimize(LBFGS.scala:117)
 at

 org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:282)
 at

 org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:205)
 at

 com.test.spark.SVMSimpleAppEC2$.createNaiveBayesModel(SVMSimpleAppEC2.scala:150)
 at com.test.spark.SVMSimpleAppEC2$.main(SVMSimpleAppEC2.scala:48)
 at com.test.spark.SVMSimpleAppEC2.main(SVMSimpleAppEC2.scala)
 ... 6 more
 15/07/02 15:55:02 INFO SparkContext: Invoking stop() from shutdown hook



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-MLLib-Bug-Multiclass-Classification-requirement-failed-sizeInBytes-was-negative-tp23610.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: coalesce on dataFrame

2015-07-01 Thread Burak Yavuz
You can use df.repartition(1) in Spark 1.4. See here
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L1396
.

Best,
Burak
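
A minimal sketch, assuming Spark 1.4 and an existing DataFrame `df` (the output path and format are hypothetical):
```
val single = df.repartition(1)                        // full shuffle down to a single partition
single.write.format("json").save("/tmp/single-out")   // yields one part file in the output directory
```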

On Wed, Jul 1, 2015 at 3:05 AM, Olivier Girardot ssab...@gmail.com wrote:

 PySpark or Spark (Scala)?
 When you use coalesce with anything but a column, you must use a literal,
 like this in PySpark:

 from pyspark.sql import functions as F

 F.coalesce(df.a, F.lit(True))

 On Wed, Jul 1, 2015 at 12:03, Ewan Leith ewan.le...@realitymine.com
 wrote:

 It's in spark 1.4.0, or should be at least:

 https://issues.apache.org/jira/browse/SPARK-6972

 Ewan

 -Original Message-
 From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com]
 Sent: 01 July 2015 08:23
 To: user@spark.apache.org
 Subject: coalesce on dataFrame

 How can we use coalesce(1, true) on dataFrame?


 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-on-dataFrame-tp23564.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Can Dependencies Be Resolved on Spark Cluster?

2015-06-30 Thread Burak Yavuz
Hi,
In your build.sbt file, declare the dependencies you have (hopefully there
aren't too many direct ones; they mostly just pull in transitive dependencies), for example:
```
libraryDependencies += "org.apache.hbase" % "hbase" % "1.1.1"

libraryDependencies += "junit" % "junit" % "x"

resolvers += "Some other repo" at "http://some.other.repo"

resolvers += "Some other repo2" at "http://some.other.repo2"
```

call `sbt package`, and then run spark-submit as:

$ bin/spark-submit --packages org.apache.hbase:hbase:1.1.1,junit:junit:x
--repositories http://some.other.repo,http://some.other.repo2 $YOUR_JAR

Best,
Burak





On Mon, Jun 29, 2015 at 11:33 PM, SLiZn Liu sliznmail...@gmail.com wrote:

 Hi Burak,

 Is the `--packages` flag only available for Maven, with no sbt support?

 On Tue, Jun 30, 2015 at 2:26 PM Burak Yavuz brk...@gmail.com wrote:

 You can pass `--packages your:comma-separated:maven-dependencies` to
 spark submit if you have Spark 1.3 or greater.

 Best regards,
 Burak

 On Mon, Jun 29, 2015 at 10:46 PM, SLiZn Liu sliznmail...@gmail.com
 wrote:

 Hey Spark Users,

 I'm writing a demo with Spark and HBase. What I've done is package a
 **fat jar**: place dependencies in `build.sbt`, and use `sbt assembly` to
 package **all dependencies** into one big jar. The rest of the work is copying
 the fat jar to the Spark master node and then launching it with `spark-submit`.

 The defect of the fat-jar approach is obvious: all dependencies are
 packed, yielding a huge jar file. Even worse, in my case, a vast number of
 conflicting package files in `~/.ivy/cache` fail when merging, and I had
 to manually specify `MergingStrategy` as `rename` for all conflicting files
 to bypass this issue.

 Then I thought there should exist an easier way to submit a thin jar
 with a build.sbt-like file specifying dependencies, where dependencies are
 automatically resolved across the cluster before the actual job is
 launched. I googled, but nothing related was found. Is this plausible,
 or are there other, better ways to achieve the same goal?

 BEST REGARDS,
 Todd Leo





Re: Can Dependencies Be Resolved on Spark Cluster?

2015-06-30 Thread Burak Yavuz
You can pass `--packages your:comma-separated:maven-dependencies` to spark
submit if you have Spark 1.3 or greater.

Best regards,
Burak

On Mon, Jun 29, 2015 at 10:46 PM, SLiZn Liu sliznmail...@gmail.com wrote:

 Hey Spark Users,

 I'm writing a demo with Spark and HBase. What I've done is package a
 **fat jar**: place dependencies in `build.sbt`, and use `sbt assembly` to
 package **all dependencies** into one big jar. The rest of the work is copying
 the fat jar to the Spark master node and then launching it with `spark-submit`.

 The defect of the fat-jar approach is obvious: all dependencies are
 packed, yielding a huge jar file. Even worse, in my case, a vast number of
 conflicting package files in `~/.ivy/cache` fail when merging, and I had
 to manually specify `MergingStrategy` as `rename` for all conflicting files
 to bypass this issue.

 Then I thought there should exist an easier way to submit a thin jar
 with a build.sbt-like file specifying dependencies, where dependencies are
 automatically resolved across the cluster before the actual job is
 launched. I googled, but nothing related was found. Is this plausible,
 or are there other, better ways to achieve the same goal?

 BEST REGARDS,
 Todd Leo



Re: Understanding accumulator during transformations

2015-06-24 Thread Burak Yavuz
Hi Wei,

For example, when a straggler executor gets killed in the middle of a map
operation and its task is restarted on another instance, the
accumulator will be updated more than once.

Best,
Burak

On Wed, Jun 24, 2015 at 1:08 PM, Wei Zhou zhweisop...@gmail.com wrote:

 Quoting from Spark Program guide:

 For accumulator updates performed inside *actions only*, Spark
 guarantees that each task’s update to the accumulator will only be applied
 once, i.e. restarted tasks will not update the value. In transformations,
 users should be aware of that each task’s update may be applied more than
 once if tasks or job stages are re-executed.

 Can anyone give me a possible scenario of when an accumulator might be
 updated more than once during a transformation? Thanks.

 Regards,
 Wei



Re: Understanding accumulator during transformations

2015-06-24 Thread Burak Yavuz
Hi Wei,

During the action, all the transformations before it will run in order,
leading up to the action. If you have an accumulator in any of these
transformations, then you won't get exactly-once semantics, because the
transformation may be restarted elsewhere.

Best,
Burak
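
A minimal sketch of the behaviour being described (hypothetical data, Spark 1.x accumulator API, assuming a spark-shell style `sc`):
```
val acc = sc.accumulator(0)
val rdd = sc.parallelize(1 to 100)

val doubled = rdd.map { x =>
  acc += 1        // runs once per element per task *attempt*, not once per element overall
  x * 2
}

doubled.count()   // the action triggers the map; a retried or speculative task re-applies its updates
```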

On Wed, Jun 24, 2015 at 2:25 PM, Wei Zhou zhweisop...@gmail.com wrote:

 Hi Burak,

 Thanks for your quick reply. I guess what confuses me is that the accumulator
 won't be updated until an action is invoked, due to laziness, so a
 transformation such as a map won't even update the accumulator; how, then,
 would a restarted transformation end up updating the accumulator more than
 once?

 Best,
 Wei

 2015-06-24 13:23 GMT-07:00 Burak Yavuz brk...@gmail.com:

 Hi Wei,

 For example, when a straggler executor gets killed in the middle of a map
 operation and its task is restarted on another instance, the
 accumulator will be updated more than once.

 Best,
 Burak

 On Wed, Jun 24, 2015 at 1:08 PM, Wei Zhou zhweisop...@gmail.com wrote:

 Quoting from Spark Program guide:

 For accumulator updates performed inside *actions only*, Spark
 guarantees that each task’s update to the accumulator will only be applied
 once, i.e. restarted tasks will not update the value. In transformations,
 users should be aware of that each task’s update may be applied more than
 once if tasks or job stages are re-executed.

 Can anyone give me a possible scenario of when an accumulator might be
 updated more than once during a transformation? Thanks.

 Regards,
 Wei






Re: Confusion matrix for binary classification

2015-06-22 Thread Burak Yavuz
Hi,

In Spark 1.4, you may use DataFrame.stat.crosstab to generate the confusion
matrix. This is very simple if you are using the ML Pipelines API
and working with DataFrames.

Best,
Burak
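
A minimal sketch, assuming Spark 1.4 and a DataFrame `predictions` with hypothetical "prediction" and "label" columns:
```
val confusion = predictions.stat.crosstab("prediction", "label")
confusion.show()   // rows = predicted class, columns = actual class, cells = counts
```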

On Mon, Jun 22, 2015 at 4:21 AM, CD Athuraliya cdathural...@gmail.com
wrote:

 Hi,

 I am looking for a way to get a confusion matrix for binary classification.
 I was able to get a confusion matrix for multiclass classification using this
 [1], but I could not find a proper way to get a confusion matrix in the similar
 class available for binary classification [2]. Later I found this class [3],
 which corresponds to my requirement, but I am not sure how I
 should use that class to get evaluation metrics for binary classification.
 E.g. given the constructor BinaryConfusionMatrixImpl(BinaryLabelCounter
 count, BinaryLabelCounter totalCount), from where can I get this count and
 totalCount? I'd appreciate any help on this.

 [1]
 http://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/evaluation/MulticlassMetrics.html#confusionMatrix()
 [2]
 http://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html
 [3]
 http://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrixImpl.html

 Thanks

 --
 *CD Athuraliya*
 Software Engineer
 WSO2, Inc.
 Mobile: +94 716288847
 LinkedIn http://lk.linkedin.com/in/cdathuraliya | Twitter
 https://twitter.com/cdathuraliya | Blog
 http://cdathuraliya.tumblr.com/



Re: SparkSubmit with Ivy jars is very slow to load with no internet access

2015-06-18 Thread Burak Yavuz
Hey Nathan,

I like the first idea better. Let's see what others think. I'd be happy to
review your PR afterwards!

Best,
Burak

On Thu, Jun 18, 2015 at 9:53 PM, Nathan McCarthy 
nathan.mccar...@quantium.com.au wrote:

  Hey,

 Spark Submit adds Maven Central & the Spark bintray repo to the ChainResolver
 before it adds any external resolvers.
 https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L821


 When running on a cluster without internet access, this means the spark
 shell takes forever to launch as it tries these two remote repos before the
 ones specified in the --repositories list. In our case we have a proxy
 which the cluster can access, and we supply it via --repositories. This is
 also a problem for users who maintain a proxy for maven/ivy repos with
 something like Nexus/Artifactory.

  I see two options for a fix;

- Change the order repos are added to the ChainResolver, making the
--repositories supplied repos come before anything else.

 https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L843

- Have a config option (like spark.jars.ivy.useDefaultRemoteRepos,
default true) which, when false, won't add Maven Central & bintray to the
ChainResolver.

 Happy to do a PR now for this if someone can give me a recommendation on
 which option would be better.

  JIRA here; https://issues.apache.org/jira/browse/SPARK-8475

  Cheers,
 Nathan




Re: --packages Failed to load class for data source v1.4

2015-06-14 Thread Burak Yavuz
Hi Don,
This seems related to a known issue, where the classpath on the driver is
missing the related classes. This is a bug in py4j as py4j uses the System
Classloader rather than Spark's Context Classloader. However, this problem
existed in 1.3.0 as well, therefore I'm curious whether it's the same
issue. Thanks for opening the Jira, I'll take a look.

Best,
Burak
On Jun 14, 2015 2:40 PM, Don Drake dondr...@gmail.com wrote:


 I looked at this again, and when I use the Scala spark-shell and load a
 CSV using the same package it works just fine, so this seems specific to
 pyspark.

 I've created the following JIRA:
 https://issues.apache.org/jira/browse/SPARK-8365

 -Don

 On Sat, Jun 13, 2015 at 11:46 AM, Don Drake dondr...@gmail.com wrote:

 I downloaded the pre-compiled Spark 1.4.0 and attempted to run an
 existing Python Spark application against it and got the following error:

 py4j.protocol.Py4JJavaError: An error occurred while calling o90.save.
 : java.lang.RuntimeException: Failed to load class for data source:
 com.databricks.spark.csv

 I pass the following on the command-line to my spark-submit:
 --packages com.databricks:spark-csv_2.10:1.0.3

 This worked fine on 1.3.1, but not in 1.4.

 I was able to replicate it with the following pyspark:

 a = {'a':1.0, 'b':'asdf'}
 rdd = sc.parallelize([a])
 df = sqlContext.createDataFrame(rdd)
 df.save("/tmp/d.csv", "com.databricks.spark.csv")


 Even using the new
 df.write.format('com.databricks.spark.csv').save('/tmp/d.csv') gives the
 same error.

 I see it was added in the web UI:
 file:/Users/drake/.ivy2/jars/com.databricks_spark-csv_2.10-1.0.3.jar (Added By User)
 file:/Users/drake/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar (Added By User)
 http://10.0.0.222:56871/jars/com.databricks_spark-csv_2.10-1.0.3.jar (Added By User)
 http://10.0.0.222:56871/jars/org.apache.commons_commons-csv-1.1.jar (Added By User)
 Thoughts?

 -Don



 Gory details:

 $ pyspark --packages com.databricks:spark-csv_2.10:1.0.3
 Python 2.7.6 (default, Sep  9 2014, 15:04:36)
 [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
 Type help, copyright, credits or license for more information.
 Ivy Default Cache set to: /Users/drake/.ivy2/cache
 The jars for the packages stored in: /Users/drake/.ivy2/jars
 :: loading settings :: url =
 jar:file:/Users/drake/spark/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
 com.databricks#spark-csv_2.10 added as a dependency
 :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
 confs: [default]
 found com.databricks#spark-csv_2.10;1.0.3 in central
 found org.apache.commons#commons-csv;1.1 in central
 :: resolution report :: resolve 590ms :: artifacts dl 17ms
 :: modules in use:
 com.databricks#spark-csv_2.10;1.0.3 from central in [default]
 org.apache.commons#commons-csv;1.1 from central in [default]
 -
 |  |modules||   artifacts   |
 |   conf   | number| search|dwnlded|evicted|| number|dwnlded|
 -
 |  default |   2   |   0   |   0   |   0   ||   2   |   0   |
 -
 :: retrieving :: org.apache.spark#spark-submit-parent
 confs: [default]
 0 artifacts copied, 2 already retrieved (0kB/15ms)
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/06/13 11:06:08 INFO SparkContext: Running Spark version 1.4.0
 2015-06-13 11:06:08.921 java[19233:2145789] Unable to load realm info
 from SCDynamicStore
 15/06/13 11:06:09 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 15/06/13 11:06:09 WARN Utils: Your hostname, Dons-MacBook-Pro-2.local
 resolves to a loopback address: 127.0.0.1; using 10.0.0.222 instead (on
 interface en0)
 15/06/13 11:06:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
 another address
 15/06/13 11:06:09 INFO SecurityManager: Changing view acls to: drake
 15/06/13 11:06:09 INFO SecurityManager: Changing modify acls to: drake
 15/06/13 11:06:09 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(drake); users
 with modify permissions: Set(drake)
 15/06/13 11:06:10 INFO Slf4jLogger: Slf4jLogger started
 15/06/13 11:06:10 INFO Remoting: Starting remoting
 15/06/13 11:06:10 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@10.0.0.222:56870]
 15/06/13 11:06:10 INFO Utils: Successfully started service 'sparkDriver'
 on port 56870.
 15/06/13 11:06:10 INFO SparkEnv: Registering MapOutputTracker
 15/06/13 11:06:10 INFO SparkEnv: Registering BlockManagerMaster
 15/06/13 11:06:10 INFO DiskBlockManager: Created local directory at
 

Re: How to read avro in SparkR

2015-06-13 Thread Burak Yavuz
Hi,
Not sure if this is it, but could you please try
"com.databricks.spark.avro" instead of just "avro".

Thanks,
Burak
On Jun 13, 2015 9:55 AM, Shing Hing Man mat...@yahoo.com.invalid wrote:

 Hi,
   I am trying to read an Avro file in SparkR (in Spark 1.4.0).

 I started R using the following.
 matmsh@gauss:~$ sparkR --packages com.databricks:spark-avro_2.10:1.0.0

 Inside the R shell, when I issue the following,

  read.df(sqlContext, "file:///home/matmsh/myfile.avro", "avro")

 I get the following exception.
 Caused by: java.lang.RuntimeException: Failed to load class for data
 source: avro

 Below is the stack trace.


 matmsh@gauss:~$ sparkR --packages com.databricks:spark-avro_2.10:1.0.0

 R version 3.2.0 (2015-04-16) -- Full of Ingredients
 Copyright (C) 2015 The R Foundation for Statistical Computing
 Platform: x86_64-suse-linux-gnu (64-bit)

 R is free software and comes with ABSOLUTELY NO WARRANTY.
 You are welcome to redistribute it under certain conditions.
 Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

 R is a collaborative project with many contributors.
 Type 'contributors()' for more information and
 'citation()' on how to cite R or R packages in publications.

 Type 'demo()' for some demos, 'help()' for on-line help, or
 'help.start()' for an HTML browser interface to help.
 Type 'q()' to quit R.

 Launching java with spark-submit command
 /home/matmsh/installed/spark/bin/spark-submit --packages
 com.databricks:spark-avro_2.10:1.0.0 sparkr-shell
 /tmp/RtmpoT7FrF/backend_port464e1e2fb16a
 Ivy Default Cache set to: /home/matmsh/.ivy2/cache
 The jars for the packages stored in: /home/matmsh/.ivy2/jars
 :: loading settings :: url =
 jar:file:/home/matmsh/installed/sparks/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
 com.databricks#spark-avro_2.10 added as a dependency
 :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
  confs: [default]
  found com.databricks#spark-avro_2.10;1.0.0 in list
  found org.apache.avro#avro;1.7.6 in local-m2-cache
  found org.codehaus.jackson#jackson-core-asl;1.9.13 in list
  found org.codehaus.jackson#jackson-mapper-asl;1.9.13 in list
  found com.thoughtworks.paranamer#paranamer;2.3 in list
  found org.xerial.snappy#snappy-java;1.0.5 in list
  found org.apache.commons#commons-compress;1.4.1 in list
  found org.tukaani#xz;1.0 in list
  found org.slf4j#slf4j-api;1.6.4 in list
 :: resolution report :: resolve 421ms :: artifacts dl 16ms
  :: modules in use:
  com.databricks#spark-avro_2.10;1.0.0 from list in [default]
  com.thoughtworks.paranamer#paranamer;2.3 from list in [default]
  org.apache.avro#avro;1.7.6 from local-m2-cache in [default]
  org.apache.commons#commons-compress;1.4.1 from list in [default]
  org.codehaus.jackson#jackson-core-asl;1.9.13 from list in [default]
  org.codehaus.jackson#jackson-mapper-asl;1.9.13 from list in [default]
  org.slf4j#slf4j-api;1.6.4 from list in [default]
  org.tukaani#xz;1.0 from list in [default]
  org.xerial.snappy#snappy-java;1.0.5 from list in [default]
  -
  | | modules || artifacts |
  | conf | number| search|dwnlded|evicted|| number|dwnlded|
  -
  | default | 9 | 0 | 0 | 0 || 9 | 0 |
  -
 :: retrieving :: org.apache.spark#spark-submit-parent
  confs: [default]
  0 artifacts copied, 9 already retrieved (0kB/9ms)
 15/06/13 17:37:42 INFO spark.SparkContext: Running Spark version 1.4.0
 15/06/13 17:37:42 WARN util.NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 15/06/13 17:37:42 WARN util.Utils: Your hostname, gauss resolves to a
 loopback address: 127.0.0.1; using 192.168.0.10 instead (on interface
 enp3s0)
 15/06/13 17:37:42 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind
 to another address
 15/06/13 17:37:42 INFO spark.SecurityManager: Changing view acls to: matmsh
 15/06/13 17:37:42 INFO spark.SecurityManager: Changing modify acls to:
 matmsh
 15/06/13 17:37:42 INFO spark.SecurityManager: SecurityManager:
 authentication disabled; ui acls disabled; users with view permissions:
 Set(matmsh); users with modify permissions: Set(matmsh)
 15/06/13 17:37:43 INFO slf4j.Slf4jLogger: Slf4jLogger started
 15/06/13 17:37:43 INFO Remoting: Starting remoting
 15/06/13 17:37:43 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@192.168.0.10:46219]
 15/06/13 17:37:43 INFO util.Utils: Successfully started service
 'sparkDriver' on port 46219.
 15/06/13 17:37:43 INFO spark.SparkEnv: Registering MapOutputTracker
 15/06/13 17:37:43 INFO spark.SparkEnv: Registering BlockManagerMaster
 15/06/13 17:37:43 INFO storage.DiskBlockManager: Created local directory
 at
 

Re: foreach plus accumulator Vs mapPartitions performance

2015-05-21 Thread Burak Yavuz
Or you can simply use `reduceByKeyLocally` if you don't want to worry about
implementing accumulators and such, assuming that the reduced values
will fit in the driver's memory (which you are already assuming by using
accumulators).

Best,
Burak
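
A minimal sketch of that alternative (hypothetical data); the result is a plain Scala Map on the driver, so it must fit in driver memory:
```
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sums: scala.collection.Map[String, Int] = pairs.reduceByKeyLocally(_ + _)
// sums == Map("a" -> 4, "b" -> 2)
```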

On Thu, May 21, 2015 at 2:46 PM, ben delpizz...@gmail.com wrote:

 Hi, everybody.

 There are some cases in which I can obtain the same results by using
 either the mapPartitions or the foreach method.

 For example, in a typical MapReduce approach one would perform a reduceByKey
 immediately after a mapPartitions that transforms the original RDD into a
 collection of tuples (key, value). I think it is possible to achieve the
 same result by using, for instance, an array of accumulators where at each
 index an executor sums a value and the index itself could be a key.

 Since reduceByKey will perform a shuffle on disk, I think that, when
 possible, the foreach approach should be better even though foreach has
 the side effect of summing a value into an accumulator.

 I am making this request to see if my reasoning is correct. I hope I was
 clear.
 Thank you, Beniamino



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/foreach-plus-accumulator-Vs-mapPartitions-performance-tp22982.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: GradientBoostedTrees.trainRegressor with categoricalFeaturesInfo

2015-05-20 Thread Burak Yavuz
Could you please open a JIRA for it? The maxBins input is missing from the
Python API.

Would it be possible for you to use the current master? In the current master,
you should be able to use trees with the Pipelines API and DataFrames.

Best,
Burak

On Wed, May 20, 2015 at 2:44 PM, Don Drake dondr...@gmail.com wrote:

 I'm running Spark v1.3.1 and when I run the following against my dataset:

 model = GradientBoostedTrees.trainRegressor(trainingData,
 categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3)

 The job will fail with the following message:
 Traceback (most recent call last):
   File /Users/drake/fd/spark/mltest.py, line 73, in module
 model = GradientBoostedTrees.trainRegressor(trainingData,
 categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3)
   File
 /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py,
 line 553, in trainRegressor
 loss, numIterations, learningRate, maxDepth)
   File
 /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py,
 line 438, in _train
 loss, numIterations, learningRate, maxDepth)
   File
 /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py,
 line 120, in callMLlibFunc
 return callJavaFunc(sc, api, *args)
   File
 /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py,
 line 113, in callJavaFunc
 return _java2py(sc, func(*args))
   File
 /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
 line 538, in __call__
   File
 /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value
 15/05/20 16:40:12 INFO BlockManager: Removing block rdd_32_95
 py4j.protocol.Py4JJavaError: An error occurred while calling
 o69.trainGradientBoostedTreesModel.
 : java.lang.IllegalArgumentException: requirement failed: DecisionTree
 requires maxBins (= 32) >= max categories in categorical features (= 1895)
 at scala.Predef$.require(Predef.scala:233)
 at
 org.apache.spark.mllib.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:128)
 at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:138)
 at org.apache.spark.mllib.tree.DecisionTree.run(DecisionTree.scala:60)
 at
 org.apache.spark.mllib.tree.GradientBoostedTrees$.org$apache$spark$mllib$tree$GradientBoostedTrees$$boost(GradientBoostedTrees.scala:150)
 at
 org.apache.spark.mllib.tree.GradientBoostedTrees.run(GradientBoostedTrees.scala:63)
 at
 org.apache.spark.mllib.tree.GradientBoostedTrees$.train(GradientBoostedTrees.scala:96)
 at
 org.apache.spark.mllib.api.python.PythonMLLibAPI.trainGradientBoostedTreesModel(PythonMLLibAPI.scala:595)

 So, it's complaining about maxBins. If I provide maxBins=1900 and
 re-run it:

 model = GradientBoostedTrees.trainRegressor(trainingData,
 categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3, maxBins=1900)

 Traceback (most recent call last):
   File /Users/drake/fd/spark/mltest.py, line 73, in module
 model = GradientBoostedTrees.trainRegressor(trainingData,
 categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3, maxBins=1900)
 TypeError: trainRegressor() got an unexpected keyword argument 'maxBins'

 It now says it knows nothing of maxBins.

 If I run the same command against DecisionTree or RandomForest (with
 maxBins=1900) it works just fine.

 Seems like a bug in GradientBoostedTrees.

 Suggestions?

 -Don

 --
 Donald Drake
 Drake Consulting
 http://www.drakeconsulting.com/
 800-733-2143



Re: ReduceByKey and sorting within partitions

2015-05-04 Thread Burak Yavuz
I think this Spark Package may be what you're looking for!
http://spark-packages.org/package/tresata/spark-sorted

Best,
Burak
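
For reference, a minimal sketch (hypothetical data) of the built-in primitive that pushes sorting into the shuffle; note it orders the data but does not combine values per key:
```
import org.apache.spark.RangePartitioner

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
val partitioner = new RangePartitioner(2, pairs)                     // ordering across partitions
val sorted = pairs.repartitionAndSortWithinPartitions(partitioner)   // ordering within partitions, done in the shuffle
```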

On Mon, May 4, 2015 at 12:56 PM, Imran Rashid iras...@cloudera.com wrote:

 oh wow, that is a really interesting observation, Marco & Jerry.
 I wonder if this is worth exposing in combineByKey()?  I think Jerry's
 proposed workaround is all you can do for now -- use reflection to
 side-step the fact that the methods you need are private.

 On Mon, Apr 27, 2015 at 8:07 AM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Marco,

 As far as I know, the current combineByKey() does not expose an argument
 that lets you set keyOrdering on the ShuffledRDD, since ShuffledRDD is
 package private. If you can get at the ShuffledRDD through reflection or some
 other way, the keyOrdering you set will be pushed down to the shuffle. If you use a
 combination of transformations to do it, the result will be the same but the
 efficiency may differ: some transformations will be split into
 different stages, which will introduce additional shuffles.

 Thanks
 Jerry


 2015-04-27 19:00 GMT+08:00 Marco marcope...@gmail.com:

 Hi,

 I'm trying, after reducing by key, to get data ordered among partitions
 (like RangePartitioner) and within partitions (like sortByKey or
 repartitionAndSortWithinPartitions), pushing the sorting down into the
 shuffle machinery of the reduce phase.

 I think, but maybe I'm wrong, that the correct way to do that is for
 combineByKey to call the setKeyOrdering function on the ShuffledRDD that it
 returns.

 Am I wrong? Can it be done with a combination of other transformations with
 the same efficiency?

 Thanks,
 Marco

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: DataFrame filter referencing error

2015-04-30 Thread Burak Yavuz
Is `new` a reserved word in MySQL?

On Thu, Apr 30, 2015 at 2:41 PM, Francesco Bigarella 
francesco.bigare...@gmail.com wrote:

 Do you know how I can check that? I googled a bit but couldn't find a
 clear explanation about it. I also tried to use explain() but it doesn't
 really help.
 I still find it unusual that I have this issue only for the equality operator
 and not for the others.

 Thank you,
 F

 On Wed, Apr 29, 2015 at 3:03 PM ayan guha guha.a...@gmail.com wrote:

 Looks like your DF is based on a MySQL DB using JDBC, and the error is thrown
 from MySQL. Can you see what SQL finally gets fired in MySQL? Spark
 is pushing down the predicate to MySQL, so it's not a Spark problem per se.

 On Wed, Apr 29, 2015 at 9:56 PM, Francesco Bigarella 
 francesco.bigare...@gmail.com wrote:

 Hi all,

 I was testing the DataFrame filter functionality and I found what I
 think is a strange behaviour.
 My dataframe testDF, obtained by loading a MySQL table via JDBC, has the
 following schema:
 root
  | -- id: long (nullable = false)
  | -- title: string (nullable = true)
  | -- value: string (nullable = false)
  | -- status: string (nullable = false)

 What I want to do is filter my dataset to obtain all rows that have a
 status = "new".

 scala> testDF.filter(testDF("id") === 1234).first()
 works fine (also with the integer value within double quotes), however
 if I try to use the same statement to filter on the status column (also
 with changes in the syntax - see below), suddenly the program breaks.

 Any of the following
 scala> testDF.filter(testDF("status") === "new")
 scala> testDF.filter("status = 'new'")
 scala> testDF.filter($"status" === "new")

 generates the error:

 INFO scheduler.DAGScheduler: Job 3 failed: runJob at
 SparkPlan.scala:121, took 0.277907 s

 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage
 3.0 (TID 12, node name):
 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column
 'new' in 'where clause'

 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
 at com.mysql.jdbc.Util.getInstance(Util.java:386)
 at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1052)
 at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3597)
 at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3529)
 at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1990)
 at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2151)
 at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2625)
 at
 com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2119)
 at
 com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2283)
 at org.apache.spark.sql.jdbc.JDBCRDD$anon$1.init(JDBCRDD.scala:328)
 at org.apache.spark.sql.jdbc.JDBCRDD.compute(JDBCRDD.scala:309)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 Does filter work only on columns of the integer type? What is the exact
 behaviour of the filter function and what is the best way to handle the
 query I am trying to execute?

 Thank you,
 Francesco




 --
 Best Regards,
 Ayan Guha



