RE: top-k function for Window

2017-01-03 Thread Mendelson, Assaf
Assume you have a UDAF which looks like this:

-  Input: The value

-  Buffer: K elements

-  Output: An array (which would have the K elements)

-  Init: Initialize all elements to some irrelevant value (e.g. 
int.MinValue)

-  Update: Go over the buffer, find the spot where the current value belongs, 
shift the remaining elements forward, and insert it (i.e. a sorted insert)

-  Merge: “merge sort” between the two buffers

-  Evaluate: turn the buffer to array
Then run the UDAF on the groupby.

The result would be an array of (up to) K elements per key. To turn it back into K 
rows, all you need to do is explode it.

Assuming that K is small, the UDAF would be much faster than a full sort (it only 
ever sorts very small buffers of K elements).

Assaf.
From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 8:03 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: top-k function for Window

> Furthermore, in your example you don’t even need a window function, you can 
> simply use groupby and explode

Can you clarify? You need to sort somehow (be it map-side sorting or 
reduce-side sorting).



---
Regards,
Andy

On Tue, Jan 3, 2017 at 2:07 PM, Mendelson, Assaf 
> wrote:
You can write a UDAF in which the buffer contains the top K and manage it. This 
means you don’t need to sort at all. Furthermore, in your example you don’t 
even need a window function, you can simply use groupby and explode.
Of course, this is only relevant if k is small…

From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 3:07 PM
To: user
Subject: top-k function for Window

Hi all,

What's the best way to do top-k with Windowing in Dataset world?

I have a snippet of code that filters the data to the top-k, but with skewed 
keys:

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know that it can sort the data 
locally, get the local rank first. What it ends up doing is performing a sort 
by key using the skewed keys, and this blew up the cluster since the keys are 
heavily skewed.

In the RDD world we can do something like:
rdd.mapPartitions(iterator -> topK(iterator))
but I can't really think of an obvious way to do this in the Dataset API, 
especially with Window function. I guess some UserAggregateFunction would do, 
but I wonder if there's obvious way that I missed.

---
Regards,
Andy



Migrate spark sql to rdd for better performance

2017-01-03 Thread geoHeil
I optimized a spark sql script but have come to the conclusion that the sql
api is not ideal as the tasks which are generated are slow and require too
much shuffling. 

So the script should be converted to rdd 
http://stackoverflow.com/q/41445571/2587904

How can I formulate this more efficiently using the RDD API? aggregateByKey should
be a good idea, but it is still not very clear to me how to apply it here to
substitute the window functions.
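A minimal sketch of the aggregateByKey direction, assuming the goal is a per-key top-k that would otherwise be done with a window function (key/value types and k are illustrative assumptions):

import org.apache.spark.rdd.RDD

// Per-key top-k with aggregateByKey: each partition keeps only a k-element
// buffer per key, so no full shuffle-and-sort of all rows is needed.
def topKPerKey(rdd: RDD[(String, Double)], k: Int): RDD[(String, Seq[Double])] =
  rdd.aggregateByKey(Seq.empty[Double])(
    (buf, v) => (buf :+ v).sortBy(-_).take(k),   // fold a value into the small buffer
    (b1, b2) => (b1 ++ b2).sortBy(-_).take(k)    // merge buffers from different partitions
  )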

Cheers 
Georg 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Migrate-spark-sql-to-rdd-for-better-performance-tp28270.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: OS killing Executor due to high (possibly off heap) memory usage

2017-01-03 Thread Koert Kuipers
it would be great if this offheap memory usage becomes more predictable
again.
currently i see users put memoryOverhead to many gigabytes, sometimes as
much as executor memory. it is trial and error to find out what the right
number is. so people dont bother and put in huge numbers instead.

On Thu, Dec 8, 2016 at 11:53 AM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> I did some instrumentation to figure out traces of where DirectByteBuffers
> are being created and it turns out that setting the following system
> properties in addition to setting spark.shuffle.io.preferDirectBufs=false
> in spark config:
>
> io.netty.noUnsafe=true
> io.netty.threadLocalDirectBufferSize=0
>
> This should force netty to mostly use on heap buffers and thus increases
> the stability of spark jobs that perform a lot of shuffle. I have created
> the defect SPARK-18787 to either force these settings when
> spark.shuffle.io.preferDirectBufs=false is set in spark config or
> document it.
>
> Hope it will be helpful for other users as well.
>
> Thanks,
> Aniket
>
> On Sat, Nov 26, 2016 at 3:31 PM Koert Kuipers  wrote:
>
>> i agree that offheap memory usage is unpredictable.
>>
>> when we used rdds the memory was mostly on heap and total usage
>> predictable, and we almost never had yarn killing executors.
>>
>> now with dataframes the memory usage is both on and off heap, and we have
>> no way of limiting the off heap memory usage by spark, yet yarn requires a
>> maximum total memory usage and if you go over it yarn kills the executor.
>>
>> On Fri, Nov 25, 2016 at 12:14 PM, Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>> Thanks Rohit, Roddick and Shreya. I tried changing 
>> spark.yarn.executor.memoryOverhead
>> to be 10GB and lowering executor memory to 30 GB and both of these didn't
>> work. I finally had to reduce the number of cores per executor to be 18
>> (from 36) in addition to setting higher spark.yarn.executor.memoryOverhead
>> and lower executor memory size. I had to trade off performance for
>> reliability.
>>
>> Unfortunately, spark does a poor job reporting off heap memory usage.
>> From the profiler, it seems that the job's heap usage is pretty static but
>> the off heap memory fluctuates quiet a lot. It looks like bulk of off heap
>> is used by io.netty.buffer.UnpooledUnsafeDirectByteBuf while the shuffle
>> client is trying to read block from shuffle service. It looks
>> like org.apache.spark.network.util.TransportFrameDecoder retains them
>> in buffers field while decoding responses from the shuffle service. So far,
>> it's not clear why it needs to hold multiple GBs in the buffers. Perhaps
>> increasing the number of partitions may help with this.
>>
>> Thanks,
>> Aniket
>>
>> On Fri, Nov 25, 2016 at 1:09 AM Shreya Agarwal 
>> wrote:
>>
>> I don’t think it’s just memory overhead. It might be better to use an
>> execute with lesser heap space(30GB?). 46 GB would mean more data load into
>> memory and more GC, which can cause issues.
>>
>>
>>
>> Also, have you tried to persist data in any way? If so, then that might
>> be causing an issue.
>>
>>
>>
>> Lastly, I am not sure if your data has a skew and if that is forcing a
>> lot of data to be on one executor node.
>>
>>
>>
>> Sent from my Windows 10 phone
>>
>>
>>
>> *From: *Rodrick Brown 
>> *Sent: *Friday, November 25, 2016 12:25 AM
>> *To: *Aniket Bhatnagar 
>> *Cc: *user 
>> *Subject: *Re: OS killing Executor due to high (possibly off heap)
>> memory usage
>>
>>
>> Try setting spark.yarn.executor.memoryOverhead 1
>>
>> On Thu, Nov 24, 2016 at 11:16 AM, Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>> Hi Spark users
>>
>> I am running a job that does join of a huge dataset (7 TB+) and the
>> executors keep crashing randomly, eventually causing the job to crash.
>> There are no out of memory exceptions in the log and looking at the dmesg
>> output, it seems like the OS killed the JVM because of high memory usage.
>> My suspicion is towards off heap usage of executor is causing this as I am
>> limiting the on heap usage of executor to be 46 GB and each host running
>> the executor has 60 GB of RAM. After the executor crashes, I can see that
>> the external shuffle manager 
>> (org.apache.spark.network.server.TransportRequestHandler)
>> logs a lot of channel closed exceptions in yarn node manager logs. This
>> leads me to believe that something triggers out of memory during shuffle
>> read. Is there a configuration to completely disable usage of off heap
>> memory? I have tried setting spark.shuffle.io.preferDirectBufs=false but
>> the executor is still getting killed by the same error.
>>
>> Cluster details:
>> 10 AWS c4.8xlarge hosts
>> RAM on each host - 60 GB
>> Number of cores on each host - 36
>> Additional hard disk on each host - 8 TB
>>
>> Spark configuration:
>> dynamic allocation enabled

Dynamic scheduling not respecting spark.executor.cores

2017-01-03 Thread Nirav Patel
When enabling dynamic scheduling I see that all executors are using only 1
core even if I set "spark.executor.cores" to 6. If dynamic scheduling
is disabled then each executor will have 6 cores. I have tested this
against spark 1.5. I wonder if this is the same behavior with 2.x as well.
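For context, a hedged sketch of the configuration being described (illustrative values; whether executor cores are honored under dynamic allocation is exactly the question above):

import org.apache.spark.SparkConf

// Illustrative configuration for the behavior described above.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // required by dynamic allocation
  .set("spark.executor.cores", "6")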

Thanks




Spark test error

2017-01-03 Thread Yanwei Wayne Zhang
I tried to run the tests in 'GeneralizedLinearRegressionSuite', and all tests 
passed except for test("read/write") which yielded the following error message. 
Any suggestion on why this happened and how to fix it? Thanks. BTW, I ran the 
test in IntelliJ.


The default jsonEncode only supports string and vector. 
org.apache.spark.ml.param.Param must override jsonEncode for java.lang.Double.
scala.NotImplementedError: The default jsonEncode only supports string and 
vector. org.apache.spark.ml.param.Param must override jsonEncode for 
java.lang.Double.
at org.apache.spark.ml.param.Param.jsonEncode(params.scala:98)
at 
org.apache.spark.ml.util.DefaultParamsWriter$$anonfun$1$$anonfun$2.apply(ReadWrite.scala:293)
at 
org.apache.spark.ml.util.DefaultParamsWriter$$anonfun$1$$anonfun$2.apply(ReadWrite.scala:292)


Regards,
Wayne


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Ayan,

This "inline view" idea is really awesome and enlightens me! Finally I have
a plan to move on. I greatly appreciate your help!

Best regards,
Yang

2017-01-03 18:14 GMT+01:00 ayan guha :

> Ahh I see what you meanI confused two terminologiesbecause we were
> talking about partitioning and then changed topic to identify changed data
> 
>
> For that, you can "construct" a dbtable as an inline view -
>
> viewSQL = "(select * from table where  > '')".
> replace("","inserted_on").replace(" value>",checkPointedValue)
> dbtable =viewSQL
>
> refer to this
> 
> blog...
>
> So, in summary, you have 2 things
>
> 1. Identify changed data - my suggestion to use dbtable with inline view
> 2. parallelism - use numPartition,lowerbound,upper bound to generate
> number of partitions
>
> HTH
>
>
>
> On Wed, Jan 4, 2017 at 3:46 AM, Yuanzhe Yang  wrote:
>
>> Hi Ayan,
>>
>> Yeah, I understand your proposal, but according to here
>> http://spark.apache.org/docs/latest/sql-programming-gui
>> de.html#jdbc-to-other-databases, it says
>>
>> Notice that lowerBound and upperBound are just used to decide the
>> partition stride, not for filtering the rows in table. So all rows in the
>> table will be partitioned and returned. This option applies only to reading.
>>
>> So my interpretation is all rows in the table are ingested, and this
>> "lowerBound" and "upperBound" is the span of each partition. Well, I am not
>> a native English speaker, maybe it means differently?
>>
>> Best regards,
>> Yang
>>
>> 2017-01-03 17:23 GMT+01:00 ayan guha :
>>
>>> Hi
>>>
>>> You need to store and capture the Max of the column you intend to use
>>> for identifying new records (Ex: INSERTED_ON) after every successful run of
>>> your job. Then, use the value in lowerBound option.
>>>
>>> Essentially, you want to create a query like
>>>
>>> select * from table where INSERTED_ON > lowerBound and
>>> INSERTED_ON>>
>>> everytime you run the job
>>>
>>>
>>>
>>> On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang  wrote:
>>>
 Hi Ayan,

 Thanks a lot for your suggestion. I am currently looking into sqoop.

 Concerning your suggestion for Spark, it is indeed parallelized with
 multiple workers, but the job is one-off and cannot keep streaming.
 Moreover, I cannot specify any "start row" in the job, it will always
 ingest the entire table. So I also cannot simulate a streaming process by
 starting the job in fix intervals...

 Best regards,
 Yang

 2017-01-03 15:06 GMT+01:00 ayan guha :

> Hi
>
> While the solutions provided by others looks promising and I'd like to
> try out few of them, our old pal sqoop already "does" the job. It has a
> incremental mode where you can provide a --check-column and
> --last-modified-value combination to grab the data - and yes, sqoop
> essentially does it by running a MAP-only job which spawns number of
> parallel map task to grab data from DB.
>
> In Spark, you can use sqlContext.load function for JDBC and use
> partitionColumn and numPartition to define parallelism of connection.
>
> Best
> Ayan
>
> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang 
> wrote:
>
>> Hi Ayan,
>>
>> Thanks a lot for such a detailed response. I really appreciate it!
>>
>> I think this use case can be generalized, because the data is
>> immutable and append-only. We only need to find one column or timestamp 
>> to
>> track the last row consumed in the previous ingestion. This pattern 
>> should
>> be common when storing sensor data. If the data is mutable, then the
>> solution will be surely difficult and vendor specific as you said.
>>
>> The workflow you proposed is very useful. The difficulty part is how
>> to parallelize the ingestion task. With Spark when I have multiple 
>> workers
>> working on the same job, I don't know if there is a way and how to
>> dynamically change the row range each worker should process in 
>> realtime...
>>
>> I tried to find out if there is any candidate available out of the
>> box, instead of reinventing the wheel. At this moment I have not 
>> discovered
>> any existing tool can parallelize ingestion tasks on one database. Is 
>> Sqoop
>> a proper candidate from your knowledge?
>>
>> Thank you again and have a nice day.
>>
>> Best regards,
>> Yang
>>
>>
>>
>> 2016-12-30 8:28 GMT+01:00 ayan guha :
>>
>>>
>>>
>>> "If data ingestion speed is faster than data production speed, then
>>> eventually the entire database will be harvested and those workers will
>>> start to "tail" the 

Re: Apache Hive with Spark Configuration

2017-01-03 Thread Ryan Blue
Chetan,

Spark is currently using Hive 1.2.1 to interact with the Metastore. Using
that version for Hive is going to be the most reliable, but the metastore
API doesn't change very often and we've found (from having different
versions as well) that older versions are mostly compatible. Some things
fail occasionally, but we haven't had too many problems running different
versions with the same metastore in practice.

rb

On Wed, Dec 28, 2016 at 4:22 AM, Chetan Khatri 
wrote:

> Hello Users / Developers,
>
> I am using Hive 2.0.1 with MySql as a Metastore, can you tell me which
> version is more compatible with Spark 2.0.2 ?
>
> THanks
>



-- 
Ryan Blue
Software Engineer
Netflix


Re: top-k function for Window

2017-01-03 Thread Koert Kuipers
i dont know anything about windowing or about not using developer apis...

but

but a trivial implementation of top-k requires a total sort per group. this
can be done with dataset. we do this using spark-sorted (
https://github.com/tresata/spark-sorted) but its not hard to do it yourself
for datasets either. for rdds its actually a little harder i think (if you
want to avoid in memory assumption, which i assume you do)..

a perhaps more efficient implementation uses an aggregator. it is not hard
to adapt algebirds topk aggregator (spacesaver) to use as a spark
aggregator. this requires a simple adapter class. we do this in-house as
well. although i have to say i would recommend spark 2.1.0 for this. spark
2.0.x aggregator codegen is too buggy in my experience.
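A minimal sketch of the Aggregator direction (this one keeps the k largest values per group rather than adapting Algebird's SpaceSaver; the record type and names are illustrative assumptions):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Event(key: String, value: Double) // illustrative record type

// Keeps only the k largest values per group, so the buffer never exceeds k elements.
class TopK(k: Int) extends Aggregator[Event, Seq[Double], Seq[Double]] {
  def zero: Seq[Double] = Seq.empty
  def reduce(buf: Seq[Double], e: Event): Seq[Double] = (buf :+ e.value).sortBy(-_).take(k)
  def merge(b1: Seq[Double], b2: Seq[Double]): Seq[Double] = (b1 ++ b2).sortBy(-_).take(k)
  def finish(buf: Seq[Double]): Seq[Double] = buf
  def bufferEncoder: Encoder[Seq[Double]] = Encoders.kryo[Seq[Double]]
  def outputEncoder: Encoder[Seq[Double]] = Encoders.kryo[Seq[Double]]
}

object TopKAggregatorExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("topk-aggregator").getOrCreate()
    import spark.implicits._
    val events = Seq(Event("a", 1.0), Event("a", 5.0), Event("a", 3.0), Event("b", 2.0)).toDS()
    // Per-key top-2 without a global sort.
    events.groupByKey(_.key).agg(new TopK(2).toColumn).show(false)
  }
}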

On Tue, Jan 3, 2017 at 2:09 PM, Andy Dang  wrote:

> Hi Austin,
>
> It's trivial to implement top-k in the RDD world - however I would like to
> stay in the Dataset API world instead of flip-flopping between the two APIs
> (consistency, wholestage codegen etc).
>
> The twitter library appears to support only RDD, and the solution you gave
> me is very similar to what I did - it doesn't work very well with skewed
> dataset :) (it has to perform the sort to work out the row number).
>
> I've been toying with the UDAF idea, but the more I write the code the
> more I see myself digging deeper into the developer API land  - not very
> ideal to be honest. Also, UDAF doesn't have any concept of sorting, so it
> gets messy really fast.
>
> ---
> Regards,
> Andy
>
> On Tue, Jan 3, 2017 at 6:59 PM, HENSLEE, AUSTIN L  wrote:
>
>> Andy,
>>
>>
>>
>> You might want to also checkout the Algebird libraries from Twitter. They
>> have topK and a lot of other helpful functions. I’ve used the Algebird topk
>> successfully on very large data sets.
>>
>>
>>
>> You can also use Spark SQL to do a “poor man’s” topK. This depends on how
>> scrupulous you are about your TopKs (I can expound on this, if needed).
>>
>>
>>
>> I obfuscated the field names, before pasting this into email – I think I
>> got them all consistently.
>>
>>
>>
>> Here’s the meat of the TopK part (found on SO, but I don’t have a
>> reference) – this one takes the top 4, hence “rowNum <= 4”:
>>
>>
>>
>> SELECT time_bucket,
>>
>>identifier1,
>>
>>identifier2,
>>
>>incomingCount
>>
>>   FROM (select time_bucket,
>>
>> identifier1,
>>
>> identifier2,
>>
>> incomingCount,
>>
>>ROW_NUMBER() OVER (PARTITION BY time_bucket,
>>
>>identifier1
>>
>>   ORDER BY count DESC) as rowNum
>>
>>   FROM tablename) tmp
>>
>>   WHERE rowNum <=4
>>
>>   ORDER BY time_bucket, identifier1, rowNum
>>
>>
>>
>> The count and order by:
>>
>>
>>
>>
>>
>> SELECT time_bucket,
>>
>>identifier1,
>>
>>identifier2,
>>
>>count(identifier2) as myCount
>>
>>   FROM table
>>
>>   GROUP BY time_bucket,
>>
>>identifier1,
>>
>>identifier2
>>
>>   ORDER BY time_bucket,
>>
>>identifier1,
>>
>>count(identifier2) DESC
>>
>>
>>
>>
>>
>> *From: *Andy Dang 
>> *Date: *Tuesday, January 3, 2017 at 7:06 AM
>> *To: *user 
>> *Subject: *top-k function for Window
>>
>>
>>
>> Hi all,
>>
>>
>>
>> What's the best way to do top-k with Windowing in Dataset world?
>>
>>
>>
>> I have a snippet of code that filters the data to the top-k, but with
>> skewed keys:
>>
>>
>>
>> val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
>>
>> val rank = row_number().over(windowSpec)
>>
>>
>>
>> input.withColumn("rank", rank).filter("rank <= 10").drop("rank")
>>
>>
>>
>> The problem with this code is that Spark doesn't know that it can sort
>> the data locally, get the local rank first. What it ends up doing is
>> performing a sort by key using the skewed keys, and this blew up the
>> cluster since the keys are heavily skewed.
>>
>>
>>
>> In the RDD world we can do something like:
>>
>> rdd.mapPartitions(iterator -> topK(iterator))
>>
>> but I can't really think of an obvious way to do this in the Dataset API,
>> especially with Window function. I guess some UserAggregateFunction would
>> do, but I wonder if there's obvious way that I missed.
>>
>>
>>
>> ---
>> Regards,
>> Andy
>>
>
>


Re: [Spark Kafka] How to update batch size of input dynamically for spark kafka consumer?

2017-01-03 Thread Cody Koeninger
You can't change the batch time, but you can limit the number of items
in the batch

http://spark.apache.org/docs/latest/configuration.html

spark.streaming.backpressure.enabled

spark.streaming.kafka.maxRatePerPartition
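A hedged sketch of setting those two options (the rate value is an illustrative guess to be tuned per workload):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-stream")
  .set("spark.streaming.backpressure.enabled", "true")
  // Maximum records per second read from each Kafka partition; illustrative value.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")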

On Tue, Jan 3, 2017 at 4:00 AM, 周家帅  wrote:
> Hi,
>
> I am an intermediate spark user and have some experience in large data
> processing. I post this question in StackOverflow but receive no response.
> My problem is as follows:
>
> I use createDirectStream in my spark streaming application. I set the batch
> interval to 7 seconds and most of the time the batch job can finish within
> about 5 seconds. However, in very rare cases, the batch job need cost 60
> seconds and this will delay some batches of jobs. To cut down the total
> delay time for these batches, I hope I can process more streaming data which
> spread over the delayed jobs at one time. This will help the streaming
> return to normal as soon as possible.
>
> So, I want to know there is some method to dynamically update/merge batch
> size of input for spark and kafka when delay appears.
>
> Many thanks for your help.
>
>
> --
> Jiashuai Zhou
>
> School of Electronics Engineering and Computer Science,
> Peking University
>




Re: top-k function for Window

2017-01-03 Thread Andy Dang
Hi Austin,

It's trivial to implement top-k in the RDD world - however I would like to
stay in the Dataset API world instead of flip-flopping between the two APIs
(consistency, wholestage codegen etc).

The twitter library appears to support only RDD, and the solution you gave
me is very similar to what I did - it doesn't work very well with skewed
dataset :) (it has to perform the sort to work out the row number).

I've been toying with the UDAF idea, but the more I write the code the more
I see myself digging deeper into the developer API land  - not very ideal
to be honest. Also, UDAF doesn't have any concept of sorting, so it gets
messy really fast.

---
Regards,
Andy

On Tue, Jan 3, 2017 at 6:59 PM, HENSLEE, AUSTIN L  wrote:

> Andy,
>
>
>
> You might want to also checkout the Algebird libraries from Twitter. They
> have topK and a lot of other helpful functions. I’ve used the Algebird topk
> successfully on very large data sets.
>
>
>
> You can also use Spark SQL to do a “poor man’s” topK. This depends on how
> scrupulous you are about your TopKs (I can expound on this, if needed).
>
>
>
> I obfuscated the field names, before pasting this into email – I think I
> got them all consistently.
>
>
>
> Here’s the meat of the TopK part (found on SO, but I don’t have a
> reference) – this one takes the top 4, hence “rowNum <= 4”:
>
>
>
> SELECT time_bucket,
>
>identifier1,
>
>identifier2,
>
>incomingCount
>
>   FROM (select time_bucket,
>
> identifier1,
>
> identifier2,
>
> incomingCount,
>
>ROW_NUMBER() OVER (PARTITION BY time_bucket,
>
>identifier1
>
>   ORDER BY count DESC) as rowNum
>
>   FROM tablename) tmp
>
>   WHERE rowNum <=4
>
>   ORDER BY time_bucket, identifier1, rowNum
>
>
>
> The count and order by:
>
>
>
>
>
> SELECT time_bucket,
>
>identifier1,
>
>identifier2,
>
>count(identifier2) as myCount
>
>   FROM table
>
>   GROUP BY time_bucket,
>
>identifier1,
>
>identifier2
>
>   ORDER BY time_bucket,
>
>identifier1,
>
>count(identifier2) DESC
>
>
>
>
>
> *From: *Andy Dang 
> *Date: *Tuesday, January 3, 2017 at 7:06 AM
> *To: *user 
> *Subject: *top-k function for Window
>
>
>
> Hi all,
>
>
>
> What's the best way to do top-k with Windowing in Dataset world?
>
>
>
> I have a snippet of code that filters the data to the top-k, but with
> skewed keys:
>
>
>
> val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
>
> val rank = row_number().over(windowSpec)
>
>
>
> input.withColumn("rank", rank).filter("rank <= 10").drop("rank")
>
>
>
> The problem with this code is that Spark doesn't know that it can sort the
> data locally, get the local rank first. What it ends up doing is performing
> a sort by key using the skewed keys, and this blew up the cluster since the
> keys are heavily skewed.
>
>
>
> In the RDD world we can do something like:
>
> rdd.mapPartitions(iterator -> topK(iterator))
>
> but I can't really think of an obvious way to do this in the Dataset API,
> especially with Window function. I guess some UserAggregateFunction would
> do, but I wonder if there's obvious way that I missed.
>
>
>
> ---
> Regards,
> Andy
>


Re: top-k function for Window

2017-01-03 Thread HENSLEE, AUSTIN L
Andy,

You might want to also checkout the Algebird libraries from Twitter. They have 
topK and a lot of other helpful functions. I’ve used the Algebird topk 
successfully on very large data sets.

You can also use Spark SQL to do a “poor man’s” topK. This depends on how 
scrupulous you are about your TopKs (I can expound on this, if needed).

I obfuscated the field names, before pasting this into email – I think I got 
them all consistently.

Here’s the meat of the TopK part (found on SO, but I don’t have a reference) – 
this one takes the top 4, hence “rowNum <= 4”:

SELECT time_bucket,
   identifier1,
   identifier2,
   incomingCount
  FROM (select time_bucket,
identifier1,
identifier2,
incomingCount,
   ROW_NUMBER() OVER (PARTITION BY time_bucket,
   identifier1
  ORDER BY count DESC) as rowNum
  FROM tablename) tmp
  WHERE rowNum <=4
  ORDER BY time_bucket, identifier1, rowNum

The count and order by:


SELECT time_bucket,
   identifier1,
   identifier2,
   count(identifier2) as myCount
  FROM table
  GROUP BY time_bucket,
   identifier1,
   identifier2
  ORDER BY time_bucket,
   identifier1,
   count(identifier2) DESC


From: Andy Dang 
Date: Tuesday, January 3, 2017 at 7:06 AM
To: user 
Subject: top-k function for Window

Hi all,

What's the best way to do top-k with Windowing in Dataset world?

I have a snippet of code that filters the data to the top-k, but with skewed 
keys:

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know that it can sort the data 
locally, get the local rank first. What it ends up doing is performing a sort 
by key using the skewed keys, and this blew up the cluster since the keys are 
heavily skewed.

In the RDD world we can do something like:
rdd.mapPartitions(iterator -> topK(iterator))
but I can't really think of an obvious way to do this in the Dataset API, 
especially with Window function. I guess some UserAggregateFunction would do, 
but I wonder if there's obvious way that I missed.

---
Regards,
Andy


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread ayan guha
Ahh I see what you meanI confused two terminologiesbecause we were
talking about partitioning and then changed topic to identify changed data


For that, you can "construct" a dbtable as an inline view -

viewSQL = "(select * from table where  >
'')".replace("","inserted_on").replace("",checkPointedValue)
dbtable =viewSQL
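A hedged Scala sketch of passing such an inline view to the JDBC reader (table, column, connection details and the checkpoint value are illustrative assumptions):

import org.apache.spark.sql.SparkSession

object IncrementalJdbcSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("jdbc-inline-view").getOrCreate()

    // Checkpoint persisted from the previous run (illustrative value).
    val checkPointedValue = "2017-01-01 00:00:00"

    // An inline view used in place of a plain table name.
    val viewSQL = s"(select * from sensor_data where inserted_on > '$checkPointedValue') as incr"

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://dbhost:3306/mydb") // assumed connection details
      .option("dbtable", viewSQL)
      .option("user", "spark")
      .option("password", "secret")
      .load()

    df.show()
  }
}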

refer to this

blog...

So, in summary, you have 2 things

1. Identify changed data - my suggestion to use dbtable with inline view
2. parallelism - use numPartition,lowerbound,upper bound to generate number
of partitions

HTH



On Wed, Jan 4, 2017 at 3:46 AM, Yuanzhe Yang  wrote:

> Hi Ayan,
>
> Yeah, I understand your proposal, but according to here
> http://spark.apache.org/docs/latest/sql-programming-
> guide.html#jdbc-to-other-databases, it says
>
> Notice that lowerBound and upperBound are just used to decide the
> partition stride, not for filtering the rows in table. So all rows in the
> table will be partitioned and returned. This option applies only to reading.
>
> So my interpretation is all rows in the table are ingested, and this
> "lowerBound" and "upperBound" is the span of each partition. Well, I am not
> a native English speaker, maybe it means differently?
>
> Best regards,
> Yang
>
> 2017-01-03 17:23 GMT+01:00 ayan guha :
>
>> Hi
>>
>> You need to store and capture the Max of the column you intend to use for
>> identifying new records (Ex: INSERTED_ON) after every successful run of
>> your job. Then, use the value in lowerBound option.
>>
>> Essentially, you want to create a query like
>>
>> select * from table where INSERTED_ON > lowerBound and
>> INSERTED_ON>
>> everytime you run the job
>>
>>
>>
>> On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang  wrote:
>>
>>> Hi Ayan,
>>>
>>> Thanks a lot for your suggestion. I am currently looking into sqoop.
>>>
>>> Concerning your suggestion for Spark, it is indeed parallelized with
>>> multiple workers, but the job is one-off and cannot keep streaming.
>>> Moreover, I cannot specify any "start row" in the job, it will always
>>> ingest the entire table. So I also cannot simulate a streaming process by
>>> starting the job in fix intervals...
>>>
>>> Best regards,
>>> Yang
>>>
>>> 2017-01-03 15:06 GMT+01:00 ayan guha :
>>>
 Hi

 While the solutions provided by others looks promising and I'd like to
 try out few of them, our old pal sqoop already "does" the job. It has a
 incremental mode where you can provide a --check-column and
 --last-modified-value combination to grab the data - and yes, sqoop
 essentially does it by running a MAP-only job which spawns number of
 parallel map task to grab data from DB.

 In Spark, you can use sqlContext.load function for JDBC and use
 partitionColumn and numPartition to define parallelism of connection.

 Best
 Ayan

 On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang 
 wrote:

> Hi Ayan,
>
> Thanks a lot for such a detailed response. I really appreciate it!
>
> I think this use case can be generalized, because the data is
> immutable and append-only. We only need to find one column or timestamp to
> track the last row consumed in the previous ingestion. This pattern should
> be common when storing sensor data. If the data is mutable, then the
> solution will be surely difficult and vendor specific as you said.
>
> The workflow you proposed is very useful. The difficulty part is how
> to parallelize the ingestion task. With Spark when I have multiple workers
> working on the same job, I don't know if there is a way and how to
> dynamically change the row range each worker should process in realtime...
>
> I tried to find out if there is any candidate available out of the
> box, instead of reinventing the wheel. At this moment I have not 
> discovered
> any existing tool can parallelize ingestion tasks on one database. Is 
> Sqoop
> a proper candidate from your knowledge?
>
> Thank you again and have a nice day.
>
> Best regards,
> Yang
>
>
>
> 2016-12-30 8:28 GMT+01:00 ayan guha :
>
>>
>>
>> "If data ingestion speed is faster than data production speed, then
>> eventually the entire database will be harvested and those workers will
>> start to "tail" the database for new data streams and the processing
>> becomes real time."
>>
>> This part is really database dependent. So it will be hard to
>> generalize it. For example, say you have a batch interval of 10
>> secswhat happens if you get more than one updates on the same row
>> within 10 secs? You will get a snapshot of every 10 secs. Now, different
>> databases provide 

Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Ayan,

Yeah, I understand your proposal, but according to here
http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases,
it says

Notice that lowerBound and upperBound are just used to decide the partition
stride, not for filtering the rows in table. So all rows in the table will
be partitioned and returned. This option applies only to reading.

So my interpretation is all rows in the table are ingested, and this
"lowerBound" and "upperBound" is the span of each partition. Well, I am not
a native English speaker, maybe it means differently?

Best regards,
Yang

2017-01-03 17:23 GMT+01:00 ayan guha :

> Hi
>
> You need to store and capture the Max of the column you intend to use for
> identifying new records (Ex: INSERTED_ON) after every successful run of
> your job. Then, use the value in lowerBound option.
>
> Essentially, you want to create a query like
>
> select * from table where INSERTED_ON > lowerBound and
> INSERTED_ON
> everytime you run the job
>
>
>
> On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang  wrote:
>
>> Hi Ayan,
>>
>> Thanks a lot for your suggestion. I am currently looking into sqoop.
>>
>> Concerning your suggestion for Spark, it is indeed parallelized with
>> multiple workers, but the job is one-off and cannot keep streaming.
>> Moreover, I cannot specify any "start row" in the job, it will always
>> ingest the entire table. So I also cannot simulate a streaming process by
>> starting the job in fix intervals...
>>
>> Best regards,
>> Yang
>>
>> 2017-01-03 15:06 GMT+01:00 ayan guha :
>>
>>> Hi
>>>
>>> While the solutions provided by others looks promising and I'd like to
>>> try out few of them, our old pal sqoop already "does" the job. It has a
>>> incremental mode where you can provide a --check-column and
>>> --last-modified-value combination to grab the data - and yes, sqoop
>>> essentially does it by running a MAP-only job which spawns number of
>>> parallel map task to grab data from DB.
>>>
>>> In Spark, you can use sqlContext.load function for JDBC and use
>>> partitionColumn and numPartition to define parallelism of connection.
>>>
>>> Best
>>> Ayan
>>>
>>> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang  wrote:
>>>
 Hi Ayan,

 Thanks a lot for such a detailed response. I really appreciate it!

 I think this use case can be generalized, because the data is immutable
 and append-only. We only need to find one column or timestamp to track the
 last row consumed in the previous ingestion. This pattern should be common
 when storing sensor data. If the data is mutable, then the solution will be
 surely difficult and vendor specific as you said.

 The workflow you proposed is very useful. The difficulty part is how to
 parallelize the ingestion task. With Spark when I have multiple workers
 working on the same job, I don't know if there is a way and how to
 dynamically change the row range each worker should process in realtime...

 I tried to find out if there is any candidate available out of the box,
 instead of reinventing the wheel. At this moment I have not discovered any
 existing tool can parallelize ingestion tasks on one database. Is Sqoop a
 proper candidate from your knowledge?

 Thank you again and have a nice day.

 Best regards,
 Yang



 2016-12-30 8:28 GMT+01:00 ayan guha :

>
>
> "If data ingestion speed is faster than data production speed, then
> eventually the entire database will be harvested and those workers will
> start to "tail" the database for new data streams and the processing
> becomes real time."
>
> This part is really database dependent. So it will be hard to
> generalize it. For example, say you have a batch interval of 10
> secswhat happens if you get more than one updates on the same row
> within 10 secs? You will get a snapshot of every 10 secs. Now, different
> databases provide different mechanisms to expose all DML changes, MySQL 
> has
> binlogs, oracle has log shipping, cdc,golden gate and so ontypically 
> it
> requires new product or new licenses and most likely new component
> installation on production db :)
>
> So, if we keep real CDC solutions out of scope, a simple snapshot
> solution can be achieved fairly easily by
>
> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
> 2. Keeping a simple table level check pointing (TABLENAME,TS_MAX)
> 3. Running an extraction/load mechanism which will take data from DB
> (where INSERTED_ON > TS_MAX or UPDATED_ON>TS_MAX) and put to HDFS. This 
> can
> be sqoop,spark,ETL tool like informatica,ODI,SAP etc. In addition, you can
> directly write to Kafka as well. Sqoop, Spark supports Kafka. Most of the
> ETL 

Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread ayan guha
Hi

You need to store and capture the Max of the column you intend to use for
identifying new records (Ex: INSERTED_ON) after every successful run of
your job. Then, use the value in lowerBound option.

Essentially, you want to create a query like

select * from table where INSERTED_ON > lowerBound and
INSERTED_ON
everytime you run the job

On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang  wrote:

> Hi Ayan,
>
> Thanks a lot for your suggestion. I am currently looking into sqoop.
>
> Concerning your suggestion for Spark, it is indeed parallelized with
> multiple workers, but the job is one-off and cannot keep streaming.
> Moreover, I cannot specify any "start row" in the job, it will always
> ingest the entire table. So I also cannot simulate a streaming process by
> starting the job in fix intervals...
>
> Best regards,
> Yang
>
> 2017-01-03 15:06 GMT+01:00 ayan guha :
>
>> Hi
>>
>> While the solutions provided by others looks promising and I'd like to
>> try out few of them, our old pal sqoop already "does" the job. It has a
>> incremental mode where you can provide a --check-column and
>> --last-modified-value combination to grab the data - and yes, sqoop
>> essentially does it by running a MAP-only job which spawns number of
>> parallel map task to grab data from DB.
>>
>> In Spark, you can use sqlContext.load function for JDBC and use
>> partitionColumn and numPartition to define parallelism of connection.
>>
>> Best
>> Ayan
>>
>> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang  wrote:
>>
>>> Hi Ayan,
>>>
>>> Thanks a lot for such a detailed response. I really appreciate it!
>>>
>>> I think this use case can be generalized, because the data is immutable
>>> and append-only. We only need to find one column or timestamp to track the
>>> last row consumed in the previous ingestion. This pattern should be common
>>> when storing sensor data. If the data is mutable, then the solution will be
>>> surely difficult and vendor specific as you said.
>>>
>>> The workflow you proposed is very useful. The difficulty part is how to
>>> parallelize the ingestion task. With Spark when I have multiple workers
>>> working on the same job, I don't know if there is a way and how to
>>> dynamically change the row range each worker should process in realtime...
>>>
>>> I tried to find out if there is any candidate available out of the box,
>>> instead of reinventing the wheel. At this moment I have not discovered any
>>> existing tool can parallelize ingestion tasks on one database. Is Sqoop a
>>> proper candidate from your knowledge?
>>>
>>> Thank you again and have a nice day.
>>>
>>> Best regards,
>>> Yang
>>>
>>>
>>>
>>> 2016-12-30 8:28 GMT+01:00 ayan guha :
>>>


 "If data ingestion speed is faster than data production speed, then
 eventually the entire database will be harvested and those workers will
 start to "tail" the database for new data streams and the processing
 becomes real time."

 This part is really database dependent. So it will be hard to
 generalize it. For example, say you have a batch interval of 10
 secswhat happens if you get more than one updates on the same row
 within 10 secs? You will get a snapshot of every 10 secs. Now, different
 databases provide different mechanisms to expose all DML changes, MySQL has
 binlogs, oracle has log shipping, cdc,golden gate and so ontypically it
 requires new product or new licenses and most likely new component
 installation on production db :)

 So, if we keep real CDC solutions out of scope, a simple snapshot
 solution can be achieved fairly easily by

 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
 2. Keeping a simple table level check pointing (TABLENAME,TS_MAX)
 3. Running an extraction/load mechanism which will take data from DB
 (where INSERTED_ON > TS_MAX or UPDATED_ON>TS_MAX) and put to HDFS. This can
 be sqoop,spark,ETL tool like informatica,ODI,SAP etc. In addition, you can
 directly write to Kafka as well. Sqoop, Spark supports Kafka. Most of the
 ETL tools would too...
 4. Finally, update check point...

 You may "determine" checkpoint from the data you already have in HDFS
 if you create a Hive structure on it.

 Best
 AYan



 On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪  wrote:

> why not sync binlog of mysql(hopefully the data is immutable and the
> table is append-only), send the log through kafka and then consume it by
> spark streaming?
>
> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
> mich...@databricks.com> wrote:
>
>> We don't support this yet, but I've opened this JIRA as it sounds
>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>
>> In the mean time you could try implementing your own Source, but that
>> is pretty low level and is not yet a stable API.
>>
>> On Thu, Dec 

Mysql table upsate in spark

2017-01-03 Thread Santlal J Gupta
Hi,

I am new to spark and scala development. I want to update Mysql table using 
spark for my poc.
Scenario :
Mysql table myCity:
name    city
1
2

I want to update this table with below values :

name    city
1       AA
2       BB

After update myCity table should contain data as :

name    city
1       AA
2       BB

I am using spark version 2.0.2.
Is there any best practice to achieve mysql table update in Spark (using Spark 
api)?
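Spark's JDBC writer has no built-in upsert mode, so one common workaround is plain JDBC per partition with MySQL's INSERT ... ON DUPLICATE KEY UPDATE. A hedged sketch (connection details, and the assumption that name is a primary/unique key, are illustrative):

import java.sql.DriverManager

import org.apache.spark.sql.SparkSession

object MySqlUpsertSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("mysql-upsert").getOrCreate()
    import spark.implicits._

    // Illustrative update set; assumes `name` is a primary/unique key in myCity.
    val updates = Seq((1, "AA"), (2, "BB")).toDF("name", "city")

    updates.foreachPartition { rows =>
      val conn = DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "password")
      val stmt = conn.prepareStatement(
        "INSERT INTO myCity (name, city) VALUES (?, ?) ON DUPLICATE KEY UPDATE city = VALUES(city)")
      try {
        rows.foreach { row =>
          stmt.setInt(1, row.getInt(0))
          stmt.setString(2, row.getString(1))
          stmt.addBatch()
        }
        stmt.executeBatch()
      } finally {
        stmt.close()
        conn.close()
      }
    }
  }
}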
 Regards,
Santlal Gupta



RBackendHandler Error while running ML algorithms with SparkR on RStudio

2017-01-03 Thread Md. Rezaul Karim
Dear Spark Users,

I was trying to execute RandomForest and NaiveBayes algorithms on RStudio
but experiencing the following error:

17/01/03 15:04:11 ERROR RBackendHandler: fit on
org.apache.spark.ml.r.NaiveBayesWrapper
failed
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.
java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.api.r.RBackendHandler.handleMethodCall(
RBackendHandler.scala:141)
Caused by: java.io.IOException: Class not found
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.(Unknown Source)
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  java.io.IOException: Class not found

Here's my source code:

Sys.setenv(SPARK_HOME = "spark-2.1.0-bin-hadoop2.7")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))

library(SparkR)
sparkR.session(appName = "SparkR-NB", master = "local[*]", sparkConfig =
list(spark.driver.memory = "2g"))

# Fit a Bernoulli naive Bayes model with spark.naiveBayes
titanic <- as.data.frame(Titanic)
titanicDF <- createDataFrame(titanic[titanic$Freq > 0, -5])
nbDF <- titanicDF
nbTestDF <- titanicDF
nbModel <- spark.naiveBayes(nbDF, Survived ~ Class + Sex + Age)

# Model summary
summary(nbModel)

# Prediction
nbPredictions <- predict(nbModel, nbTestDF)
showDF(nbPredictions)




Someone please help me to get rid of this error.


Regards,
_
*Md. Rezaul Karim* BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html



Storage history in web UI

2017-01-03 Thread Joseph Naegele
Hi all,

Is there any way to observe Storage history in Spark, i.e. which RDDs were 
cached and where, etc. after an application completes? It appears the Storage 
tab in the History Server UI is useless.

Thanks
---
Joe Naegele
Grier Forensics






Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Ayan,

Thanks a lot for your suggestion. I am currently looking into sqoop.

Concerning your suggestion for Spark, it is indeed parallelized with
multiple workers, but the job is one-off and cannot keep streaming.
Moreover, I cannot specify any "start row" in the job; it will always
ingest the entire table. So I also cannot simulate a streaming process by
starting the job at fixed intervals...

Best regards,
Yang

2017-01-03 15:06 GMT+01:00 ayan guha :

> Hi
>
> While the solutions provided by others looks promising and I'd like to try
> out few of them, our old pal sqoop already "does" the job. It has a
> incremental mode where you can provide a --check-column and
> --last-modified-value combination to grab the data - and yes, sqoop
> essentially does it by running a MAP-only job which spawns number of
> parallel map task to grab data from DB.
>
> In Spark, you can use sqlContext.load function for JDBC and use
> partitionColumn and numPartition to define parallelism of connection.
>
> Best
> Ayan
>
> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang  wrote:
>
>> Hi Ayan,
>>
>> Thanks a lot for such a detailed response. I really appreciate it!
>>
>> I think this use case can be generalized, because the data is immutable
>> and append-only. We only need to find one column or timestamp to track the
>> last row consumed in the previous ingestion. This pattern should be common
>> when storing sensor data. If the data is mutable, then the solution will be
>> surely difficult and vendor specific as you said.
>>
>> The workflow you proposed is very useful. The difficulty part is how to
>> parallelize the ingestion task. With Spark when I have multiple workers
>> working on the same job, I don't know if there is a way and how to
>> dynamically change the row range each worker should process in realtime...
>>
>> I tried to find out if there is any candidate available out of the box,
>> instead of reinventing the wheel. At this moment I have not discovered any
>> existing tool can parallelize ingestion tasks on one database. Is Sqoop a
>> proper candidate from your knowledge?
>>
>> Thank you again and have a nice day.
>>
>> Best regards,
>> Yang
>>
>>
>>
>> 2016-12-30 8:28 GMT+01:00 ayan guha :
>>
>>>
>>>
>>> "If data ingestion speed is faster than data production speed, then
>>> eventually the entire database will be harvested and those workers will
>>> start to "tail" the database for new data streams and the processing
>>> becomes real time."
>>>
>>> This part is really database dependent. So it will be hard to generalize
>>> it. For example, say you have a batch interval of 10 secswhat happens
>>> if you get more than one updates on the same row within 10 secs? You will
>>> get a snapshot of every 10 secs. Now, different databases provide different
>>> mechanisms to expose all DML changes, MySQL has binlogs, oracle has log
>>> shipping, cdc,golden gate and so ontypically it requires new product or
>>> new licenses and most likely new component installation on production db :)
>>>
>>> So, if we keep real CDC solutions out of scope, a simple snapshot
>>> solution can be achieved fairly easily by
>>>
>>> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
>>> 2. Keeping a simple table level check pointing (TABLENAME,TS_MAX)
>>> 3. Running an extraction/load mechanism which will take data from DB
>>> (where INSERTED_ON > TS_MAX or UPDATED_ON>TS_MAX) and put to HDFS. This can
>>> be sqoop,spark,ETL tool like informatica,ODI,SAP etc. In addition, you can
>>> directly write to Kafka as well. Sqoop, Spark supports Kafka. Most of the
>>> ETL tools would too...
>>> 4. Finally, update check point...
>>>
>>> You may "determine" checkpoint from the data you already have in HDFS if
>>> you create a Hive structure on it.
>>>
>>> Best
>>> AYan
>>>
>>>
>>>
>>> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪  wrote:
>>>
 why not sync binlog of mysql(hopefully the data is immutable and the
 table is append-only), send the log through kafka and then consume it by
 spark streaming?

 On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
 mich...@databricks.com> wrote:

> We don't support this yet, but I've opened this JIRA as it sounds
> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>
> In the mean time you could try implementing your own Source, but that
> is pretty low level and is not yet a stable API.
>
> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <
> yyz1...@gmail.com> wrote:
>
>> Hi all,
>>
>> Thanks a lot for your contributions to bring us new technologies.
>>
>> I don't want to waste your time, so before I write to you, I googled,
>> checked stackoverflow and mailing list archive with keywords "streaming"
>> and "jdbc". But I was not able to get any solution to my use case. I 
>> hope I
>> can get some 

Re: can UDF accept "Any"/"AnyVal"/"AnyRef"(java Object) as parameter or as return type ?

2017-01-03 Thread Koert Kuipers
spark sql is "runtime strongly typed" meaning it must know the actual type.
so this will not work
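A hedged sketch of expressing an Oracle-style decode without an Any-typed UDF, using when/otherwise on a concretely typed column (all branch values are strings here so the result has a single type; data and names are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, when}

object DecodeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("decode-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq("1", "p2", "z").toDF("col1") // illustrative data

    // All branch values are strings, so the result column has one concrete type.
    df.withColumn("decoded",
      when(col("col1") === "1", "xxx")
        .when(col("col1") === "p2", "yyy")
        .otherwise("0"))
      .show()
  }
}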

On Jan 3, 2017 07:46, "Linyuxin"  wrote:

> Hi all
>
> *With Spark 1.5.1*
>
>
>
> *When I want to implement an Oracle decode function (like
> decode(col1,1,’xxx’,’p2’,’yyy’,0))*
>
>
>
> *And the code may look like this*
>
> sqlContext.udf.register("any_test", (s:AnyVal)=>{
>
>   if(s == null)
>
> null
>
>   else
>
> s
>
> })
>
>
>
>
>
> *The error shows:*
>
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Schema for type Any is not supported
>
>  at org.apache.spark.sql.catalyst.ScalaReflection$class.
> schemaFor(ScalaReflection.scala:153)
>
>  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(
> ScalaReflection.scala:29)
>
>  at org.apache.spark.sql.catalyst.ScalaReflection$class.
> schemaFor(ScalaReflection.scala:64)
>
>  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(
> ScalaReflection.scala:29)
>
>  at org.apache.spark.sql.UDFRegistration.register(
> UDFRegistration.scala:145)
>
>  …
>
>
>
> *any suggestion?*
>


Re: Broadcast Join and Inner Join giving different result on same DataFrame

2017-01-03 Thread ayan guha
I think productBroadcastDF is a broadcast variable in your case, not the DF
itself. Try the join with productBroadcastDF.value
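For reference, a minimal runnable sketch of the broadcast-hint join from the original snippet (data and names are illustrative; org.apache.spark.sql.functions.broadcast returns a hinted DataFrame):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("broadcast-join").getOrCreate()
    import spark.implicits._

    val transactionDF = Seq((1, 10.0), (2, 20.0)).toDF("productId", "amount") // illustrative
    val productDF = Seq((1, "apple"), (2, "pear")).toDF("productId", "name")  // illustrative

    // broadcast() attaches a hint; the result is still a DataFrame and is joined directly.
    val joined = transactionDF.join(broadcast(productDF), "productId")
    joined.show()
  }
}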

On Wed, Jan 4, 2017 at 1:04 AM, Patrick  wrote:

> Hi,
>
> An update on the above question: in Local[*] mode the code is working fine. The
> Broadcast size is 200MB, but on Yarn the broadcast join is giving an empty
> result. But in the Sql Query in the UI, it does show BroadcastHint.
>
> Thanks
>
>
> On Fri, Dec 30, 2016 at 9:15 PM, titli batali 
> wrote:
>
>> Hi,
>>
>> I have two dataframes which has common column Product_Id on which i have
>> to perform a join operation.
>>
>> val transactionDF = readCSVToDataFrame(sqlCtx: SQLContext,
>> pathToReadTransactions: String, transactionSchema: StructType)
>> val productDF = readCSVToDataFrame(sqlCtx: SQLContext,
>> pathToReadProduct:String, productSchema: StructType)
>>
>> As transaction data is very large but product data is small, i would
>> ideally do a broadcast join where i broadcast productDF.
>>
>>  val productBroadcastDF =  broadcast(productDF)
>>  val broadcastJoin = transactionDF.join(productBroadcastDF,
>> "productId")
>>
>> Or simply,  val innerJoin = transactionDF.join(productDF, "productId")
>> should give the same result as above.
>>
>> But If i join using simple inner join i get  dataframe  with joined
>> values whereas if i do broadcast join i get empty dataframe with empty
>> values. I am not able to explain this behavior. Ideally both should give
>> the same result.
>>
>> What could have gone wrong? Has anyone faced a similar issue?
>>
>>
>> Thanks,
>> Prateek
>>
>>
>>
>>
>>
>
>


-- 
Best Regards,
Ayan Guha


RE: top-k function for Window

2017-01-03 Thread Mendelson, Assaf
You can write a UDAF in which the buffer contains the top K and manage it. This 
means you don’t need to sort at all. Furthermore, in your example you don’t 
even need a window function, you can simply use groupby and explode.
Of course, this is only relevant if k is small…

From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 3:07 PM
To: user
Subject: top-k function for Window

Hi all,

What's the best way to do top-k with Windowing in Dataset world?

I have a snippet of code that filters the data to the top-k, but with skewed 
keys:

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know that it can sort the data 
locally, get the local rank first. What it ends up doing is performing a sort 
by key using the skewed keys, and this blew up the cluster since the keys are 
heavily skewed.

In the RDD world we can do something like:
rdd.mapPartitions(iterator -> topK(iterator))
but I can't really think of an obvious way to do this in the Dataset API, 
especially with Window function. I guess some UserAggregateFunction would do, 
but I wonder if there's obvious way that I missed.

---
Regards,
Andy


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread ayan guha
Hi

While the solutions provided by others look promising and I'd like to try
out a few of them, our old pal sqoop already "does" the job. It has an
incremental mode where you can provide a --check-column and
--last-modified-value combination to grab the data - and yes, sqoop
essentially does it by running a MAP-only job which spawns number of
parallel map task to grab data from DB.

In Spark, you can use sqlContext.load function for JDBC and use
partitionColumn and numPartition to define parallelism of connection.

Best
Ayan

On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang  wrote:

> Hi Ayan,
>
> Thanks a lot for such a detailed response. I really appreciate it!
>
> I think this use case can be generalized, because the data is immutable
> and append-only. We only need to find one column or timestamp to track the
> last row consumed in the previous ingestion. This pattern should be common
> when storing sensor data. If the data is mutable, then the solution will be
> surely difficult and vendor specific as you said.
>
> The workflow you proposed is very useful. The difficulty part is how to
> parallelize the ingestion task. With Spark when I have multiple workers
> working on the same job, I don't know if there is a way and how to
> dynamically change the row range each worker should process in realtime...
>
> I tried to find out if there is any candidate available out of the box,
> instead of reinventing the wheel. At this moment I have not discovered any
> existing tool can parallelize ingestion tasks on one database. Is Sqoop a
> proper candidate from your knowledge?
>
> Thank you again and have a nice day.
>
> Best regards,
> Yang
>
>
>
> 2016-12-30 8:28 GMT+01:00 ayan guha :
>
>>
>>
>> "If data ingestion speed is faster than data production speed, then
>> eventually the entire database will be harvested and those workers will
>> start to "tail" the database for new data streams and the processing
>> becomes real time."
>>
>> This part is really database dependent. So it will be hard to generalize
>> it. For example, say you have a batch interval of 10 secswhat happens
>> if you get more than one updates on the same row within 10 secs? You will
>> get a snapshot of every 10 secs. Now, different databases provide different
>> mechanisms to expose all DML changes, MySQL has binlogs, oracle has log
>> shipping, cdc,golden gate and so ontypically it requires new product or
>> new licenses and most likely new component installation on production db :)
>>
>> So, if we keep real CDC solutions out of scope, a simple snapshot
>> solution can be achieved fairly easily by
>>
>> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
>> 2. Keeping a simple table level check pointing (TABLENAME,TS_MAX)
>> 3. Running an extraction/load mechanism which will take data from DB
>> (where INSERTED_ON > TS_MAX or UPDATED_ON>TS_MAX) and put to HDFS. This can
>> be sqoop,spark,ETL tool like informatica,ODI,SAP etc. In addition, you can
>> directly write to Kafka as well. Sqoop, Spark supports Kafka. Most of the
>> ETL tools would too...
>> 4. Finally, update check point...
>>
>> You may "determine" checkpoint from the data you already have in HDFS if
>> you create a Hive structure on it.
>>
>> Best
>> AYan
>>
>>
>>
>> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪  wrote:
>>
>>> why not sync binlog of mysql(hopefully the data is immutable and the
>>> table is append-only), send the log through kafka and then consume it by
>>> spark streaming?
>>>
>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 We don't support this yet, but I've opened this JIRA as it sounds
 generally useful: https://issues.apache.org/jira/browse/SPARK-19031

 In the mean time you could try implementing your own Source, but that
 is pretty low level and is not yet a stable API.

 On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <
 yyz1...@gmail.com> wrote:

> Hi all,
>
> Thanks a lot for your contributions to bring us new technologies.
>
> I don't want to waste your time, so before I write to you, I googled,
> checked stackoverflow and mailing list archive with keywords "streaming"
> and "jdbc". But I was not able to get any solution to my use case. I hope 
> I
> can get some clarification from you.
>
> The use case is quite straightforward, I need to harvest a relational
> database via jdbc, do something with data, and store result into Kafka. I
> am stuck at the first step, and the difficulty is as follows:
>
> 1. The database is too large to ingest with one thread.
> 2. The database is dynamic and time series data comes in constantly.
>
> Then an ideal workflow is that multiple workers process partitions of
> data incrementally according to a time window. For example, the processing
> starts from the earliest data 

Re: Broadcast Join and Inner Join giving different result on same DataFrame

2017-01-03 Thread Patrick
Hi,

An update on the above question: in local[*] mode the code works fine. The
broadcast size is 200MB, but on YARN the broadcast join gives an empty
result. However, the SQL query in the UI does show the BroadcastHint.
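
For what it's worth, one way to sanity-check this on the cluster is to compare
the physical plans (a sketch only; the DataFrame names are taken from the
quoted mail below):

import org.apache.spark.sql.functions.broadcast

val joined = transactionDF.join(broadcast(productDF), "productId")
// A broadcast join that actually kicked in shows BroadcastHashJoin in the plan;
// if YARN falls back to something else, the plans in local[*] and on YARN will differ.
joined.explain(true)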

Thanks


On Fri, Dec 30, 2016 at 9:15 PM, titli batali  wrote:

> Hi,
>
> I have two dataframes which has common column Product_Id on which i have
> to perform a join operation.
>
> val transactionDF = readCSVToDataFrame(sqlCtx: SQLContext,
> pathToReadTransactions: String, transactionSchema: StructType)
> val productDF = readCSVToDataFrame(sqlCtx: SQLContext,
> pathToReadProduct:String, productSchema: StructType)
>
> As transaction data is very large but product data is small, I would
> ideally do a broadcast join where I broadcast productDF:
>
>  val productBroadcastDF = broadcast(productDF)
>  val broadcastJoin = transactionDF.join(productBroadcastDF,
> "productId")
>
> Or simply,  val innerJoin = transactionDF.join(productDF, "productId")
> should give the same result as above.
>
> But If i join using simple inner join i get  dataframe  with joined values
> whereas if i do broadcast join i get empty dataframe with empty values. I
> am not able to explain this behavior. Ideally both should give the same
> result.
>
> What could have gone wrong. Any one faced the similar issue?
>
>
> Thanks,
> Prateek
>
>
>
>
>


top-k function for Window

2017-01-03 Thread Andy Dang
Hi all,

What's the best way to do top-k with Windowing in Dataset world?

I have a snippet of code that filters the data to the top-k, but with
skewed keys:

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know that it can sort the
data locally, get the local rank first. What it ends up doing is performing
a sort by key using the skewed keys, and this blew up the cluster since the
keys are heavily skewed.

In the RDD world we can do something like:
rdd.mapPartitions(iterator -> topK(iterator))
but I can't really think of an obvious way to do this in the Dataset API,
especially with the Window function. I guess some UserDefinedAggregateFunction
would do, but I wonder if there's an obvious way that I missed.
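
For what it's worth, a rough sketch of that local top-k step on the RDD side
(key/value types and k are placeholders; a final per-key ranking over the
reduced data is still needed afterwards):

import org.apache.spark.rdd.RDD

def localTopK(rdd: RDD[(String, Long)], k: Int): RDD[(String, Long)] =
  rdd.mapPartitions { iter =>
    iter.toSeq
      .groupBy(_._1)                                               // group by key within the partition
      .iterator
      .flatMap { case (_, rows) => rows.sortBy(-_._2).take(k) }    // keep the k largest values per key
  }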

---
Regards,
Andy


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Tamas,

Thanks a lot for your suggestion! I will also investigate this one later.

Best regards,
Yang

2017-01-03 12:38 GMT+01:00 Tamas Szuromi :

>
> You can also try https://github.com/zendesk/maxwell
>
> Tamas
>
> On 3 January 2017 at 12:25, Amrit Jangid  wrote:
>
>> You can try out *debezium* : https://github.com/debezium. it reads data
>> from bin-logs, provides structure and stream into Kafka.
>>
>> Now Kafka can be your new source for streaming.
>>
>> On Tue, Jan 3, 2017 at 4:36 PM, Yuanzhe Yang  wrote:
>>
>>> Hi Hongdi,
>>>
>>> Thanks a lot for your suggestion. The data is truely immutable and the
>>> table is append-only. But actually there are different databases involved,
>>> so the only feature they share in common and I can depend on is jdbc...
>>>
>>> Best regards,
>>> Yang
>>>
>>>
>>> 2016-12-30 6:45 GMT+01:00 任弘迪 :
>>>
 why not sync binlog of mysql(hopefully the data is immutable and the
 table is append-only), send the log through kafka and then consume it by
 spark streaming?

 On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
 mich...@databricks.com> wrote:

> We don't support this yet, but I've opened this JIRA as it sounds
> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>
> In the mean time you could try implementing your own Source, but that
> is pretty low level and is not yet a stable API.
>
> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <
> yyz1...@gmail.com> wrote:
>
>> Hi all,
>>
>> Thanks a lot for your contributions to bring us new technologies.
>>
>> I don't want to waste your time, so before I write to you, I googled,
>> checked stackoverflow and mailing list archive with keywords "streaming"
>> and "jdbc". But I was not able to get any solution to my use case. I 
>> hope I
>> can get some clarification from you.
>>
>> The use case is quite straightforward, I need to harvest a relational
>> database via jdbc, do something with data, and store result into Kafka. I
>> am stuck at the first step, and the difficulty is as follows:
>>
>> 1. The database is too large to ingest with one thread.
>> 2. The database is dynamic and time series data comes in constantly.
>>
>> Then an ideal workflow is that multiple workers process partitions of
>> data incrementally according to a time window. For example, the 
>> processing
>> starts from the earliest data with each batch containing data for one 
>> hour.
>> If data ingestion speed is faster than data production speed, then
>> eventually the entire database will be harvested and those workers will
>> start to "tail" the database for new data streams and the processing
>> becomes real time.
>>
>> With Spark SQL I can ingest data from a JDBC source with partitions
>> divided by time windows, but how can I dynamically increment the time
>> windows during execution? Assume that there are two workers ingesting 
>> data
>> of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next 
>> task
>> for 2017-01-03. But I am not able to find out how to increment those 
>> values
>> during execution.
>>
>> Then I looked into Structured Streaming. It looks much more promising
>> because window operations based on event time are considered during
>> streaming, which could be the solution to my use case. However, from
>> documentation and code example I did not find anything related to 
>> streaming
>> data from a growing database. Is there anything I can read to achieve my
>> goal?
>>
>> Any suggestion is highly appreciated. Thank you very much and have a
>> nice day.
>>
>> Best regards,
>> Yang
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>

>>>
>>
>>
>> --
>>
>> Regards,
>> Amrit
>> Data Team
>>
>
>


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Amrit,

Thanks a lot for your suggestion! I will investigate it later.

Best regards,
Yang

2017-01-03 12:25 GMT+01:00 Amrit Jangid :

> You can try out *debezium* : https://github.com/debezium. it reads data
> from bin-logs, provides structure and stream into Kafka.
>
> Now Kafka can be your new source for streaming.
>
> On Tue, Jan 3, 2017 at 4:36 PM, Yuanzhe Yang  wrote:
>
>> Hi Hongdi,
>>
>> Thanks a lot for your suggestion. The data is truely immutable and the
>> table is append-only. But actually there are different databases involved,
>> so the only feature they share in common and I can depend on is jdbc...
>>
>> Best regards,
>> Yang
>>
>>
>> 2016-12-30 6:45 GMT+01:00 任弘迪 :
>>
>>> why not sync binlog of mysql(hopefully the data is immutable and the
>>> table is append-only), send the log through kafka and then consume it by
>>> spark streaming?
>>>
>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 We don't support this yet, but I've opened this JIRA as it sounds
 generally useful: https://issues.apache.org/jira/browse/SPARK-19031

 In the mean time you could try implementing your own Source, but that
 is pretty low level and is not yet a stable API.

 On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <
 yyz1...@gmail.com> wrote:

> Hi all,
>
> Thanks a lot for your contributions to bring us new technologies.
>
> I don't want to waste your time, so before I write to you, I googled,
> checked stackoverflow and mailing list archive with keywords "streaming"
> and "jdbc". But I was not able to get any solution to my use case. I hope 
> I
> can get some clarification from you.
>
> The use case is quite straightforward, I need to harvest a relational
> database via jdbc, do something with data, and store result into Kafka. I
> am stuck at the first step, and the difficulty is as follows:
>
> 1. The database is too large to ingest with one thread.
> 2. The database is dynamic and time series data comes in constantly.
>
> Then an ideal workflow is that multiple workers process partitions of
> data incrementally according to a time window. For example, the processing
> starts from the earliest data with each batch containing data for one 
> hour.
> If data ingestion speed is faster than data production speed, then
> eventually the entire database will be harvested and those workers will
> start to "tail" the database for new data streams and the processing
> becomes real time.
>
> With Spark SQL I can ingest data from a JDBC source with partitions
> divided by time windows, but how can I dynamically increment the time
> windows during execution? Assume that there are two workers ingesting data
> of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next 
> task
> for 2017-01-03. But I am not able to find out how to increment those 
> values
> during execution.
>
> Then I looked into Structured Streaming. It looks much more promising
> because window operations based on event time are considered during
> streaming, which could be the solution to my use case. However, from
> documentation and code example I did not find anything related to 
> streaming
> data from a growing database. Is there anything I can read to achieve my
> goal?
>
> Any suggestion is highly appreciated. Thank you very much and have a
> nice day.
>
> Best regards,
> Yang
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

>>>
>>
>
>
> --
>
> Regards,
> Amrit
> Data Team
>


Re: How to load a big csv to dataframe in Spark 1.6

2017-01-03 Thread Steve Loughran

On 31 Dec 2016, at 16:09, Raymond Xie wrote:

Hello Felix,

I followed the instruction and ran the command:

> $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0

and I received the following error message:
java.lang.RuntimeException: java.net.ConnectException: Call From
xie1/192.168.112.150 to localhost:9000 failed on connection exception:
java.net.ConnectException: Connection refused; For more details see:
http://wiki.apache.org/hadoop/ConnectionRefused



Did you look at the wiki page? If not, why not?
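
For reference, a minimal sketch of how the spark-csv package is typically used
from spark-shell on 1.6 once that connection problem is sorted out (the path is
a placeholder):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")        // first line holds the column names
  .option("inferSchema", "true")   // extra pass over the file to guess column types
  .load("hdfs:///data/big_file.csv")

df.printSchema()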



Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Ayan,

Thanks a lot for such a detailed response. I really appreciate it!

I think this use case can be generalized, because the data is immutable and
append-only. We only need to find one column or timestamp to track the last
row consumed in the previous ingestion. This pattern should be common when
storing sensor data. If the data is mutable, then the solution will be
surely difficult and vendor specific as you said.

The workflow you proposed is very useful. The difficult part is how to
parallelize the ingestion task. With Spark, when I have multiple workers
working on the same job, I don't know whether there is a way to
dynamically change the row range each worker should process in real time...

I tried to find out if there is any candidate available out of the box,
instead of reinventing the wheel. At this moment I have not discovered any
existing tool that can parallelize ingestion tasks on one database. Is Sqoop a
proper candidate, to your knowledge?

Thank you again and have a nice day.

Best regards,
Yang
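
For illustration, a minimal sketch of the check-pointed pull described in the
quoted reply below (table name, timestamp column, paths and the stored
checkpoint value are all hypothetical):

import java.util.Properties
import org.apache.spark.sql.functions.max

val props = new Properties()
props.setProperty("user", "dbuser")
props.setProperty("password", "dbpass")

// TS_MAX from the previous successful run; in practice read it from a small
// control table or a file on HDFS.
val tsMax = "2017-01-01 00:00:00"

// Push the filter down to the database so only rows newer than the checkpoint
// are transferred.
val incremental = sqlContext.read.jdbc(
  "jdbc:postgresql://dbhost:5432/mydb",
  s"(SELECT * FROM sensor_data WHERE updated_on > '$tsMax') AS t",
  props)

incremental.write.mode("append").parquet("hdfs:///data/sensor_data")

// The new checkpoint is the maximum timestamp actually ingested.
val newTsMax = incremental.agg(max("updated_on")).first().get(0)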



2016-12-30 8:28 GMT+01:00 ayan guha :

>
>
> "If data ingestion speed is faster than data production speed, then
> eventually the entire database will be harvested and those workers will
> start to "tail" the database for new data streams and the processing
> becomes real time."
>
> This part is really database dependent. So it will be hard to generalize
> it. For example, say you have a batch interval of 10 secs... what happens
> if you get more than one update on the same row within 10 secs? You will
> get a snapshot every 10 secs. Now, different databases provide different
> mechanisms to expose all DML changes: MySQL has binlogs, Oracle has log
> shipping, CDC, Golden Gate and so on... typically it requires a new product or
> new licenses and most likely new component installation on the production db :)
>
> So, if we keep real CDC solutions out of scope, a simple snapshot solution
> can be achieved fairly easily by
>
> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
> 2. Keeping a simple table level check pointing (TABLENAME,TS_MAX)
> 3. Running an extraction/load mechanism which will take data from DB
> (where INSERTED_ON > TS_MAX or UPDATED_ON>TS_MAX) and put to HDFS. This can
> be sqoop,spark,ETL tool like informatica,ODI,SAP etc. In addition, you can
> directly write to Kafka as well. Sqoop, Spark supports Kafka. Most of the
> ETL tools would too...
> 4. Finally, update check point...
>
> You may "determine" checkpoint from the data you already have in HDFS if
> you create a Hive structure on it.
>
> Best
> AYan
>
>
>
> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪  wrote:
>
>> why not sync binlog of mysql(hopefully the data is immutable and the
>> table is append-only), send the log through kafka and then consume it by
>> spark streaming?
>>
>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust wrote:
>>
>>> We don't support this yet, but I've opened this JIRA as it sounds
>>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>
>>> In the mean time you could try implementing your own Source, but that is
>>> pretty low level and is not yet a stable API.
>>>
>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" wrote:
>>>
 Hi all,

 Thanks a lot for your contributions to bring us new technologies.

 I don't want to waste your time, so before I write to you, I googled,
 checked stackoverflow and mailing list archive with keywords "streaming"
 and "jdbc". But I was not able to get any solution to my use case. I hope I
 can get some clarification from you.

 The use case is quite straightforward, I need to harvest a relational
 database via jdbc, do something with data, and store result into Kafka. I
 am stuck at the first step, and the difficulty is as follows:

 1. The database is too large to ingest with one thread.
 2. The database is dynamic and time series data comes in constantly.

 Then an ideal workflow is that multiple workers process partitions of
 data incrementally according to a time window. For example, the processing
 starts from the earliest data with each batch containing data for one hour.
 If data ingestion speed is faster than data production speed, then
 eventually the entire database will be harvested and those workers will
 start to "tail" the database for new data streams and the processing
 becomes real time.

 With Spark SQL I can ingest data from a JDBC source with partitions
 divided by time windows, but how can I dynamically increment the time
 windows during execution? Assume that there are two workers ingesting data
 of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next task
 for 2017-01-03. But I am not able to find out how to increment those values
 during execution.

 Then I looked 

Re: Question about Spark and filesystems

2017-01-03 Thread Steve Loughran

On 18 Dec 2016, at 19:50, joa...@verona.se wrote:

Since each Spark worker node needs to access the same files, we have
tried using Hdfs. This worked, but there were some oddities making me a
bit uneasy. For dependency hell reasons I compiled a modified Spark, and
this version exhibited the odd behaviour with Hdfs. The problem might
have nothing to do with Hdfs, but the situation made me curious about
the alternatives.

what were the oddities?


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Tamas Szuromi
You can also try https://github.com/zendesk/maxwell

Tamas

On 3 January 2017 at 12:25, Amrit Jangid  wrote:

> You can try out *debezium* : https://github.com/debezium. it reads data
> from bin-logs, provides structure and stream into Kafka.
>
> Now Kafka can be your new source for streaming.
>
> On Tue, Jan 3, 2017 at 4:36 PM, Yuanzhe Yang  wrote:
>
>> Hi Hongdi,
>>
>> Thanks a lot for your suggestion. The data is truely immutable and the
>> table is append-only. But actually there are different databases involved,
>> so the only feature they share in common and I can depend on is jdbc...
>>
>> Best regards,
>> Yang
>>
>>
>> 2016-12-30 6:45 GMT+01:00 任弘迪 :
>>
>>> why not sync binlog of mysql(hopefully the data is immutable and the
>>> table is append-only), send the log through kafka and then consume it by
>>> spark streaming?
>>>
>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 We don't support this yet, but I've opened this JIRA as it sounds
 generally useful: https://issues.apache.org/jira/browse/SPARK-19031

 In the mean time you could try implementing your own Source, but that
 is pretty low level and is not yet a stable API.

 On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <
 yyz1...@gmail.com> wrote:

> Hi all,
>
> Thanks a lot for your contributions to bring us new technologies.
>
> I don't want to waste your time, so before I write to you, I googled,
> checked stackoverflow and mailing list archive with keywords "streaming"
> and "jdbc". But I was not able to get any solution to my use case. I hope 
> I
> can get some clarification from you.
>
> The use case is quite straightforward, I need to harvest a relational
> database via jdbc, do something with data, and store result into Kafka. I
> am stuck at the first step, and the difficulty is as follows:
>
> 1. The database is too large to ingest with one thread.
> 2. The database is dynamic and time series data comes in constantly.
>
> Then an ideal workflow is that multiple workers process partitions of
> data incrementally according to a time window. For example, the processing
> starts from the earliest data with each batch containing data for one 
> hour.
> If data ingestion speed is faster than data production speed, then
> eventually the entire database will be harvested and those workers will
> start to "tail" the database for new data streams and the processing
> becomes real time.
>
> With Spark SQL I can ingest data from a JDBC source with partitions
> divided by time windows, but how can I dynamically increment the time
> windows during execution? Assume that there are two workers ingesting data
> of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next 
> task
> for 2017-01-03. But I am not able to find out how to increment those 
> values
> during execution.
>
> Then I looked into Structured Streaming. It looks much more promising
> because window operations based on event time are considered during
> streaming, which could be the solution to my use case. However, from
> documentation and code example I did not find anything related to 
> streaming
> data from a growing database. Is there anything I can read to achieve my
> goal?
>
> Any suggestion is highly appreciated. Thank you very much and have a
> nice day.
>
> Best regards,
> Yang
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

>>>
>>
>
>
> --
>
> Regards,
> Amrit
> Data Team
>


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Amrit Jangid
You can try out *debezium* : https://github.com/debezium. It reads data
from binlogs, provides structure, and streams the changes into Kafka.

Now Kafka can be your new source for streaming.
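
A minimal sketch of then consuming that topic from Structured Streaming
(broker address and topic name are placeholders; this needs the
spark-sql-kafka package on a recent Spark version):

val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "db_changes")
  .load()

// key/value arrive as binary; cast to strings before parsing the change events.
val changes = kafkaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")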

On Tue, Jan 3, 2017 at 4:36 PM, Yuanzhe Yang  wrote:

> Hi Hongdi,
>
> Thanks a lot for your suggestion. The data is truely immutable and the
> table is append-only. But actually there are different databases involved,
> so the only feature they share in common and I can depend on is jdbc...
>
> Best regards,
> Yang
>
>
> 2016-12-30 6:45 GMT+01:00 任弘迪 :
>
>> why not sync binlog of mysql(hopefully the data is immutable and the
>> table is append-only), send the log through kafka and then consume it by
>> spark streaming?
>>
>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust wrote:
>>
>>> We don't support this yet, but I've opened this JIRA as it sounds
>>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>
>>> In the mean time you could try implementing your own Source, but that is
>>> pretty low level and is not yet a stable API.
>>>
>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" wrote:
>>>
 Hi all,

 Thanks a lot for your contributions to bring us new technologies.

 I don't want to waste your time, so before I write to you, I googled,
 checked stackoverflow and mailing list archive with keywords "streaming"
 and "jdbc". But I was not able to get any solution to my use case. I hope I
 can get some clarification from you.

 The use case is quite straightforward, I need to harvest a relational
 database via jdbc, do something with data, and store result into Kafka. I
 am stuck at the first step, and the difficulty is as follows:

 1. The database is too large to ingest with one thread.
 2. The database is dynamic and time series data comes in constantly.

 Then an ideal workflow is that multiple workers process partitions of
 data incrementally according to a time window. For example, the processing
 starts from the earliest data with each batch containing data for one hour.
 If data ingestion speed is faster than data production speed, then
 eventually the entire database will be harvested and those workers will
 start to "tail" the database for new data streams and the processing
 becomes real time.

 With Spark SQL I can ingest data from a JDBC source with partitions
 divided by time windows, but how can I dynamically increment the time
 windows during execution? Assume that there are two workers ingesting data
 of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next task
 for 2017-01-03. But I am not able to find out how to increment those values
 during execution.

 Then I looked into Structured Streaming. It looks much more promising
 because window operations based on event time are considered during
 streaming, which could be the solution to my use case. However, from
 documentation and code example I did not find anything related to streaming
 data from a growing database. Is there anything I can read to achieve my
 goal?

 Any suggestion is highly appreciated. Thank you very much and have a
 nice day.

 Best regards,
 Yang
 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org


>>>
>>
>


-- 

Regards,
Amrit
Data Team


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Hongdi,

Thanks a lot for your suggestion. The data is truly immutable and the
table is append-only. But actually there are different databases involved,
so the only feature they share in common that I can depend on is JDBC...

Best regards,
Yang


2016-12-30 6:45 GMT+01:00 任弘迪 :

> why not sync binlog of mysql(hopefully the data is immutable and the table
> is append-only), send the log through kafka and then consume it by spark
> streaming?
>
> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust 
> wrote:
>
>> We don't support this yet, but I've opened this JIRA as it sounds
>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>
>> In the mean time you could try implementing your own Source, but that is
>> pretty low level and is not yet a stable API.
>>
>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" 
>> wrote:
>>
>>> Hi all,
>>>
>>> Thanks a lot for your contributions to bring us new technologies.
>>>
>>> I don't want to waste your time, so before I write to you, I googled,
>>> checked stackoverflow and mailing list archive with keywords "streaming"
>>> and "jdbc". But I was not able to get any solution to my use case. I hope I
>>> can get some clarification from you.
>>>
>>> The use case is quite straightforward, I need to harvest a relational
>>> database via jdbc, do something with data, and store result into Kafka. I
>>> am stuck at the first step, and the difficulty is as follows:
>>>
>>> 1. The database is too large to ingest with one thread.
>>> 2. The database is dynamic and time series data comes in constantly.
>>>
>>> Then an ideal workflow is that multiple workers process partitions of
>>> data incrementally according to a time window. For example, the processing
>>> starts from the earliest data with each batch containing data for one hour.
>>> If data ingestion speed is faster than data production speed, then
>>> eventually the entire database will be harvested and those workers will
>>> start to "tail" the database for new data streams and the processing
>>> becomes real time.
>>>
>>> With Spark SQL I can ingest data from a JDBC source with partitions
>>> divided by time windows, but how can I dynamically increment the time
>>> windows during execution? Assume that there are two workers ingesting data
>>> of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next task
>>> for 2017-01-03. But I am not able to find out how to increment those values
>>> during execution.
>>>
>>> Then I looked into Structured Streaming. It looks much more promising
>>> because window operations based on event time are considered during
>>> streaming, which could be the solution to my use case. However, from
>>> documentation and code example I did not find anything related to streaming
>>> data from a growing database. Is there anything I can read to achieve my
>>> goal?
>>>
>>> Any suggestion is highly appreciated. Thank you very much and have a
>>> nice day.
>>>
>>> Best regards,
>>> Yang
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Michael,

Thanks a lot for your ticket. At least it is the first step.

Best regards,
Yang

2016-12-30 2:01 GMT+01:00 Michael Armbrust :

> We don't support this yet, but I've opened this JIRA as it sounds
> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>
> In the mean time you could try implementing your own Source, but that is
> pretty low level and is not yet a stable API.
>
> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" 
> wrote:
>
>> Hi all,
>>
>> Thanks a lot for your contributions to bring us new technologies.
>>
>> I don't want to waste your time, so before I write to you, I googled,
>> checked stackoverflow and mailing list archive with keywords "streaming"
>> and "jdbc". But I was not able to get any solution to my use case. I hope I
>> can get some clarification from you.
>>
>> The use case is quite straightforward, I need to harvest a relational
>> database via jdbc, do something with data, and store result into Kafka. I
>> am stuck at the first step, and the difficulty is as follows:
>>
>> 1. The database is too large to ingest with one thread.
>> 2. The database is dynamic and time series data comes in constantly.
>>
>> Then an ideal workflow is that multiple workers process partitions of
>> data incrementally according to a time window. For example, the processing
>> starts from the earliest data with each batch containing data for one hour.
>> If data ingestion speed is faster than data production speed, then
>> eventually the entire database will be harvested and those workers will
>> start to "tail" the database for new data streams and the processing
>> becomes real time.
>>
>> With Spark SQL I can ingest data from a JDBC source with partitions
>> divided by time windows, but how can I dynamically increment the time
>> windows during execution? Assume that there are two workers ingesting data
>> of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next task
>> for 2017-01-03. But I am not able to find out how to increment those values
>> during execution.
>>
>> Then I looked into Structured Streaming. It looks much more promising
>> because window operations based on event time are considered during
>> streaming, which could be the solution to my use case. However, from
>> documentation and code example I did not find anything related to streaming
>> data from a growing database. Is there anything I can read to achieve my
>> goal?
>>
>> Any suggestion is highly appreciated. Thank you very much and have a nice
>> day.
>>
>> Best regards,
>> Yang
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


[Spark Kafka] How to update batch size of input dynamically for spark kafka consumer?

2017-01-03 Thread 周家帅
Hi,

I am an intermediate spark user and have some experience in large data
processing. I post this question in StackOverflow but receive no response.
My problem is as follows:

I use createDirectStream in my Spark Streaming application. I set the batch
interval to 7 seconds and most of the time the batch job can finish within
about 5 seconds. However, in very rare cases, a batch job needs about 60
seconds, and this delays some subsequent batches. To cut down the total
delay for these batches, I would like to process more streaming data at
once, merging the data that has piled up across the delayed batches. This
would help the streaming application return to normal as soon as possible.

So, I want to know whether there is some method to dynamically update or
merge the input batch size for Spark and Kafka when such a delay appears.
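
For context, a sketch of the setup described above, assuming the Kafka 0.8
direct API (broker list, topic and types are placeholders). Note that the
batch duration is fixed when the StreamingContext is created, so it cannot be
changed while the application is running:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("kafka-direct")
// The 7-second batch duration is set once here, for the lifetime of the app.
val ssc = new StreamingContext(conf, Seconds(7))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))

stream.foreachRDD { rdd =>
  // batch job that normally finishes within ~5 seconds
}

ssc.start()
ssc.awaitTermination()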

Many thanks for your help.

-- 
Jiashuai Zhou

School of Electronics Engineering and Computer Science,
Peking University


Re: Re: Re: Spark Streaming prediction

2017-01-03 Thread Marco Mistroni
Hi
OK, then my suggestion stays: check out ML.
You can train your ML model on past data (let's say, either yesterday's or
the past x days') to have Spark find out what the relation is between the value
you have at T-zero and the value you have at T+n hours. You can also try ML
outside your streaming app by gathering data for x days, feeding it to your
model and looking at the results.
Hth
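
As a very rough illustration of that idea (column names, feature choice and
the input path are placeholders, not from the original mails):

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression

// Historical minute values collected over the past x days.
val history = sqlContext.read.parquet("hdfs:///history/device_minutes")

val assembler = new VectorAssembler()
  .setInputCols(Array("value_now", "minute_of_day"))
  .setOutputCol("features")

val lr = new LinearRegression()
  .setLabelCol("value_in_n_hours")   // the value observed n hours later
  .setFeaturesCol("features")

val model = lr.fit(assembler.transform(history))
// Persist the model and load it inside the streaming app to score incoming events.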

On Mon, Jan 2, 2017 at 9:51 PM, Daniela S  wrote:

> Dear Marco
>
> No problem, thank you very much for your help!
> Yes, that is correct. I always know the minute values for the next e.g.
> 180 minutes (may vary between the different devices) and I want to predict
> the values for the next 24 hours (one value per minute). So as long as
> I know the values (e.g. 180 minutes) I would of course like to use these
> values and the missing ones to get values for the next 24 hours (one value
> per minute) should be predicted.
>
> Thank you in advance.
>
> Regards,
> Daniela
>
> *Gesendet:* Montag, 02. Januar 2017 um 22:30 Uhr
> *Von:* "Marco Mistroni" 
> *An:* "Daniela S" 
> *Cc:* User 
> *Betreff:* Re: Re: Spark Streaming prediction
> Apologies, perhaps i misunderstood your usecase.
> My assumption was that you have 2-3 hours worth fo data and you want to
> know the values for the next 24 based on the values you already have, that
> is why i suggested  the ML path.
> If that is not the case please ignore everything i said..
>
> so, let's take the simple case where you have only 1 device
> So every event contains the minute value of that device for the next 180
> mins. So at any point in time you only  have visibility of the next 180
> minutes, correct?
> Now do you want to predict what the value will be for the next 24 hrs, or
> do you  just want to accumulate data worth of 24 hrs and display it in the
> dashboard?
> or is it something else?
>
> for dashboard update, i guess you either
> - poll 'a  database' (where you store the compuation of your spark logic )
> periodically
> - propagate events from your spark streaming application to your dashboard
> somewhere (via actors/ JMS or whatever mechanism)
>
> kr
>  marco
>
>
>
>
>
>
>
>
>
> On Mon, Jan 2, 2017 at 8:26 PM, Daniela S  wrote:
>>
>> Hi
>>
>> Thank you very much for your answer!
>>
>> My problem is that I know the values for the next 2-3 hours in advance
>> but i do not know the values from hour 2 or 3 to hour 24. How is it
>> possible to combine the known values with the predicted values as both are
>> values in the future? And how can i ensure that there are always 1440
>> values?
>> And I do not know how to map the values for 1440 minutes to a specific
>> time on the dashboard (e.g. how does the dashboard know that the value for
>> minute 300 maps to time 15:05?
>>
>> Thank you in advance.
>>
>> Best regards,
>> Daniela
>>
>>
>>
>> *Gesendet:* Montag, 02. Januar 2017 um 21:07 Uhr
>> *Von:* "Marco Mistroni" 
>> *An:* "Daniela S" 
>> *Cc:* User 
>> *Betreff:* Re: Spark Streaming prediction
>> Hi
>>  you  might want to have a look at the Regression ML  algorithm and
>> integrate it in your SparkStreaming application, i m sure someone on the
>> list has  a similar use case
>> shortly, you'd want to process all your events and feed it through a ML
>> model which,based on your inputs will predict output
>> You say that your events predict minutes values for next 2-3 hrs...
>> gather data for a day and train ur model based on that. Then save it
>> somewhere and have your streaming app load the module and have the module
>> do the predictions based on incoming events from your streaming app.
>> Save the results somewhere and have your dashboard poll periodically your
>> data store to read the predictions
>> I have seen ppl on the list doing ML over a Spark streaming app, i m sure
>> someone can reply back
>> Hpefully i gave u a starting point
>>
>> hth
>>  marco
>>
>> On 2 Jan 2017 4:03 pm, "Daniela S"  wrote:
>>>
>>> Hi
>>>
>>> I am trying to solve the following problem with Spark Streaming.
>>> I receive timestamped events from Kafka. Each event refers to a device
>>> and contains values for every minute of the next 2 to 3 hours. What I would
>>> like to do is to predict the minute values for the next 24 hours. So I
>>> would like to use the known values and to predict the other values to
>>> achieve the 24 hours prediction. My thought was to use arrays with a length
>>> of 1440 (1440 minutes = 24 hours). One for the known values and one for the
>>> predicted values for each device. Then I would like to show the next 24
>>> hours on a dashboard. The dashboard should be updated automatically in
>>> realtime.
>>>
>>> My questions:
>>> is this a possible solution?
>>> how is it possible to combine known future values and predicted values?
>>> how should I treat the timestamp as the length of 1440