Processing json document

2016-07-06 Thread Lan Jiang
Hi, there

Spark has provided JSON document processing for a long time. In most
examples I see, each line of the sample file is a JSON object. That is the
easiest case. But how can we process a JSON document that does not conform
to this standard format (one line per JSON object)? Here is the document I
am working on.

First of all, the file is one single big JSON object spanning multiple
lines. The real file can be as large as 20+ GB. That single JSON object
contains many name/value pairs: the name is some kind of id value, and the
value is the actual JSON object that I would like to become part of a
DataFrame. Is there any way to do that? Appreciate any input.


{
"id1": {
"Title":"title1",
"Author":"Tom",
"Source":{
"Date":"20160506",
"Type":"URL"
},
"Data":" blah blah"},

"id2": {
"Title":"title2",
"Author":"John",
"Source":{
"Date":"20150923",
"Type":"URL"
},
"Data":" blah blah "},

"id3: {
"Title":"title3",
"Author":"John",
"Source":{
"Date":"20150902",
"Type":"URL"
},
"Data":" blah blah "}
}
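
For reference, here is a minimal sketch of one way to flatten such a document
into a DataFrame. Assumptions: Spark 2.2+ (for the multiLine JSON option), a
SparkSession named spark, a placeholder file name, and that all of the per-id
objects share the same schema as in the sample above. For a 20+ GB single
object the one-record read is heavy, so treat this only as an illustration of
the reshaping step, not a tuned solution.

import org.apache.spark.sql.functions._

// Read the whole multi-line document as a single row whose columns are the ids.
val doc = spark.read.option("multiLine", "true").json("orders.json")

// Turn each top-level id column into its own row: (id, struct of Title/Author/Source/Data).
val entries = doc.columns.map(c => struct(lit(c).as("id"), col(c).as("value")))
val flattened = doc
  .select(explode(array(entries: _*)).as("entry"))
  .select("entry.id", "entry.value.*")

flattened.show()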


Re: Is that possible to launch spark streaming application on yarn with only one machine?

2016-07-06 Thread Rabin Banerjee
In yarn-cluster mode, the driver runs in the AM, so you can find the logs in
that AM's log. Open the ResourceManager UI and check for the job and its logs,
or run yarn logs -applicationId <application id>.

In yarn-client mode, the driver is the same JVM from which you are launching,
so you get the driver output in the local log.

On Thu, Jul 7, 2016 at 7:56 AM, Yu Wei  wrote:

> Launching via client deploy mode, it works again.
>
> I'm still a little confused about the behavior difference for cluster and
> client mode on a single machine.
>
>
> Thanks,
>
> Jared
> --
> *From:* Mich Talebzadeh 
> *Sent:* Wednesday, July 6, 2016 9:46:11 PM
> *To:* Yu Wei
> *Cc:* Deng Ching-Mallete; user@spark.apache.org
>
> *Subject:* Re: Is that possible to launch spark streaming application on
> yarn with only one machine?
>
> I don't think deploy-mode cluster will work.
>
> Try --master yarn --deploy-mode client
>
> FYI
>
>
>    - *Spark Local* - Spark runs on the local host. This is the simplest
>    setup and best suited for learners who want to understand different
>    concepts of Spark and for those performing unit testing.
>
>    - *Spark Standalone* – a simple cluster manager included with Spark
>    that makes it easy to set up a cluster.
>
>    - *YARN Cluster Mode* – the Spark driver runs inside an application
>    master process which is managed by YARN on the cluster, and the client
>    can go away after initiating the application. This is invoked with
>    --master yarn and --deploy-mode cluster.
>
>    - *YARN Client Mode* – the driver runs in the client process, and the
>    application master is only used for requesting resources from YARN.
>    Unlike Spark standalone mode, in which the master’s address is
>    specified in the --master parameter, in YARN mode the ResourceManager’s
>    address is picked up from the Hadoop configuration; thus, the --master
>    parameter is yarn. This is invoked with --deploy-mode client.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 6 July 2016 at 12:31, Yu Wei  wrote:
>
>> Hi Deng,
>>
>> I tried the same code again.
>>
>> It seemed that when launching the application via YARN on a single node,
>> JavaDStream.print() did not work. However, occasionally it worked.
>>
>> If I launch the same application in local mode, it always works.
>>
>>
>> The code is as below,
>>
>> SparkConf conf = new SparkConf().setAppName("Monitor");
>> JavaStreamingContext jssc = new JavaStreamingContext(conf,
>> Durations.seconds(1));
>> JavaReceiverInputDStream<String> inputDS =
>> MQTTUtils.createStream(jssc, "tcp://114.55.145.185:1883", "Control");
>> inputDS.print();
>> jssc.start();
>> jssc.awaitTermination();
>>
>>
>> Command for launching via YARN (did not work):
>>
>> spark-submit --master yarn --deploy-mode cluster --driver-memory 4g
>> --executor-memory 2g target/CollAna-1.0-SNAPSHOT.jar
>>
>> Command for launching via local mode (works):
>>
>> spark-submit --master local[4] --driver-memory 4g --executor-memory 2g
>> --num-executors 4 target/CollAna-1.0-SNAPSHOT.jar
>>
>>
>>
>> Any advice?
>>
>>
>> Thanks,
>>
>> Jared
>>
>>
>>
>> --
>> *From:* Yu Wei 
>> *Sent:* Tuesday, July 5, 2016 4:41 PM
>> *To:* Deng Ching-Mallete
>>
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Is that possible to launch spark streaming application on
>> yarn with only one machine?
>>
>>
>> Hi Deng,
>>
>>
>> Thanks for the help. Actually, I need to pay more attention to memory usage.
>>
>> I found the root cause in my problem. It seemed that it existed in spark
>> streaming MQTTUtils module.
>>
>> When I use "localhost" in the brokerURL, it doesn't work.
>>
>> After changing it to "127.0.0.1", it works now.
>>
>>
>> Thanks again,
>>
>> Jared
>>
>>
>>
>> --
>> *From:* odeach...@gmail.com  on behalf of Deng
>> Ching-Mallete 
>> *Sent:* Tuesday, July 5, 2016 4:03:28 PM
>> *To:* Yu Wei
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Is that possible to launch spark streaming application on
>> yarn with only one machine?
>>
>> Hi Jared,
>>
>> You can launch a Spark application even with just a single node in YARN,
>> provided that the node has enough resources to run the job.
>>
>> It might also be good to note that when YARN calculates the 

Re: Question regarding structured data and partitions

2016-07-06 Thread Koert Kuipers
Spark does keep some information about the partitions of an RDD, namely the
partitioning/partitioner.
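
For example, a minimal spark-shell sketch (assuming an existing SparkContext
named sc; the data is made up) of what keeping the partitioner buys you: once
two key-value RDDs share the same partitioner, a join between them does not
need a full shuffle.

import org.apache.spark.HashPartitioner

// Hash-partition the reference data once; Spark remembers the layout.
val large = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
  .partitionBy(new HashPartitioner(8))
large.partitioner   // Some(HashPartitioner) -- the partitioning is retained

val small = sc.parallelize(Seq((1, "x"), (3, "y")))
  .partitionBy(new HashPartitioner(8))

// Co-partitioned join: matching keys already live in the same partition,
// so no extra shuffle of `large` is needed.
val joined = large.join(small)
joined.collect()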

GroupSorted is an extension for key-value RDDs that also keeps track of the
ordering, allowing for faster joins, non-reduce type operations on very
large groups of values per key, etc.
see here:
https://github.com/tresata/spark-sorted
however no support for streaming (yet)...


On Wed, Jul 6, 2016 at 11:55 PM, Omid Alipourfard  wrote:

> Hi,
>
> Why doesn't Spark keep information about the structure of the RDDs or the
> partitions within RDDs?   Say that I use
> repartitionAndSortWithinPartitions, which results in sorted partitions.
> With sorted partitions, lookups should be super fast (binary search?), yet
> I still need to go through the whole partition to perform a lookup -- using
> say, filter.
>
> To give more context into a use case, let me give a very simple example
> where having this feature seems extremely useful: consider that you have a
> stream of incoming keys, where for each key you need to lookup the
> associated value in a large RDD and perform operations on the values.
> Right now, performing a join between the RDDs in the DStream and the large
> RDD seems to be the way to go.  I.e.:
>
> incomingData.transform { rdd => largeRdd.join(rdd) }
>   .map(performAdditionalOperations).save(...)
>
> Assuming that the largeRdd is sorted and/or contains an index and each window
> of incomingData is small, this join operation can be performed in
> O(|incomingData| * (log |largeRDD| or 1)). Yet, right now, I believe this
> operation is much more expensive than that.
>
> I have just started using Spark, so it's highly likely that I am using it
> wrong.  So any thoughts are appreciated!
>
> TL;DR.  Why not keep an index/info with each partition or RDD to speed up
> operations such as lookups filters, etc.?
>
> Thanks,
> Omid
>


Question regarding structured data and partitions

2016-07-06 Thread Omid Alipourfard
Hi,

Why doesn't Spark keep information about the structure of the RDDs or the
partitions within RDDs?   Say that I use repartitionAndSortWithinPartitions,
which results in sorted partitions.  With sorted partitions, lookups should
be super fast (binary search?), yet I still need to go through the whole
partition to perform a lookup -- using say, filter.

To give more context into a use case, let me give a very simple example
where having this feature seems extremely useful: consider that you have a
stream of incoming keys, where for each key you need to lookup the
associated value in a large RDD and perform operations on the values.
Right now, performing a join between the RDDs in the DStream and the large
RDD seems to be the way to go.  I.e.:

incomingData.transform { rdd => largeRdd.join(rdd) }
  .map(performAdditionalOperations).save(...)

Assuming that the largeRdd is sorted and/or contains an index and each window
of incomingData is small, this join operation can be performed in
O(|incomingData| * (log |largeRDD| or 1)). Yet, right now, I believe this
operation is much more expensive than that.

I have just started using Spark, so it's highly likely that I am using it
wrong.  So any thoughts are appreciated!

TL;DR.  Why not keep an index/info with each partition or RDD to speed up
operations such as lookups filters, etc.?

Thanks,
Omid
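
A hypothetical sketch of the lookup pattern described above, not an existing
Spark feature: sort each partition once with repartitionAndSortWithinPartitions,
materialize it as an array, and binary-search it instead of scanning. The names
(largeRdd, key/value types, partition count) and the assumption of an available
SparkContext sc are illustrative only.

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(16)

// Assumed reference data: RDD[(Int, String)].
val largeRdd = sc.parallelize((1 to 1000000).map(i => (i, s"value$i")))

// One sorted Array per partition, kept in memory.
val sortedParts = largeRdd
  .repartitionAndSortWithinPartitions(partitioner)
  .mapPartitions(it => Iterator(it.toArray), preservesPartitioning = true)
  .cache()

// Classic binary search over one sorted partition.
def lookup(sorted: Array[(Int, String)], key: Int): Option[String] = {
  var lo = 0
  var hi = sorted.length - 1
  while (lo <= hi) {
    val mid = (lo + hi) >>> 1
    val k = sorted(mid)._1
    if (k == key) return Some(sorted(mid)._2)
    else if (k < key) lo = mid + 1
    else hi = mid - 1
  }
  None
}

// Incoming keys, routed to the same partitions and matched with zipPartitions:
// O(log n) per key instead of a scan of the whole partition.
val keys = sc.parallelize(Seq(42, 7, 999999)).map(k => (k, ())).partitionBy(partitioner)
val results = sortedParts.zipPartitions(keys) { (dataIter, keyIter) =>
  val sorted = if (dataIter.hasNext) dataIter.next() else Array.empty[(Int, String)]
  keyIter.map { case (k, _) => (k, lookup(sorted, k)) }
}
results.collect()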


Structured Streaming Comparison to AMPS

2016-07-06 Thread craigjar
I have been doing several Spark PoC projects recently and the latest one
involved the new 2.0 experimental feature Structured Streaming.  My PoC
ended up being a non-starter as I quickly realized the stream to stream
joins are not implemented yet.  I believe this feature will be immensely
powerful and allow applications to migrate from batch to streaming with
ease, which is exactly what I would like to accomplish.

I came across an interesting article (link below) from another streaming
platform called AMPS which I believe describes a common use case that
Spark's Structured Streaming would be perfect for, and I hope the Spark dev
team is looking at the AMPS platform for some inspiration.

http://www.crankuptheamps.com/blog/posts/2014/04/07/real-time-streaming-joins-reinvented/

I am posting this for two reasons:
 - To share AMPS's approach to this problem and get the community's thoughts on
whether Spark's Structured Streaming approach will provide similar semantics
 - To inquire about the availability of stream-to-stream joins and where I can
follow along (is there a JIRA or dev mailing list topic I can follow?).

Thank you in advance for any replies.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-Comparison-to-AMPS-tp27303.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Is that possible to launch spark streaming application on yarn with only one machine?

2016-07-06 Thread Yu Wei
Launching via client deploy mode, it works again.

I'm still a little confused about the behavior difference for cluster and 
client mode on a single machine.


Thanks,

Jared


From: Mich Talebzadeh 
Sent: Wednesday, July 6, 2016 9:46:11 PM
To: Yu Wei
Cc: Deng Ching-Mallete; user@spark.apache.org
Subject: Re: Is that possible to launch spark streaming application on yarn 
with only one machine?

I don't think deploy-mode cluster will work.

Try --master yarn --deploy-mode client

FYI


  *   Spark Local - Spark runs on the local host. This is the simplest set up 
and best suited for learners who want to understand different concepts of Spark 
and those performing unit testing.

  *   Spark Standalone – a simple cluster manager included with Spark that 
makes it easy to set up a cluster.

  *   YARN Cluster Mode, the Spark driver runs inside an application master 
process which is managed by YARN on the cluster, and the client can go away 
after initiating the application. This is invoked with --master yarn and 
--deploy-mode cluster

  *   YARN Client Mode, the driver runs in the client process, and the 
application master is only used for requesting resources from YARN. Unlike 
Spark standalone mode, in which the master’s address is specified in the 
--master parameter, in YARN mode the ResourceManager’s address is picked up 
from the Hadoop configuration. Thus, the --master parameter is yarn. This is 
invoked with --deploy-mode client

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 6 July 2016 at 12:31, Yu Wei wrote:

Hi Deng,

I tried the same code again.

It seemed that when launching the application via YARN on a single node, 
JavaDStream.print() did not work. However, occasionally it worked.

If I launch the same application in local mode, it always works.


The code is as below,

SparkConf conf = new SparkConf().setAppName("Monitor");
JavaStreamingContext jssc = new JavaStreamingContext(conf, 
Durations.seconds(1));
JavaReceiverInputDStream<String> inputDS = MQTTUtils.createStream(jssc, 
"tcp://114.55.145.185:1883", "Control");
inputDS.print();
jssc.start();
jssc.awaitTermination();


Command for launching via YARN (did not work):

spark-submit --master yarn --deploy-mode cluster --driver-memory 4g 
--executor-memory 2g target/CollAna-1.0-SNAPSHOT.jar

Command for launching via local mode (works):

spark-submit --master local[4] --driver-memory 4g --executor-memory 2g 
--num-executors 4 target/CollAna-1.0-SNAPSHOT.jar



Any advice?


Thanks,

Jared



From: Yu Wei
Sent: Tuesday, July 5, 2016 4:41 PM
To: Deng Ching-Mallete

Cc: user@spark.apache.org
Subject: Re: Is that possible to launch spark streaming application on yarn 
with only one machine?


Hi Deng,


Thanks for the help. Actually, I need to pay more attention to memory usage.

I found the root cause in my problem. It seemed that it existed in spark 
streaming MQTTUtils module.

When I use "localhost" in the brokerURL, it doesn't work.

After changing it to "127.0.0.1", it works now.


Thanks again,

Jared




From: odeach...@gmail.com on behalf of Deng Ching-Mallete
Sent: Tuesday, July 5, 2016 4:03:28 PM
To: Yu Wei
Cc: user@spark.apache.org
Subject: Re: Is that possible to launch spark streaming application on yarn 
with only one machine?

Hi Jared,

You can launch a Spark application even with just a single node in YARN, 
provided that the node has enough resources to run the job.

It might also be good to note that when YARN calculates the memory allocation 
for the driver and the executors, there is an additional memory overhead that 
is added for each executor, and then it gets rounded up to the nearest GB, IIRC. So 
the 4G driver-memory + 4x2G executor memory do not necessarily translate to a 
total of 12G memory allocation. It would be more than that, so the node would 
need to have more than 12G of memory for the job to execute in YARN. You should 
be able to see something like "No resources available in cluster.." in the 
application master logs in YARN if that is the case.
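
As a rough back-of-the-envelope sketch of that calculation (the 384 MB / 10%
overhead rule and the 1 GB minimum allocation below are assumptions that vary
by Spark and YARN version, so treat the numbers as illustrative only):

// Approximate container size YARN grants for a requested driver/executor size.
def yarnContainerMB(requestedMB: Int, yarnMinAllocMB: Int = 1024): Int = {
  val overhead = math.max(384, (requestedMB * 0.10).toInt)          // assumed overhead rule
  val total = requestedMB + overhead
  ((total + yarnMinAllocMB - 1) / yarnMinAllocMB) * yarnMinAllocMB  // round up
}

// A 4 GB driver becomes roughly a 5 GB container and each 2 GB executor roughly
// a 3 GB container, so "4g + 4 x 2g" needs about 17 GB of cluster memory, not 12 GB.
val totalMB = yarnContainerMB(4096) + 4 * yarnContainerMB(2048)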

HTH,
Deng

On Tue, Jul 5, 2016 at 4:31 PM, Yu Wei 

Re: How to spin up Kafka using docker and use for Spark Streaming Integration tests

2016-07-06 Thread swetha kasireddy
Can this Docker image be used to spin up a Kafka cluster in a CI/CD pipeline
like Jenkins to run the integration tests, or can it be done only on a local
machine that has Docker installed? I assume that the box where the CI/CD
pipeline runs should have Docker installed, correct?

On Mon, Jul 4, 2016 at 5:20 AM, Lars Albertsson  wrote:

> I created such a setup for a client a few months ago. It is pretty
> straightforward, but it can take some work to get all the wires
> connected.
>
> I suggest that you start with the spotify/kafka
> (https://github.com/spotify/docker-kafka) Docker image, since it
> includes a bundled zookeeper. The alternative would be to spin up a
> separate Zookeeper Docker container and connect them, but for testing
> purposes, it would make the setup more complex.
>
> You'll need to inform Kafka about the external address it exposes by
> setting ADVERTISED_HOST to the output of "docker-machine ip" (on Mac)
> or the address printed by "ip addr show docker0" (Linux). I also
> suggest setting
> AUTO_CREATE_TOPICS to true.
>
> You can choose to run your Spark Streaming application under test
> (SUT) and your test harness also in Docker containers, or directly on
> your host.
>
> In the former case, it is easiest to set up a Docker Compose file
> linking the harness and SUT to Kafka. This variant provides better
> isolation, and might integrate better if you have existing similar
> test frameworks.
>
> If you want to run the harness and SUT outside Docker, I suggest that
> you build your harness with a standard test framework, e.g. scalatest
> or JUnit, and run both harness and SUT in the same JVM. In this case,
> you put code to bring up the Kafka Docker container in test framework
> setup methods. This test strategy integrates better with IDEs and
> build tools (mvn/sbt/gradle), since they will run (and debug) your
> tests without any special integration. I therefore prefer this
> strategy.
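
A minimal sketch of the setup-method approach described above, assuming
scalatest on the classpath, Docker with the spotify/kafka image available
locally, and free ports 2181/9092; the class name, container name and
sleep-based wait are illustrative placeholders, not a hardened harness.

import org.scalatest.{BeforeAndAfterAll, FunSuite}
import scala.sys.process._

class KafkaStreamingIntegrationSpec extends FunSuite with BeforeAndAfterAll {

  private val containerName = "it-kafka"
  private val advertisedHost = "127.0.0.1"   // or the docker-machine ip on Mac

  override def beforeAll(): Unit = {
    // Start the bundled Kafka + Zookeeper container before any test runs.
    (s"docker run -d --name $containerName -p 2181:2181 -p 9092:9092 " +
      s"-e ADVERTISED_HOST=$advertisedHost -e AUTO_CREATE_TOPICS=true " +
      "spotify/kafka").!!
    Thread.sleep(10000)                      // crude wait for the broker to come up
  }

  override def afterAll(): Unit = {
    // Tear the container down so repeated runs start clean.
    s"docker rm -f $containerName".!!
  }

  test("streaming job consumes from the dockerized broker") {
    // Start the streaming job under test against s"$advertisedHost:9092",
    // publish a few records, then assert on the job's output here.
  }
}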
>
>
> What is the output of your application? If it is messages on a
> different Kafka topic, the test harness can merely subscribe and
> verify output. If you emit output to a database, you'll need another
> Docker container, integrated with Docker Compose. If you are emitting
> database entries, your test oracle will need to frequently poll the
> database for the expected records, with a timeout in order not to hang
> on failing tests.
>
> I hope this is comprehensible. Let me know if you have followup questions.
>
> Regards,
>
>
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> +46 70 7687109
> Calendar: https://goo.gl/6FBtlS
>
>
>
> On Thu, Jun 30, 2016 at 8:19 PM, SRK  wrote:
> > Hi,
> >
> > I need to do integration tests using Spark Streaming. My idea is to spin
> up
> > kafka using docker locally and use it to feed the stream to my Streaming
> > Job. Any suggestions on how to do this would be of great help.
> >
> > Thanks,
> > Swetha
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-spin-up-Kafka-using-docker-and-use-for-Spark-Streaming-Integration-tests-tp27252.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>


Logs of spark driver in yarn-client mode.

2016-07-06 Thread Egor Pahomov
Hi, I have the following issue:

I have Zeppelin, which is set up in yarn-client mode. A notebook has been in
the Running state for a long period of time with 0% done, and I do not see
even an accepted application in YARN.

To be able to understand what's going on, I need the logs of the Spark driver,
which is trying to connect to Hadoop, but zeppelin/logs/* does not have enough
information. Where should I look for these logs?

-- 

Sincerely yours,
Egor Pakhomov


Re: How to spin up Kafka using docker and use for Spark Streaming Integration tests

2016-07-06 Thread swetha kasireddy
The application's output is that it inserts data into Cassandra at the end of
every batch.

On Mon, Jul 4, 2016 at 5:20 AM, Lars Albertsson  wrote:

> I created such a setup for a client a few months ago. It is pretty
> straightforward, but it can take some work to get all the wires
> connected.
>
> I suggest that you start with the spotify/kafka
> (https://github.com/spotify/docker-kafka) Docker image, since it
> includes a bundled zookeeper. The alternative would be to spin up a
> separate Zookeeper Docker container and connect them, but for testing
> purposes, it would make the setup more complex.
>
> You'll need to inform Kafka about the external address it exposes by
> setting ADVERTISED_HOST to the output of "docker-machine ip" (on Mac)
> or the address printed by "ip addr show docker0" (Linux). I also
> suggest setting
> AUTO_CREATE_TOPICS to true.
>
> You can choose to run your Spark Streaming application under test
> (SUT) and your test harness also in Docker containers, or directly on
> your host.
>
> In the former case, it is easiest to set up a Docker Compose file
> linking the harness and SUT to Kafka. This variant provides better
> isolation, and might integrate better if you have existing similar
> test frameworks.
>
> If you want to run the harness and SUT outside Docker, I suggest that
> you build your harness with a standard test framework, e.g. scalatest
> or JUnit, and run both harness and SUT in the same JVM. In this case,
> you put code to bring up the Kafka Docker container in test framework
> setup methods. This test strategy integrates better with IDEs and
> build tools (mvn/sbt/gradle), since they will run (and debug) your
> tests without any special integration. I therefore prefer this
> strategy.
>
>
> What is the output of your application? If it is messages on a
> different Kafka topic, the test harness can merely subscribe and
> verify output. If you emit output to a database, you'll need another
> Docker container, integrated with Docker Compose. If you are emitting
> database entries, your test oracle will need to frequently poll the
> database for the expected records, with a timeout in order not to hang
> on failing tests.
>
> I hope this is comprehensible. Let me know if you have followup questions.
>
> Regards,
>
>
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> +46 70 7687109
> Calendar: https://goo.gl/6FBtlS
>
>
>
> On Thu, Jun 30, 2016 at 8:19 PM, SRK  wrote:
> > Hi,
> >
> > I need to do integration tests using Spark Streaming. My idea is to spin
> up
> > kafka using docker locally and use it to feed the stream to my Streaming
> > Job. Any suggestions on how to do this would be of great help.
> >
> > Thanks,
> > Swetha
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-spin-up-Kafka-using-docker-and-use-for-Spark-Streaming-Integration-tests-tp27252.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>


Presentation in London: Running Spark on Hive or Hive on Spark

2016-07-06 Thread Mich Talebzadeh
Dear forum members

I will be presenting on the topic of "Running Spark on Hive or Hive on
Spark, your mileage varies" in Future of Data: London


*Details*

*Organized by: Hortonworks *

*Date: Wednesday, July 20, 2016, 6:00 PM to 8:30 PM *

*Place: London*

*Location: One Canada Square, Canary Wharf,  London E14 5AB.*

*Nearest Underground: Canary Wharf*

If you are interested, please register here.

Looking forward to seeing those who can make it for an interesting
discussion and to leveraging your experience.
Regards,

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Difference between DataFrame.write.jdbc and DataFrame.write.format("jdbc")

2016-07-06 Thread Xiao Li
Hi, Dragisa,

Just submitted a PR for implementing the save API.
https://github.com/apache/spark/pull/14077

Let me know if you have any question,

Xiao

2016-07-06 10:41 GMT-07:00 Rabin Banerjee :

> Hi Buddy,
>
> I have used both, but DataFrame.write.jdbc is older and will only work if you
> provide a table name; it won't work if you provide custom queries. Whereas
> DataFrame.write.format is more generic and works not only with a table name
> but also with custom queries. Hence I recommend using
> DataFrame.write.format("jdbc").
>
> Cheers !
> Rabin
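
As an aside on the read side, the generic format("jdbc") path is also what lets
you hand a subquery in place of a table name. A sketch (URL, credentials and
query are placeholders; sqlContext is the Spark 1.6-era entry point, use
spark.read on 2.x):

val byAuthor = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:db2://dbhost:50000/MYDB")
  .option("dbtable", "(SELECT ID, TITLE FROM ORDERS WHERE AUTHOR = 'John') AS T")
  .option("user", "user")
  .option("password", "secret")
  .load()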
>
>
>
> On Wed, Jul 6, 2016 at 10:35 PM, Dragisa Krsmanovic <
> dragi...@ticketfly.com> wrote:
>
>> I was expecting to get the same results with both:
>>
>> dataFrame.write.mode(SaveMode.Overwrite).jdbc(dbUrl, "my_table", props)
>>
>> and
>>
>> dataFrame.write.mode(SaveMode.Overwrite).format("jdbc").options(opts).option("dbtable",
>> "my_table")
>>
>>
>> In the first example, it behaves as expected. It creates a new table and
>> populates it with the rows from DataFrame.
>>
>> In the second case, I get exception:
>> org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not
>> allow create table as select.
>>
>> Looking at the Spark source, it looks like there is a completely separate
>> implementation for format("jdbc") and for jdbc(...).
>>
>> I find that confusing. Unfortunately documentation is rather sparse and
>> one finds this discrepancy only through trial and error.
>>
>> Is there a plan to deprecate one of the forms ? Or to allow same
>> functionality for both ?
>>
>> I tried both 1.6 and 2.0-preview
>> --
>>
>> Dragiša Krsmanović | Platform Engineer | Ticketfly
>>
>> dragi...@ticketfly.com
>>
>> @ticketfly  | ticketfly.com/blog |
>> facebook.com/ticketfly
>>
>
>


Re: Is Spark suited for replacing a batch job using many database tables?

2016-07-06 Thread Andreas Bauer
The job works just fine. DB2 also performs very well. But I'm supposed to 
investigate alternatives. Thanks for the advice regarding Apache Drill. I'll 
definitely have a look!

Best regards,
Andreas



Sorry,  
I was assuming that you wanted to build the data lake in Hadoop rather than 
just reading from DB2. (Data Lakes need to be built correctly. )  


So, slightly different answer.



Yes, you can do this…  


You will end up with an immutable copy of the data that you would read in 
serially. Then you will probably need to repartition the data, depending on 
size and how much parallelization you want.  And then run the batch processing. 
 


But I have to ask why?  
Are you having issues with DB2?  
Are your batch jobs interfering with your transactional work?  


You will have a hit up front as you read the data from DB2, but then, depending 
on how you use the data… you may be faster overall.  


Please don’t misunderstand, Spark is a viable solution, however… there’s a bit 
of heavy lifting that has to occur (e.g. building and maintaining a spark 
cluster) and there are alternatives out there that work.  


Performance of the DB2 tables will vary based on indexing, assuming you have 
the appropriate indexes in place.  


You could also look at Apache Drill too.  


HTH  
-Mike







> On Jul 6, 2016, at 3:24 PM, Andreas Bauer  wrote:

>  
> Thanks for the advice. I have to retrieve the basic data from the DB2 tables 
> but afterwards I'm pretty free to transform the data as needed.  
>  
>  
>  
> On 6. Juli 2016 um 22:12:26 MESZ, Michael Segel  
> wrote:

>> I think you need to learn the basics of how to build a ‘data 
>> lake/pond/sewer’ first.  
>>  
>> The short answer is yes.  
>> The longer answer is that you need to think more about translating a 
>> relational model in to a hierarchical model, something that I seriously 
>> doubt has been taught in schools in a very long time.  
>>  
>> Then there’s more to the design, including indexing.  
>> Do you want to stick with SQL or do you want to hand code the work to allow 
>> for indexing / secondary indexing to help with the filtering since Spark SQL 
>> doesn’t really handle indexing. Note that you could actually still use an 
>> index table (narrow/thin inverted table) and join against the base table to 
>> get better performance.  
>>  
>> There’s more to this, but you get the idea.

>>  
>> HTH

>>  
>> -Mike

>>  
>> > On Jul 6, 2016, at 2:25 PM, dabuki wrote:

>> >  
>> > I was thinking about to replace a legacy batch job with Spark, but I'm not

>> > sure if Spark is suited for this use case. Before I start the proof of

>> > concept, I wanted to ask for opinions.

>> >  
>> > The legacy job works as follows: A file (100k - 1 mio entries) is iterated.

>> > Every row contains a (book) order with an id and for each row approx. 15

>> > processing steps have to be performed that involve access to multiple

>> > database tables. In total approx. 25 tables (each containing 10k-700k

>> > entries) have to be scanned using the book's id and the retrieved data is

>> > joined together.  
>> >  
>> > As I'm new to Spark I'm not sure if I can leverage Spark's processing model

>> > for this use case.

>> >  
>> >  
>> >  
>> >  
>> >  
>> > --

>> > View this message in context: 
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-suited-for-replacing-a-batch-job-using-many-database-tables-tp27300.html

>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.

>> >  
>> > -

>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org

>> >  
>> >  
>>  
>>  
>> -

>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

>>  




Re: Is Spark suited for replacing a batch job using many database tables?

2016-07-06 Thread Michael Segel
Sorry, 
I was assuming that you wanted to build the data lake in Hadoop rather than 
just reading from DB2. (Data Lakes need to be built correctly. ) 

So, slightly different answer.

Yes, you can do this… 

You will end up with an immutable copy of the data that you would read in 
serially. Then you will probably need to repartition the data, depending on 
size and how much parallelization you want.  And then run the batch processing. 

But I have to ask why? 
Are you having issues with DB2? 
Are your batch jobs interfering with your transactional work? 

You will have a hit up front as you read the data from DB2, but then, depending 
on how you use the data… you may be faster overall. 

Please don’t misunderstand, Spark is a viable solution, however… there’s a bit 
of heavy lifting that has to occur (e.g. building and maintaining a spark 
cluster) and there are alternatives out there that work. 

Performance of the DB2 tables will vary based on indexing, assuming you have 
the appropriate indexes in place. 

You could also look at Apache Drill too. 

HTH 
-Mike



> On Jul 6, 2016, at 3:24 PM, Andreas Bauer  wrote:
> 
> Thanks for the advice. I have to retrieve the basic data from the DB2 tables 
> but afterwards I'm pretty free to transform the data as needed. 
> 
> 
> 
> On 6. Juli 2016 um 22:12:26 MESZ, Michael Segel  
> wrote:
>> I think you need to learn the basics of how to build a ‘data 
>> lake/pond/sewer’ first. 
>> 
>> The short answer is yes. 
>> The longer answer is that you need to think more about translating a 
>> relational model in to a hierarchical model, something that I seriously 
>> doubt has been taught in schools in a very long time. 
>> 
>> Then there’s more to the design, including indexing. 
>> Do you want to stick with SQL or do you want to hand code the work to allow 
>> for indexing / secondary indexing to help with the filtering since Spark SQL 
>> doesn’t really handle indexing. Note that you could actually still use an 
>> index table (narrow/thin inverted table) and join against the base table to 
>> get better performance. 
>> 
>> There’s more to this, but you get the idea.
>> 
>> HTH
>> 
>> -Mike
>> 
>> > On Jul 6, 2016, at 2:25 PM, dabuki wrote:
>> > 
>> > I was thinking about to replace a legacy batch job with Spark, but I'm not
>> > sure if Spark is suited for this use case. Before I start the proof of
>> > concept, I wanted to ask for opinions.
>> > 
>> > The legacy job works as follows: A file (100k - 1 mio entries) is iterated.
>> > Every row contains a (book) order with an id and for each row approx. 15
>> > processing steps have to be performed that involve access to multiple
>> > database tables. In total approx. 25 tables (each containing 10k-700k
>> > entries) have to be scanned using the book's id and the retrieved data is
>> > joined together. 
>> > 
>> > As I'm new to Spark I'm not sure if I can leverage Spark's processing model
>> > for this use case.
>> > 
>> > 
>> > 
>> > 
>> > 
>> > --
>> > View this message in context: 
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-suited-for-replacing-a-batch-job-using-many-database-tables-tp27300.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> > 
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> > 
>> > 
>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Is Spark suited for replacing a batch job using many database tables?

2016-07-06 Thread Mich Talebzadeh
Well your mileage varies depending on what you want to do.

I suggest that you do a POC to find out exactly what benefits you are going
to get and if the approach is going to pay.

Spark does not have a CBO like DB2 or Oracle but provides DAG and in-memory
capabilities. Use something basic like spark-shell to start experimenting
and take it from there.
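
For instance, a minimal spark-shell sketch of that kind of experiment: pull one
of the DB2 reference tables over JDBC, cache it, and join it with the order
file. The JDBC URL, table and file names are placeholders, the order-file
format is assumed, and the DB2 JDBC driver jar must be on the classpath
(--jars / --driver-class-path).

// Order file (format assumed); in 1.6 use sqlContext, in 2.x the spark session.
val orders = sqlContext.read.json("orders.json")

val customers = sqlContext.read.format("jdbc")
  .option("url", "jdbc:db2://db2host:50000/MYDB")
  .option("dbtable", "CUSTOMERS")
  .option("user", "user")
  .option("password", "secret")
  .load()
  .cache()                 // keep the small reference table in memory

val enriched = orders.join(customers, orders("customerId") === customers("ID"))
enriched.count()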

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 6 July 2016 at 21:24, Andreas Bauer  wrote:

> Thanks for the advice. I have to retrieve the basic data from the DB2
> tables but afterwards I'm pretty free to transform the data as needed.
>
>
>
> On 6. Juli 2016 um 22:12:26 MESZ, Michael Segel 
> wrote:
>
> I think you need to learn the basics of how to build a ‘data
> lake/pond/sewer’ first.
>
> The short answer is yes.
> The longer answer is that you need to think more about translating a
> relational model in to a hierarchical model, something that I seriously
> doubt has been taught in schools in a very long time.
>
> Then there’s more to the design, including indexing.
> Do you want to stick with SQL or do you want to hand code the work to
> allow for indexing / secondary indexing to help with the filtering since
> Spark SQL doesn’t really handle indexing. Note that you could actually
> still use an index table (narrow/thin inverted table) and join against the
> base table to get better performance.
>
> There’s more to this, but you get the idea.
>
> HTH
>
> -Mike
>
> > On Jul 6, 2016, at 2:25 PM, dabuki wrote:
> >
> > I was thinking about to replace a legacy batch job with Spark, but I'm
> not
> > sure if Spark is suited for this use case. Before I start the proof of
> > concept, I wanted to ask for opinions.
> >
> > The legacy job works as follows: A file (100k - 1 mio entries) is
> iterated.
> > Every row contains a (book) order with an id and for each row approx. 15
> > processing steps have to be performed that involve access to multiple
> > database tables. In total approx. 25 tables (each containing 10k-700k
> > entries) have to be scanned using the book's id and the retrieved data is
> > joined together.
> >
> > As I'm new to Spark I'm not sure if I can leverage Spark's processing
> model
> > for this use case.
> >
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-suited-for-replacing-a-batch-job-using-many-database-tables-tp27300.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Is Spark suited for replacing a batch job using many database tables?

2016-07-06 Thread Andreas Bauer
Thanks for the advice. I have to retrieve the basic data from the DB2 tables 
but afterwards I'm pretty free to transform the data as needed. 



I think you need to learn the basics of how to build a ‘data lake/pond/sewer’ 
first.  


The short answer is yes.  
The longer answer is that you need to think more about translating a relational 
model in to a hierarchical model, something that I seriously doubt has been 
taught in schools in a very long time.   


Then there’s more to the design, including indexing.  
Do you want to stick with SQL or do you want to hand code the work to allow for 
indexing / secondary indexing to help with the filtering since Spark SQL 
doesn’t really handle indexing. Note that you could actually still use an index 
table (narrow/thin inverted table) and join against the base table to get 
better performance.  


There’s more to this, but you get the idea.



HTH



-Mike



> On Jul 6, 2016, at 2:25 PM, dabuki  wrote:

>  
> I was thinking about to replace a legacy batch job with Spark, but I'm not

> sure if Spark is suited for this use case. Before I start the proof of

> concept, I wanted to ask for opinions.

>  
> The legacy job works as follows: A file (100k - 1 mio entries) is iterated.

> Every row contains a (book) order with an id and for each row approx. 15

> processing steps have to be performed that involve access to multiple

> database tables. In total approx. 25 tables (each containing 10k-700k

> entries) have to be scanned using the book's id and the retrieved data is

> joined together.  
>  
> As I'm new to Spark I'm not sure if I can leverage Spark's processing model

> for this use case.

>  
>  
>  
>  
>  
> --

> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-suited-for-replacing-a-batch-job-using-many-database-tables-tp27300.html

> Sent from the Apache Spark User List mailing list archive at Nabble.com.

>  
> -

> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

>  
>  




-

To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Re: Is Spark suited for replacing a batch job using many database tables?

2016-07-06 Thread Andreas Bauer
Yes, that was the idea: to cache the tables in memory, as they should fit
neatly. The loading time is no problem as the job is not time critical. The
critical point is the constant access to the DB2 tables, which consumes costly
MIPS, and this is what I hope to replace with the cached version. So, I'll
definitely give it a try :)



On 6. Juli 2016 um 21:59:28 MESZ, Mich Talebzadeh wrote:

Well you can try it. I have done it with Oracle, SAP Sybase IQ, etc., but you
need to be aware of the time that the JDBC connection is going to take to load
data.

Sounds like your tables are pretty small so they can be cached.

Where are you going to store the result set etc.?

HTH

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss,
damage or destruction of data or any other property which may arise from
relying on this email's technical content is explicitly disclaimed. The author
will in no case be liable for any monetary damages arising from such loss,
damage or destruction.

On 6 July 2016 at 20:54, Andreas Bauer wrote:

In fact, yes.

On 6. Juli 2016 um 21:46:34 MESZ, Mich Talebzadeh wrote:

So you want to use Spark as the query engine accessing DB2 tables via JDBC?

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss,
damage or destruction of data or any other property which may arise from
relying on this email's technical content is explicitly disclaimed. The author
will in no case be liable for any monetary damages arising from such loss,
damage or destruction.

On 6 July 2016 at 20:39, Andreas Bauer wrote:

The SQL statements are embedded in a PL/1 program using DB2 running on z/OS.
Quite powerful, but expensive and foremost shared with other jobs in the
company. The whole job takes approx. 20 minutes.

So I was thinking to use Spark and let the Spark job run on 10 or 20 virtual
instances, which I can spawn easily, on-demand and almost for free using a
cloud infrastructure.

On 6. Juli 2016 um 21:29:53 MESZ, Jean Georges Perrin wrote:

What are you doing it on right now?

> On Jul 6, 2016, at 3:25 PM, dabuki wrote:
>
> I was thinking about replacing a legacy batch job with Spark, but I'm not
> sure if Spark is suited for this use case. Before I start the proof of
> concept, I wanted to ask for opinions.
>
> The legacy job works as follows: A file (100k - 1 mio entries) is iterated.
> Every row contains a (book) order with an id and for each row approx. 15
> processing steps have to be performed that involve access to multiple
> database tables. In total approx. 25 tables (each containing 10k-700k
> entries) have to be scanned using the book's id and the retrieved data is
> joined together.
>
> As I'm new to Spark I'm not sure if I can leverage Spark's processing model
> for this use case.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-suited-for-replacing-a-batch-job-using-many-database-tables-tp27300.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Is Spark suited for replacing a batch job using many database tables?

2016-07-06 Thread Michael Segel
I think you need to learn the basics of how to build a ‘data lake/pond/sewer’ 
first. 

The short answer is yes. 
The longer answer is that you need to think more about translating a relational 
model in to a hierarchical model, something that I seriously doubt has been 
taught in schools in a very long time.  

Then there’s more to the design, including indexing. 
Do you want to stick with SQL or do you want to hand code the work to allow for 
indexing / secondary indexing to help with the filtering since Spark SQL 
doesn’t really handle indexing. Note that you could actually still use an index 
table (narrow/thin inverted table) and join against the base table to get 
better performance. 

There’s more to this, but you get the idea.

HTH

-Mike

> On Jul 6, 2016, at 2:25 PM, dabuki  wrote:
> 
> I was thinking about to replace a legacy batch job with Spark, but I'm not
> sure if Spark is suited for this use case. Before I start the proof of
> concept, I wanted to ask for opinions.
> 
> The legacy job works as follows: A file (100k - 1 mio entries) is iterated.
> Every row contains a (book) order with an id and for each row approx. 15
> processing steps have to be performed that involve access to multiple
> database tables. In total approx. 25 tables (each containing 10k-700k
> entries) have to be scanned using the book's id and the retrieved data is
> joined together. 
> 
> As I'm new to Spark I'm not sure if I can leverage Spark's processing model
> for this use case.
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-suited-for-replacing-a-batch-job-using-many-database-tables-tp27300.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Is Spark suited for replacing a batch job using many database tables?

2016-07-06 Thread Mich Talebzadeh
Well you can try it. I have done it with Oracle, SAP Sybase IQ, etc., but you need
to be aware of the time that the JDBC connection is going to take to load data.

Sounds like your tables are pretty small so they can be cached.

Where are you going to store the result set etc?

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 6 July 2016 at 20:54, Andreas Bauer  wrote:

>  In fact, yes.
>
>
>
>
> On 6. Juli 2016 um 21:46:34 MESZ, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
> So you want to use Spark as the query engine accessing DB2 tables via JDBC?
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 6 July 2016 at 20:39, Andreas Bauer  wrote:
>
> The SQL statements are embedded in a PL/1 program using DB2 running on
> z/OS. Quite powerful, but expensive and foremost shared with other jobs in
> the company. The whole job takes approx. 20 minutes.
>
> So I was thinking to use Spark and let the Spark job run on 10 or 20
> virtual instances, which I can spawn easily, on-demand and almost for free
> using a cloud infrastructure.
>
>
>
>
> On 6. Juli 2016 um 21:29:53 MESZ, Jean Georges Perrin  wrote:
>
> What are you doing it on right now?
>
> > On Jul 6, 2016, at 3:25 PM, dabuki wrote:
> >
> > I was thinking about to replace a legacy batch job with Spark, but I'm
> not
> > sure if Spark is suited for this use case. Before I start the proof of
> > concept, I wanted to ask for opinions.
> >
> > The legacy job works as follows: A file (100k - 1 mio entries) is
> iterated.
> > Every row contains a (book) order with an id and for each row approx. 15
> > processing steps have to be performed that involve access to multiple
> > database tables. In total approx. 25 tables (each containing 10k-700k
> > entries) have to be scanned using the book's id and the retrieved data is
> > joined together.
> >
> > As I'm new to Spark I'm not sure if I can leverage Spark's processing
> model
> > for this use case.
> >
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-suited-for-replacing-a-batch-job-using-many-database-tables-tp27300.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>
>


Re: Is Spark suited for replacing a batch job using many database tables?

2016-07-06 Thread Jean Georges Perrin
Right now, I am having "fun" with Spark and 26446249960843350 datapoints on my 
MacBook Air, but my small friend is suffering... 

From my experience:
You will be able to do the job with Spark. You can try to load everything on a 
dev machine, no need to have a server, a workstation might be enough.
I would not recommend VMs when you go to production, unless you already have them. Bare 
metal seems more suitable.

It's definitely worth a shot!

> On Jul 6, 2016, at 3:39 PM, Andreas Bauer  wrote:
> 
> The SQL statements are embedded in a PL/1 program using DB2 running on z/OS. 
> Quite powerful, but expensive and foremost shared with other jobs in the 
> company. The whole job takes approx. 20 minutes. 
> 
> So I was thinking to use Spark and let the Spark job run on 10 or 20 virtual 
> instances, which I can spawn easily, on-demand and almost for free using a 
> cloud infrastructure. 
> 
> 
> 
> 
> On 6. Juli 2016 um 21:29:53 MESZ, Jean Georges Perrin  wrote:
>> What are you doing it on right now?
>> 
>> > On Jul 6, 2016, at 3:25 PM, dabuki wrote:
>> > 
>> > I was thinking about to replace a legacy batch job with Spark, but I'm not
>> > sure if Spark is suited for this use case. Before I start the proof of
>> > concept, I wanted to ask for opinions.
>> > 
>> > The legacy job works as follows: A file (100k - 1 mio entries) is iterated.
>> > Every row contains a (book) order with an id and for each row approx. 15
>> > processing steps have to be performed that involve access to multiple
>> > database tables. In total approx. 25 tables (each containing 10k-700k
>> > entries) have to be scanned using the book's id and the retrieved data is
>> > joined together. 
>> > 
>> > As I'm new to Spark I'm not sure if I can leverage Spark's processing model
>> > for this use case.
>> > 
>> > 
>> > 
>> > 
>> > 
>> > --
>> > View this message in context: 
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-suited-for-replacing-a-batch-job-using-many-database-tables-tp27300.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> > 
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> > 
>> 



Re: Is Spark suited for replacing a batch job using many database tables?

2016-07-06 Thread Andreas Bauer
 In fact, yes. 



On 6. Juli 2016 um 21:46:34 MESZ, Mich Talebzadeh wrote:

So you want to use Spark as the query engine accessing DB2 tables via JDBC?

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss,
damage or destruction of data or any other property which may arise from
relying on this email's technical content is explicitly disclaimed. The author
will in no case be liable for any monetary damages arising from such loss,
damage or destruction.

On 6 July 2016 at 20:39, Andreas Bauer wrote:

The SQL statements are embedded in a PL/1 program using DB2 running on z/OS.
Quite powerful, but expensive and foremost shared with other jobs in the
company. The whole job takes approx. 20 minutes.

So I was thinking to use Spark and let the Spark job run on 10 or 20 virtual
instances, which I can spawn easily, on-demand and almost for free using a
cloud infrastructure.

On 6. Juli 2016 um 21:29:53 MESZ, Jean Georges Perrin wrote:

What are you doing it on right now?

> On Jul 6, 2016, at 3:25 PM, dabuki wrote:
>
> I was thinking about replacing a legacy batch job with Spark, but I'm not
> sure if Spark is suited for this use case. Before I start the proof of
> concept, I wanted to ask for opinions.
>
> The legacy job works as follows: A file (100k - 1 mio entries) is iterated.
> Every row contains a (book) order with an id and for each row approx. 15
> processing steps have to be performed that involve access to multiple
> database tables. In total approx. 25 tables (each containing 10k-700k
> entries) have to be scanned using the book's id and the retrieved data is
> joined together.
>
> As I'm new to Spark I'm not sure if I can leverage Spark's processing model
> for this use case.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-suited-for-replacing-a-batch-job-using-many-database-tables-tp27300.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Is Spark suited for replacing a batch job using many database tables?

2016-07-06 Thread Mich Talebzadeh
So you want to use Spark as the query engine accessing DB2 tables via JDBC?

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 6 July 2016 at 20:39, Andreas Bauer  wrote:

> The SQL statements are embedded in a PL/1 program using DB2 running on
> z/OS. Quite powerful, but expensive and foremost shared with other jobs in
> the company. The whole job takes approx. 20 minutes.
>
> So I was thinking to use Spark and let the Spark job run on 10 or 20
> virtual instances, which I can spawn easily, on-demand and almost for free
> using a cloud infrastructure.
>
>
>
>
> On 6. Juli 2016 um 21:29:53 MESZ, Jean Georges Perrin  wrote:
>
> What are you doing it on right now?
>
> > On Jul 6, 2016, at 3:25 PM, dabuki wrote:
> >
> > I was thinking about to replace a legacy batch job with Spark, but I'm
> not
> > sure if Spark is suited for this use case. Before I start the proof of
> > concept, I wanted to ask for opinions.
> >
> > The legacy job works as follows: A file (100k - 1 mio entries) is
> iterated.
> > Every row contains a (book) order with an id and for each row approx. 15
> > processing steps have to be performed that involve access to multiple
> > database tables. In total approx. 25 tables (each containing 10k-700k
> > entries) have to be scanned using the book's id and the retrieved data is
> > joined together.
> >
> > As I'm new to Spark I'm not sure if I can leverage Spark's processing
> model
> > for this use case.
> >
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-suited-for-replacing-a-batch-job-using-many-database-tables-tp27300.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>


Re: Is Spark suited for replacing a batch job using many database tables?

2016-07-06 Thread Andreas Bauer
The SQL statements are embedded in a PL/1 program using DB2 running on z/OS.
Quite powerful, but expensive and foremost shared with other jobs in the
company. The whole job takes approx. 20 minutes.

So I was thinking to use Spark and let the Spark job run on 10 or 20 virtual
instances, which I can spawn easily, on-demand and almost for free using a
cloud infrastructure.



What are you doing it on right now?



> On Jul 6, 2016, at 3:25 PM, dabuki  wrote:

>  
> I was thinking about to replace a legacy batch job with Spark, but I'm not

> sure if Spark is suited for this use case. Before I start the proof of

> concept, I wanted to ask for opinions.

>  
> The legacy job works as follows: A file (100k - 1 mio entries) is iterated.

> Every row contains a (book) order with an id and for each row approx. 15

> processing steps have to be performed that involve access to multiple

> database tables. In total approx. 25 tables (each containing 10k-700k

> entries) have to be scanned using the book's id and the retrieved data is

> joined together.  
>  
> As I'm new to Spark I'm not sure if I can leverage Spark's processing model

> for this use case.

>  
>  
>  
>  
>  
> --

> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-suited-for-replacing-a-batch-job-using-many-database-tables-tp27300.html

> Sent from the Apache Spark User List mailing list archive at Nabble.com.

>  
> -

> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

>  




Re: Is Spark suited for replacing a batch job using many database tables?

2016-07-06 Thread Jean Georges Perrin
What are you doing it on right now?

> On Jul 6, 2016, at 3:25 PM, dabuki  wrote:
> 
> I was thinking about to replace a legacy batch job with Spark, but I'm not
> sure if Spark is suited for this use case. Before I start the proof of
> concept, I wanted to ask for opinions.
> 
> The legacy job works as follows: A file (100k - 1 mio entries) is iterated.
> Every row contains a (book) order with an id and for each row approx. 15
> processing steps have to be performed that involve access to multiple
> database tables. In total approx. 25 tables (each containing 10k-700k
> entries) have to be scanned using the book's id and the retrieved data is
> joined together. 
> 
> As I'm new to Spark I'm not sure if I can leverage Spark's processing model
> for this use case.
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-suited-for-replacing-a-batch-job-using-many-database-tables-tp27300.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Is Spark suited for replacing a batch job using many database tables?

2016-07-06 Thread dabuki
I was thinking about replacing a legacy batch job with Spark, but I'm not
sure if Spark is suited for this use case. Before I start the proof of
concept, I wanted to ask for opinions.

The legacy job works as follows: A file (100k - 1 mio entries) is iterated.
Every row contains a (book) order with an id, and for each row approx. 15
processing steps have to be performed that involve access to multiple
database tables. In total approx. 25 tables (each containing 10k-700k
entries) have to be scanned using the book's id, and the retrieved data is
joined together.

As I'm new to Spark, I'm not sure if I can leverage Spark's processing model
for this use case.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-suited-for-replacing-a-batch-job-using-many-database-tables-tp27300.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark classloader question

2016-07-06 Thread Chen Song
Hi

I ran into problems using class loaders in Spark. In my code (run within an
executor), I explicitly load classes using the ContextClassLoader, as below.

Thread.currentThread().getContextClassLoader()

The jar containing the classes to be loaded is added via the --jars option
in spark-shell/spark-submit.

I always get a ClassNotFoundException. However, it seems to work if I
compile these classes into the main jar for the job (the jar containing the
main job class).

I know Spark implements its own class loaders in a particular way. Is there
a way to work around this? In other words, what is the proper way to
programmatically load classes in other jars added via --jars in Spark?
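
A minimal sketch of the kind of workaround that is sometimes suggested is below:
resolve the class lazily inside the executor task, passing the thread's context
classloader to Class.forName explicitly, so the lookup happens after the --jars
have been added to the executor classloader rather than at driver or
deserialization time. The plugin class and jar names are hypothetical, and this
is illustrative only, not a confirmed fix.

import org.apache.spark.{SparkConf, SparkContext}

object ExecutorSideLoading {
  def main(args: Array[String]): Unit = {
    // Submitted with: spark-submit --jars my-plugin.jar ...   (jar name is hypothetical)
    val sc = new SparkContext(new SparkConf().setAppName("executor-side-loading"))

    val loaded = sc.parallelize(1 to 4, 2).mapPartitions { iter =>
      // Resolve the class on the executor, at task time, through the thread's
      // context classloader rather than the classloader of the enclosing class.
      val cl = Thread.currentThread().getContextClassLoader
      val clazz = Class.forName("com.example.MyPlugin", true, cl) // hypothetical class
      val plugin = clazz.newInstance()
      iter.map(n => s"record $n handled by ${plugin.getClass.getName}")
    }

    loaded.collect().foreach(println)
    sc.stop()
  }
}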


Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread Cody Koeninger
> Yes and I sent you results. It is appropriate only with known parameters of 
> input data stream.

No, as far as I can tell from your posts in this thread and your
linked project, you only tested with auto.offset.reset smallest and a
large backlog.  That's not what I advised you to do.  Don't draw
inaccurate conclusions about Spark DStreams from that test.  The
reason you need to specify maxRatePerPartition is because you're
starting with a large backlog and thus a large first batch.  If you
were testing an ongoing stream with auto.offset.reset largest,
backpressure alone should be sufficient.
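
For reference, a minimal sketch of the setup being discussed (Spark 1.6 with the
Kafka 0.8 direct stream; the broker, topic and rate values are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamWithBackpressure {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("direct-stream-backpressure")
      .set("spark.streaming.backpressure.enabled", "true")
      // Hard per-partition ceiling; tune it until processing time approaches,
      // but stays under, the batch interval.
      .set("spark.streaming.kafka.maxRatePerPartition", "10000")

    val ssc = new StreamingContext(conf, Seconds(10))
    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092",   // placeholder
      "auto.offset.reset"    -> "largest")        // ongoing stream, not a backlog
    val topics = Set("events")                    // placeholder

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

maxRatePerPartition acts as the absolute ceiling (which matters most for the
first batches after starting against a backlog), while backpressure adjusts the
actual rate below that ceiling based on observed processing times.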



On Wed, Jul 6, 2016 at 12:23 PM, rss rss  wrote:
>> If you aren't processing messages as fast as you receive them, you're
>> going to run out of kafka retention regardless of whether you're using
>> Spark or Flink.  Again, physics.  It's just a question of what
>> compromises you choose.
>
>
> Yes. I wrote about it. But in case of Flink you will have output strictly
> after specified time. If it is impossible to process 1000 messages per 1
> second but possible process 500, then Flink makes an output for 500. If only
> 1 message processed, Flink produced an output for one only but after 1
> second. At the same time Spark processes all 1000 but much longer that 1
> second in this case.
>
>>  that's what backpressure
>> and maxRatePerPartition are for.  As long as those are set reasonably,
>> you'll have a reasonably fixed output interval.  Have you actually
>> tested this in the way I suggested?
>
>
> Yes and I sent you results. It is appropriate only with known parameters of
> input data stream. I'm not able to estimate bounds of Sparks usage in
> general. And I'm not about it. I'm about Spark has these limitations. And
> most problem is when a calculation time depends on input data. That is if
> you want to have a stable period of output data generation you have to use
> computational system with free resources in hot reserve.
>
>  In any case thanks, now I understand how to use Spark.
>
> PS: I will continue work with Spark but to minimize emails stream I plan to
> unsubscribe from this mail list
>
> 2016-07-06 18:55 GMT+02:00 Cody Koeninger :
>>
>> If you aren't processing messages as fast as you receive them, you're
>> going to run out of kafka retention regardless of whether you're using
>> Spark or Flink.  Again, physics.  It's just a question of what
>> compromises you choose.
>>
>> If by "growing of a processing window time of Spark" you mean a
>> processing time that exceeds batch time... that's what backpressure
>> and maxRatePerPartition are for.  As long as those are set reasonably,
>> you'll have a reasonably fixed output interval.  Have you actually
>> tested this in the way I suggested?
>>
>> On Wed, Jul 6, 2016 at 11:38 AM, rss rss  wrote:
>> > Ok, thanks. But really this is not full decision. In case of growing of
>> > processing time I will have growing of window time. That is really with
>> > Spark I have 2 points of a latency growing. First is a delay of
>> > processing
>> > of messages in Kafka queue due to physical limitation of a computer
>> > system.
>> > And second one is growing of a processing window time of Spark. In case
>> > of
>> > Flink there is only first point of delay but time intervals of output
>> > data
>> > are fixed. It is really looks like limitation of Spark. That is if
>> > dataflow
>> > is stable, all is ok. If there are peaks of loading more than
>> > possibility of
>> > computational system or data dependent time of calculation, Spark is not
>> > able to provide a periodically stable results output. Sometimes this is
>> > appropriate but sometime this is not appropriate.
>> >
>> > 2016-07-06 18:11 GMT+02:00 Cody Koeninger :
>> >>
>> >> Then double the upper limit you have set until the processing time
>> >> approaches the batch time.
>> >>
>> >> On Wed, Jul 6, 2016 at 11:06 AM, rss rss  wrote:
>> >> > Ok, with:
>> >> >
>> >> > .set("spark.streaming.backpressure.enabled","true")
>> >> > .set("spark.streaming.receiver.maxRate", "1")
>> >> > .set("spark.streaming.kafka.maxRatePerPartition", "1")
>> >> >
>> >> > I have something like
>> >> >
>> >> >
>> >> >
>> >> > ***
>> >> > Processing time: 5626
>> >> > Expected time: 1
>> >> > Processed messages: 10
>> >> > Message example: {"message": 950002,
>> >> > "uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>> >> > Recovered json:
>> >> > {"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>> >> >
>> >> > That is yes, it works but throughput is much less than without
>> >> > limitations
>> >> > because of this is an absolute upper limit. And time of processing is
>> >> > half
>> >> > of available.
>> >> >
>> >> > Regarding Spark 2.0 structured streaming I will look it some later.
>> >> > Now
>> >> > I
>> >> > don't know how to 

Re: Spark Left outer Join issue using programmatic sql joins

2016-07-06 Thread Mich Talebzadeh
This will work in Hive

Don't know why you are getting null values

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
HiveContext.sql("use test")
val e = HiveContext.table("emp")
val d = HiveContext.table("dept")
val rs = e.join(d,e("deptid")===d("deptid"), "fullouter")
rs.registerTempTable("tmp")
HiveContext.sql("DROP TABLE IF EXISTS test.rs")
var sqltext: String = ""
sqltext =
"""
create table test.rs(emp_id int, name varchar(30), emp_deptid int,
dept_deptid int, dept_name varchar(30))
"""
HiveContext.sql(sqltext)
sqltext =
"""
INSERT INTO test.rs
SELECT * FROM tmp
"""
HiveContext.sql(sqltext)
HiveContext.sql("select * from test.rs").show
sys.exit ()


Now with Vertica I have no idea

Can you do something like the below in SQL with an insert/select?

CASE WHEN COLUMN IS NULL THEN '' ELSE COLUMN END
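
Alternatively, a small DataFrame-side sketch, reusing the rs join result from the
code above: na.fill("") replaces nulls in the string columns before the insert, so
the unmatched side comes out as empty strings rather than nulls (the temp table
name below is arbitrary).

val rsNoNulls = rs.na.fill("")
rsNoNulls.registerTempTable("tmp_no_nulls")
HiveContext.sql("INSERT INTO test.rs SELECT * FROM tmp_no_nulls")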

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 6 July 2016 at 18:18, Radha krishna  wrote:

> Hi Mich,
> Here I given just a sample data,
> I have some GB's of files in HDFS and performing left outer joins on those
> files, and the final result I am going to store in Vertica data base table.
> There is no duplicate columns in the target table but for the non matching
> rows columns I want to insert "" empty value instead of null word.
> On 06-Jul-2016 10:31 pm, "Mich Talebzadeh" 
> wrote:
>
>> what do you mean database table here?
>>
>> you have repeating column names for the table namely deptid
>>
>>
>> 0: jdbc:hive2://rhes564:10010/default>  SELECT *  FROM emp e LEFT OUTER
>> JOIN dept d ON e.deptid = d.deptid;
>>
>> INFO  : OK
>> +---+-+---+---+--+--+
>> | e.emp_id  | e.name  | e.deptid  | d.deptid  | d.dept_name  |
>> +---+-+---+---+--+--+
>> | 1001  | aba | 10| 10| DEV  |
>> | 1002  | abs | 20| 20| TEST |
>> | 1003  | abd | 10| 10| DEV  |
>> | 1001  | aba | 10| 10| DEV  |
>> | 1002  | abs | 20| 20| TEST |
>> | 1003  | abd | 10| 10| DEV  |
>> | 1004  | abf | 30| 30| IT   |
>> | 1005  | abg | 10| 10| DEV  |
>> | 1004  | abf | 30| 30| IT   |
>> | 1005  | abg | 10| 10| DEV  |
>> | 1006  | abh | 20| 20| TEST |
>> | 1007  | abj | 10| 10| DEV  |
>> | 1006  | abh | 20| 20| TEST |
>> | 1007  | abj | 10| 10| DEV  |
>> | 1008  | abk | 30| 30| IT   |
>> | 1009  | abl | 20| 20| TEST |
>> | 1010  | abq | 10| 10| DEV  |
>> | 1008  | abk | 30| 30| IT   |
>> | 1009  | abl | 20| 20| TEST |
>> | 1010  | abq | 10| 10| DEV  |
>> +---+-+---+---+--+--+
>> 20 rows selected (44.351 seconds)
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 6 July 2016 at 17:48, radha  wrote:
>>
>>> Hi ,
>>> Thanks all, its working fine the issue is with some space for the dept
>>> id,
>>>
>>> I have one more doubt for the non matching records its showing null word,
>>> even if i write into HDFS also its showing null word how can we avoid
>>> writing null for the non matching columns, i want just empty value ("")
>>>
>>> same input i used in the dept table i removed the last row and the below
>>> code i used to write into hdfs.
>>>
>>> DataFrame joinResult = sqlContext.sql("SELECT * FROM EMP e LEFT OUTER
>>> JOIN
>>> DEPT d 

Re: Difference between DataFrame.write.jdbc and DataFrame.write.format("jdbc")

2016-07-06 Thread Rabin Banerjee
Hi Buddy,

   I have used both, but DataFrame.write.jdbc is the older API and works only if
you provide a table name; it won't work if you provide custom queries. Whereas
DataFrame.write.format("jdbc") is more generic and works not only with a table
name but also with custom queries. Hence I recommend using
DataFrame.write.format("jdbc").

Cheers !
Rabin



On Wed, Jul 6, 2016 at 10:35 PM, Dragisa Krsmanovic 
wrote:

> I was expecting to get the same results with both:
>
> dataFrame.write.mode(SaveMode.Overwrite).jdbc(dbUrl, "my_table", props)
>
> and
>
> dataFrame.write.mode(SaveMode.Overwrite).format("jdbc").options(opts).option("dbtable",
> "my_table")
>
>
> In the first example, it behaves as expected. It creates a new table and
> populates it with the rows from DataFrame.
>
> In the second case, I get exception:
> org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not
> allow create table as select.
>
> Looking at the Spark source, it looks like there is a completely separate
> implementation for format("jdbc") and for jdbc(...).
>
> I find that confusing. Unfortunately documentation is rather sparse and
> one finds this discrepancy only through trial and error.
>
> Is there a plan to deprecate one of the forms ? Or to allow same
> functionality for both ?
>
> I tried both 1.6 and 2.0-preview
> --
>
> Dragiša Krsmanović | Platform Engineer | Ticketfly
>
> dragi...@ticketfly.com
>
> @ticketfly  | ticketfly.com/blog |
> facebook.com/ticketfly
>


Maintain complete state for updateStateByKey

2016-07-06 Thread Sunita Arvind
Hello Experts,

I have a requirement to maintain a list of ids for every customer, for all of
time. I should be able to provide a distinct count of ids on demand. All the
examples I have seen so far indicate I need to maintain counts directly. My
concern is that I will not be able to identify cumulative distinct values in
that case. Also, would maintaining such a huge state be tolerable to the
framework?

Here is what I am attempting:

val updateUniqueids :Option[RDD[String]] = (values: Seq[String],
state: Option[RDD[String]]) => {
  val currentLst = values.distinct
  val previousLst:RDD[String] = state.getOrElse().asInstanceOf[RDD[String]]
  Some(currentLst.union(previousLst).distinct)
}

Another challenge is concatenating an RDD[String] and a Seq[String] without
being able to access the SparkContext, as this function has to adhere to
updateFunc: (Seq[V], Option[S]) => Option[S]

I'm also trying to figure out if I can use the
(iterator: Iterator[(K, Seq[V], Option[S])]) variant, but I haven't figured it out yet.

Appreciate any suggestions in this regard.

regards

Sunita

P.S:
I am aware of mapWithState, but I am not on the latest Spark version as of now.
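
A minimal sketch of the Set-based approach is below: the state kept per customer
is a plain Set[String] rather than an RDD, and the distinct count is just the
size of that set. The input source and checkpoint path are placeholders. For
genuinely huge id sets this state keeps growing, so an approximate structure such
as HyperLogLog, or an external store, may be more practical.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DistinctIdsPerCustomer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("distinct-ids-state")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("hdfs:///checkpoints/distinct-ids")   // required for stateful ops

    // Assumed input: lines of "customerId,id"; the socket source is a placeholder.
    val pairs = ssc.socketTextStream("localhost", 9999)
      .map(_.split(","))
      .map(a => (a(0), a(1)))

    // State per customer is a plain Set[String], not an RDD.
    val updateIds: (Seq[String], Option[Set[String]]) => Option[Set[String]] =
      (values, state) => Some(state.getOrElse(Set.empty[String]) ++ values)

    val idsPerCustomer = pairs.updateStateByKey[Set[String]](updateIds)

    // Distinct count on demand: the size of each set.
    idsPerCustomer.mapValues(_.size).print()

    ssc.start()
    ssc.awaitTermination()
  }
}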


Re: Difference between DataFrame.write.jdbc and DataFrame.write.format("jdbc")

2016-07-06 Thread Dragisa Krsmanovic
Yes, I had the save() at the end. I truncated the example to highlight the
difference and forgot to put the save() back.

It would be great to have the same behavior (and same code used) for both
jdbc() and format("jdbc").

Thank you.

On Wed, Jul 6, 2016 at 10:21 AM, Xiao Li  wrote:

> Hi, Dragisa,
>
> Your second way is incomplete, right? To get the error you showed, you
> need to put save() there.
>
> Yeah, we can implement the trait CreatableRelationProvider for JDBC. Then,
> you will not see that error.
>
> Will submit a PR for that.
>
> Thanks,
>
> Xiao
>
>
> 2016-07-06 10:05 GMT-07:00 Dragisa Krsmanovic :
>
>> I was expecting to get the same results with both:
>>
>> dataFrame.write.mode(SaveMode.Overwrite).jdbc(dbUrl, "my_table", props)
>>
>> and
>>
>> dataFrame.write.mode(SaveMode.Overwrite).format("jdbc").options(opts).option("dbtable",
>> "my_table")
>>
>>
>> In the first example, it behaves as expected. It creates a new table and
>> populates it with the rows from DataFrame.
>>
>> In the second case, I get exception:
>> org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not
>> allow create table as select.
>>
>> Looking at the Spark source, it looks like there is a completely separate
>> implementation for format("jdbc") and for jdbc(...).
>>
>> I find that confusing. Unfortunately documentation is rather sparse and
>> one finds this discrepancy only through trial and error.
>>
>> Is there a plan to deprecate one of the forms ? Or to allow same
>> functionality for both ?
>>
>> I tried both 1.6 and 2.0-preview
>> --
>>
>> Dragiša Krsmanović | Platform Engineer | Ticketfly
>>
>> dragi...@ticketfly.com
>>
>> @ticketfly  | ticketfly.com/blog |
>> facebook.com/ticketfly
>>
>
>


-- 

Dragiša Krsmanović | Platform Engineer | Ticketfly

dragi...@ticketfly.com

@ticketfly  | ticketfly.com/blog |
facebook.com/ticketfly


Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread rss rss
>
> If you aren't processing messages as fast as you receive them, you're
> going to run out of kafka retention regardless of whether you're using
> Spark or Flink.  Again, physics.  It's just a question of what
> compromises you choose.


Yes, I wrote about it. But in the case of Flink you will have output strictly
after the specified time. If it is impossible to process 1000 messages per 1
second but possible to process 500, then Flink makes an output for 500. If
only 1 message is processed, Flink produces an output for that one only, but
still after 1 second. At the same time Spark processes all 1000, but takes
much longer than 1 second in this case.

 that's what backpressure
> and maxRatePerPartition are for.  As long as those are set reasonably,
> you'll have a reasonably fixed output interval.  Have you actually
> tested this in the way I suggested?


Yes, and I sent you the results. It is appropriate only with known parameters
of the input data stream. I'm not able to estimate the bounds of Spark's usage
in general, and that is not my point. My point is that Spark has these
limitations. And the biggest problem is when the calculation time depends on
the input data. That is, if you want a stable period of output data generation,
you have to use a computational system with free resources held in hot reserve.

In any case thanks, now I understand how to use Spark.

PS: I will continue working with Spark, but to minimize the email stream I plan
to unsubscribe from this mailing list.

2016-07-06 18:55 GMT+02:00 Cody Koeninger :

> If you aren't processing messages as fast as you receive them, you're
> going to run out of kafka retention regardless of whether you're using
> Spark or Flink.  Again, physics.  It's just a question of what
> compromises you choose.
>
> If by "growing of a processing window time of Spark" you mean a
> processing time that exceeds batch time... that's what backpressure
> and maxRatePerPartition are for.  As long as those are set reasonably,
> you'll have a reasonably fixed output interval.  Have you actually
> tested this in the way I suggested?
>
> On Wed, Jul 6, 2016 at 11:38 AM, rss rss  wrote:
> > Ok, thanks. But really this is not full decision. In case of growing of
> > processing time I will have growing of window time. That is really with
> > Spark I have 2 points of a latency growing. First is a delay of
> processing
> > of messages in Kafka queue due to physical limitation of a computer
> system.
> > And second one is growing of a processing window time of Spark. In case
> of
> > Flink there is only first point of delay but time intervals of output
> data
> > are fixed. It is really looks like limitation of Spark. That is if
> dataflow
> > is stable, all is ok. If there are peaks of loading more than
> possibility of
> > computational system or data dependent time of calculation, Spark is not
> > able to provide a periodically stable results output. Sometimes this is
> > appropriate but sometime this is not appropriate.
> >
> > 2016-07-06 18:11 GMT+02:00 Cody Koeninger :
> >>
> >> Then double the upper limit you have set until the processing time
> >> approaches the batch time.
> >>
> >> On Wed, Jul 6, 2016 at 11:06 AM, rss rss  wrote:
> >> > Ok, with:
> >> >
> >> > .set("spark.streaming.backpressure.enabled","true")
> >> > .set("spark.streaming.receiver.maxRate", "1")
> >> > .set("spark.streaming.kafka.maxRatePerPartition", "1")
> >> >
> >> > I have something like
> >> >
> >> >
> >> >
> ***
> >> > Processing time: 5626
> >> > Expected time: 1
> >> > Processed messages: 10
> >> > Message example: {"message": 950002,
> >> > "uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
> >> > Recovered json:
> >> > {"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
> >> >
> >> > That is yes, it works but throughput is much less than without
> >> > limitations
> >> > because of this is an absolute upper limit. And time of processing is
> >> > half
> >> > of available.
> >> >
> >> > Regarding Spark 2.0 structured streaming I will look it some later.
> Now
> >> > I
> >> > don't know how to strictly measure throughput and latency of this high
> >> > level
> >> > API. My aim now is to compare streaming processors.
> >> >
> >> >
> >> > 2016-07-06 17:41 GMT+02:00 Cody Koeninger :
> >> >>
> >> >> The configuration you set is spark.streaming.receiver.maxRate.  The
> >> >> direct stream is not a receiver.  As I said in my first message in
> >> >> this thread, and as the pages at
> >> >>
> >> >>
> >> >>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
> >> >> and
> >> >>
> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
> >> >> also say, use maxRatePerPartition for the direct stream.
> >> >>
> >> >> Bottom line, if you have more information than your system can
> process
> >> >> in X amount of 

Re: Difference between DataFrame.write.jdbc and DataFrame.write.format("jdbc")

2016-07-06 Thread Xiao Li
Hi, Dragisa,

Your second way is incomplete, right? To get the error you showed, you need
to put save() there.

Yeah, we can implement the trait CreatableRelationProvider for JDBC. Then,
you will not see that error.

Will submit a PR for that.

Thanks,

Xiao
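
For completeness, the two forms written out in full, with the save() call
included on the second (this assumes a spark-shell style sc/sqlContext; the JDBC
URL and credentials are placeholders, and the quoted error is the one reported
above):

import org.apache.spark.sql.SaveMode
import sqlContext.implicits._

val dataFrame = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("id", "name")
val dbUrl = "jdbc:postgresql://dbhost:5432/testdb"   // placeholder
val opts  = Map("url" -> dbUrl, "user" -> "dbuser", "password" -> "secret")

val props = new java.util.Properties()
props.setProperty("user", "dbuser")
props.setProperty("password", "secret")

// Form 1: creates and populates the table.
dataFrame.write.mode(SaveMode.Overwrite).jdbc(dbUrl, "my_table", props)

// Form 2: save() triggers the write; currently this throws
// "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select."
dataFrame.write.mode(SaveMode.Overwrite).format("jdbc")
  .options(opts).option("dbtable", "my_table").save()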


2016-07-06 10:05 GMT-07:00 Dragisa Krsmanovic :

> I was expecting to get the same results with both:
>
> dataFrame.write.mode(SaveMode.Overwrite).jdbc(dbUrl, "my_table", props)
>
> and
>
> dataFrame.write.mode(SaveMode.Overwrite).format("jdbc").options(opts).option("dbtable",
> "my_table")
>
>
> In the first example, it behaves as expected. It creates a new table and
> populates it with the rows from DataFrame.
>
> In the second case, I get exception:
> org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not
> allow create table as select.
>
> Looking at the Spark source, it looks like there is a completely separate
> implementation for format("jdbc") and for jdbc(...).
>
> I find that confusing. Unfortunately documentation is rather sparse and
> one finds this discrepancy only through trial and error.
>
> Is there a plan to deprecate one of the forms ? Or to allow same
> functionality for both ?
>
> I tried both 1.6 and 2.0-preview
> --
>
> Dragiša Krsmanović | Platform Engineer | Ticketfly
>
> dragi...@ticketfly.com
>
> @ticketfly  | ticketfly.com/blog |
> facebook.com/ticketfly
>


Re: Spark Left outer Join issue using programmatic sql joins

2016-07-06 Thread Radha krishna
Hi Mich,
Here I have given just sample data.
I have some GBs of files in HDFS and am performing left outer joins on those
files, and the final result I am going to store in a Vertica database table.
There are no duplicate columns in the target table, but for the columns of the
non-matching rows I want to insert an empty value ("") instead of the word null.
On 06-Jul-2016 10:31 pm, "Mich Talebzadeh" 
wrote:

> what do you mean database table here?
>
> you have repeating column names for the table namely deptid
>
>
> 0: jdbc:hive2://rhes564:10010/default>  SELECT *  FROM emp e LEFT OUTER
> JOIN dept d ON e.deptid = d.deptid;
>
> INFO  : OK
> +---+-+---+---+--+--+
> | e.emp_id  | e.name  | e.deptid  | d.deptid  | d.dept_name  |
> +---+-+---+---+--+--+
> | 1001  | aba | 10| 10| DEV  |
> | 1002  | abs | 20| 20| TEST |
> | 1003  | abd | 10| 10| DEV  |
> | 1001  | aba | 10| 10| DEV  |
> | 1002  | abs | 20| 20| TEST |
> | 1003  | abd | 10| 10| DEV  |
> | 1004  | abf | 30| 30| IT   |
> | 1005  | abg | 10| 10| DEV  |
> | 1004  | abf | 30| 30| IT   |
> | 1005  | abg | 10| 10| DEV  |
> | 1006  | abh | 20| 20| TEST |
> | 1007  | abj | 10| 10| DEV  |
> | 1006  | abh | 20| 20| TEST |
> | 1007  | abj | 10| 10| DEV  |
> | 1008  | abk | 30| 30| IT   |
> | 1009  | abl | 20| 20| TEST |
> | 1010  | abq | 10| 10| DEV  |
> | 1008  | abk | 30| 30| IT   |
> | 1009  | abl | 20| 20| TEST |
> | 1010  | abq | 10| 10| DEV  |
> +---+-+---+---+--+--+
> 20 rows selected (44.351 seconds)
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 6 July 2016 at 17:48, radha  wrote:
>
>> Hi ,
>> Thanks all, its working fine the issue is with some space for the dept id,
>>
>> I have one more doubt for the non matching records its showing null word,
>> even if i write into HDFS also its showing null word how can we avoid
>> writing null for the non matching columns, i want just empty value ("")
>>
>> same input i used in the dept table i removed the last row and the below
>> code i used to write into hdfs.
>>
>> DataFrame joinResult = sqlContext.sql("SELECT * FROM EMP e LEFT OUTER JOIN
>> DEPT d ON e.deptid = d.deptid");
>>joinResult.javaRDD().repartition(1).map(new
>> Function() {
>> private static final long serialVersionUID =
>> 9185646063977504742L;
>> @Override
>> public String call(Row arg0) throws Exception {
>> String s;
>>
>>
>> s=arg0.getString(0)+"\u001c"+arg0.getString(1)+"\u001c"+arg0.getString(2)+"\u001c"+arg0.getString(3)+"\u001c"+arg0.getString(4)+"\u001e";
>> return s;
>> }
>> }).saveAsTextFile(args[2]);
>>
>>
>> Output in HDFS File
>>
>> 10 1001 aba 10 dev
>> 10 1003 abd 10 dev
>> 10 1005 abg 10 dev
>> 10 1007 abj 10 dev
>> 10 1010 abq 10 dev
>> 20 1002 abs 20 Test
>> 20 1006 abh 20 Test
>> 20 1009 abl 20 Test
>> 30 1004 abf null null
>> 30 1008 abk null null
>>
>> in my case i want to store the join result back to data base table and its
>> storing "null" word for those non matching records, i want to store as
>> ""(empty value) for the non matching rows.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Left-outer-Join-issue-using-programmatic-sql-joins-tp27295p27299.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Difference between DataFrame.write.jdbc and DataFrame.write.format("jdbc")

2016-07-06 Thread Dragisa Krsmanovic
I was expecting to get the same results with both:

dataFrame.write.mode(SaveMode.Overwrite).jdbc(dbUrl, "my_table", props)

and

dataFrame.write.mode(SaveMode.Overwrite).format("jdbc").options(opts).option("dbtable",
"my_table")


In the first example, it behaves as expected. It creates a new table and
populates it with the rows from DataFrame.

In the second case, I get exception:
org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not
allow create table as select.

Looking at the Spark source, it looks like there is a completely separate
implementation for format("jdbc") and for jdbc(...).

I find that confusing. Unfortunately documentation is rather sparse and one
finds this discrepancy only through trial and error.

Is there a plan to deprecate one of the forms ? Or to allow same
functionality for both ?

I tried both 1.6 and 2.0-preview
-- 

Dragiša Krsmanović | Platform Engineer | Ticketfly

dragi...@ticketfly.com

@ticketfly  | ticketfly.com/blog |
facebook.com/ticketfly


Re: Spark Left outer Join issue using programmatic sql joins

2016-07-06 Thread Mich Talebzadeh
what do you mean database table here?

you have repeating column names for the table namely deptid


0: jdbc:hive2://rhes564:10010/default>  SELECT *  FROM emp e LEFT OUTER
JOIN dept d ON e.deptid = d.deptid;

INFO  : OK
+---+-+---+---+--+--+
| e.emp_id  | e.name  | e.deptid  | d.deptid  | d.dept_name  |
+---+-+---+---+--+--+
| 1001  | aba | 10| 10| DEV  |
| 1002  | abs | 20| 20| TEST |
| 1003  | abd | 10| 10| DEV  |
| 1001  | aba | 10| 10| DEV  |
| 1002  | abs | 20| 20| TEST |
| 1003  | abd | 10| 10| DEV  |
| 1004  | abf | 30| 30| IT   |
| 1005  | abg | 10| 10| DEV  |
| 1004  | abf | 30| 30| IT   |
| 1005  | abg | 10| 10| DEV  |
| 1006  | abh | 20| 20| TEST |
| 1007  | abj | 10| 10| DEV  |
| 1006  | abh | 20| 20| TEST |
| 1007  | abj | 10| 10| DEV  |
| 1008  | abk | 30| 30| IT   |
| 1009  | abl | 20| 20| TEST |
| 1010  | abq | 10| 10| DEV  |
| 1008  | abk | 30| 30| IT   |
| 1009  | abl | 20| 20| TEST |
| 1010  | abq | 10| 10| DEV  |
+---+-+---+---+--+--+
20 rows selected (44.351 seconds)

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 6 July 2016 at 17:48, radha  wrote:

> Hi ,
> Thanks all, its working fine the issue is with some space for the dept id,
>
> I have one more doubt for the non matching records its showing null word,
> even if i write into HDFS also its showing null word how can we avoid
> writing null for the non matching columns, i want just empty value ("")
>
> same input i used in the dept table i removed the last row and the below
> code i used to write into hdfs.
>
> DataFrame joinResult = sqlContext.sql("SELECT * FROM EMP e LEFT OUTER JOIN
> DEPT d ON e.deptid = d.deptid");
>joinResult.javaRDD().repartition(1).map(new
> Function() {
> private static final long serialVersionUID =
> 9185646063977504742L;
> @Override
> public String call(Row arg0) throws Exception {
> String s;
>
>
> s=arg0.getString(0)+"\u001c"+arg0.getString(1)+"\u001c"+arg0.getString(2)+"\u001c"+arg0.getString(3)+"\u001c"+arg0.getString(4)+"\u001e";
> return s;
> }
> }).saveAsTextFile(args[2]);
>
>
> Output in HDFS File
>
> 10 1001 aba 10 dev
> 10 1003 abd 10 dev
> 10 1005 abg 10 dev
> 10 1007 abj 10 dev
> 10 1010 abq 10 dev
> 20 1002 abs 20 Test
> 20 1006 abh 20 Test
> 20 1009 abl 20 Test
> 30 1004 abf null null
> 30 1008 abk null null
>
> in my case i want to store the join result back to data base table and its
> storing "null" word for those non matching records, i want to store as
> ""(empty value) for the non matching rows.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Left-outer-Join-issue-using-programmatic-sql-joins-tp27295p27299.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread Cody Koeninger
If you aren't processing messages as fast as you receive them, you're
going to run out of kafka retention regardless of whether you're using
Spark or Flink.  Again, physics.  It's just a question of what
compromises you choose.

If by "growing of a processing window time of Spark" you mean a
processing time that exceeds batch time... that's what backpressure
and maxRatePerPartition are for.  As long as those are set reasonably,
you'll have a reasonably fixed output interval.  Have you actually
tested this in the way I suggested?

On Wed, Jul 6, 2016 at 11:38 AM, rss rss  wrote:
> Ok, thanks. But really this is not full decision. In case of growing of
> processing time I will have growing of window time. That is really with
> Spark I have 2 points of a latency growing. First is a delay of processing
> of messages in Kafka queue due to physical limitation of a computer system.
> And second one is growing of a processing window time of Spark. In case of
> Flink there is only first point of delay but time intervals of output data
> are fixed. It is really looks like limitation of Spark. That is if dataflow
> is stable, all is ok. If there are peaks of loading more than possibility of
> computational system or data dependent time of calculation, Spark is not
> able to provide a periodically stable results output. Sometimes this is
> appropriate but sometime this is not appropriate.
>
> 2016-07-06 18:11 GMT+02:00 Cody Koeninger :
>>
>> Then double the upper limit you have set until the processing time
>> approaches the batch time.
>>
>> On Wed, Jul 6, 2016 at 11:06 AM, rss rss  wrote:
>> > Ok, with:
>> >
>> > .set("spark.streaming.backpressure.enabled","true")
>> > .set("spark.streaming.receiver.maxRate", "1")
>> > .set("spark.streaming.kafka.maxRatePerPartition", "1")
>> >
>> > I have something like
>> >
>> >
>> > ***
>> > Processing time: 5626
>> > Expected time: 1
>> > Processed messages: 10
>> > Message example: {"message": 950002,
>> > "uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>> > Recovered json:
>> > {"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>> >
>> > That is yes, it works but throughput is much less than without
>> > limitations
>> > because of this is an absolute upper limit. And time of processing is
>> > half
>> > of available.
>> >
>> > Regarding Spark 2.0 structured streaming I will look it some later. Now
>> > I
>> > don't know how to strictly measure throughput and latency of this high
>> > level
>> > API. My aim now is to compare streaming processors.
>> >
>> >
>> > 2016-07-06 17:41 GMT+02:00 Cody Koeninger :
>> >>
>> >> The configuration you set is spark.streaming.receiver.maxRate.  The
>> >> direct stream is not a receiver.  As I said in my first message in
>> >> this thread, and as the pages at
>> >>
>> >>
>> >> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>> >> and
>> >> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
>> >> also say, use maxRatePerPartition for the direct stream.
>> >>
>> >> Bottom line, if you have more information than your system can process
>> >> in X amount of time, after X amount of time you can either give the
>> >> wrong answer, or take longer to process.  Flink can't violate the laws
>> >> of physics.  If the tradeoffs that Flink make are better for your use
>> >> case than the tradeoffs that DStreams make, you may be better off
>> >> using Flink (or testing out spark 2.0 structured streaming, although
>> >> there's no kafka integration available for that yet)
>> >>
>> >> On Wed, Jul 6, 2016 at 10:25 AM, rss rss  wrote:
>> >> > ok, thanks. I tried  to set minimum max rate for beginning. However
>> >> > in
>> >> > general I don't know initial throughput. BTW it would be very useful
>> >> > to
>> >> > explain it in
>> >> >
>> >> >
>> >> > https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
>> >> >
>> >> > And really with
>> >> >
>> >> > .set("spark.streaming.backpressure.enabled","true")
>> >> > .set("spark.streaming.receiver.maxRate", "1")
>> >> >
>> >> > I have same problem:
>> >> >
>> >> >
>> >> > ***
>> >> > Processing time: 36994
>> >> > Expected time: 1
>> >> > Processed messages: 3015830
>> >> > Message example: {"message": 1,
>> >> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >> > Recovered json:
>> >> > {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >> >
>> >> >
>> >> > Regarding auto.offset.reset smallest, now it is because of a test and
>> >> > I
>> >> > want
>> >> > to get same messages for each run. But in any case I expect to read
>> >> > all
>> >> > new
>> >> > messages from queue.
>> >> >
>> >> > Regarding backpressure 

Re: Spark Left outer Join issue using programmatic sql joins

2016-07-06 Thread radha
Hi,
Thanks all, it's working fine; the issue was some spaces in the dept id.

I have one more doubt: for the non-matching records it is showing the word null.
Even when I write into HDFS it is showing the word null. How can we avoid
writing null for the non-matching columns? I want just an empty value ("").

I used the same input; in the dept table I removed the last row, and I used the
code below to write into HDFS.

DataFrame joinResult = sqlContext.sql("SELECT * FROM EMP e LEFT OUTER JOIN
DEPT d ON e.deptid = d.deptid");
   joinResult.javaRDD().repartition(1).map(new Function<Row, String>() {
private static final long serialVersionUID = 
9185646063977504742L;
@Override
public String call(Row arg0) throws Exception {  
String s;   
   
s=arg0.getString(0)+"\u001c"+arg0.getString(1)+"\u001c"+arg0.getString(2)+"\u001c"+arg0.getString(3)+"\u001c"+arg0.getString(4)+"\u001e";
return s;
}
}).saveAsTextFile(args[2]);


Output in HDFS File

10 1001 aba 10 dev
10 1003 abd 10 dev
10 1005 abg 10 dev
10 1007 abj 10 dev
10 1010 abq 10 dev
20 1002 abs 20 Test
20 1006 abh 20 Test
20 1009 abl 20 Test
30 1004 abf null null
30 1008 abk null null

In my case I want to store the join result back into a database table, and it is
storing the word "null" for those non-matching records; I want to store ""
(an empty value) for the non-matching rows.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Left-outer-Join-issue-using-programmatic-sql-joins-tp27295p27299.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Left outer Join issue using programmatic sql joins

2016-07-06 Thread Radha krishna
Hi,
Thanks all, it's working fine; the issue was some spaces in the dept id.

I have one more doubt: for the non-matching records it is showing the word null.
Even when I write into HDFS it is showing the word null. How can we avoid
writing null for the non-matching columns? I want just an empty value ("").

I used the same input; in the dept table I removed the last row, and I used the
code below to write into HDFS.

DataFrame joinResult = sqlContext.sql("SELECT * FROM EMP e LEFT OUTER JOIN
DEPT d ON e.deptid = d.deptid");
  joinResult.javaRDD().repartition(1).map(new Function<Row, String>() {
private static final long serialVersionUID = 9185646063977504742L;
@Override
public String call(Row arg0) throws Exception {
String s;

 
s=arg0.getString(0)+"\u001c"+arg0.getString(1)+"\u001c"+arg0.getString(2)+"\u001c"+arg0.getString(3)+"\u001c"+arg0.getString(4)+"\u001e";
return s;
}
}).saveAsTextFile(args[2]);


Output in HDFS File

10 1001 aba 10 dev
10 1003 abd 10 dev
10 1005 abg 10 dev
10 1007 abj 10 dev
10 1010 abq 10 dev
20 1002 abs 20 Test
20 1006 abh 20 Test
20 1009 abl 20 Test
30 1004 abf null null
30 1008 abk null null

In my case I want to store the join result back into a database table, and it is
storing the word "null" for those non-matching records; I want to store ""
(an empty value) for the non-matching rows.










On Wed, Jul 6, 2016 at 10:01 PM, Mich Talebzadeh 
wrote:

> Hive query is
>
> hive> SELECT *  FROM emp e LEFT OUTER JOIN dept d ON e.deptid = d.deptid;
> Status: Finished successfully in 2.02 seconds
> OK
> 1001aba 10  10  DEV
> 1002abs 20  20  TEST
> 1003abd 10  10  DEV
> 1001aba 10  10  DEV
> 1002abs 20  20  TEST
> 1003abd 10  10  DEV
> 1004abf 30  30  IT
> 1005abg 10  10  DEV
> 1004abf 30  30  IT
> 1005abg 10  10  DEV
> 1006abh 20  20  TEST
> 1007abj 10  10  DEV
> 1006abh 20  20  TEST
> 1007abj 10  10  DEV
> 1008abk 30  30  IT
> 1009abl 20  20  TEST
> 1010abq 10  10  DEV
> 1008abk 30  30  IT
> 1009abl 20  20  TEST
> 1010abq 10  10  DEV
> Time taken: 44.608 seconds, Fetched: 20 row(s)
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 6 July 2016 at 17:28, Mich Talebzadeh 
> wrote:
>
>> This is very simple
>>
>> in Hive
>>
>> Status: Running (Hive on Spark job[1])
>> Job Progress Format
>> CurrentTime StageId_StageAttemptId:
>> SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
>> [StageCost]
>> 2016-07-06 17:17:16,006 Stage-1_0: 0(+1)/1
>> 2016-07-06 17:17:17,011 Stage-1_0: 1/1 Finished
>> Status: Finished successfully in 2.02 seconds
>> OK
>> 1001aba 10  10  DEV
>> 1002abs 20  20  TEST
>> 1003abd 10  10  DEV
>> 1001aba 10  10  DEV
>> 1002abs 20  20  TEST
>> 1003abd 10  10  DEV
>> 1004abf 30  30  IT
>> 1005abg 10  10  DEV
>> 1004abf 30  30  IT
>> 1005abg 10  10  DEV
>> 1006abh 20  20  TEST
>> 1007abj 10  10  DEV
>> 1006abh 20  20  TEST
>> 1007abj 10  10  DEV
>> 1008abk 30  30  IT
>> 1009abl 20  20  TEST
>> 1010abq 10  10  DEV
>> 1008abk 30  30  IT
>> 1009abl 20  20  TEST
>> 1010abq 10  10  DEV
>>
>>
>> In Spark
>>
>> scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> HiveContext: org.apache.spark.sql.hive.HiveContext =
>> org.apache.spark.sql.hive.HiveContext@f9402c2
>>
>> scala> HiveContext.sql("use test")
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>
>> scala> val e = HiveContext.table("emp")
>> e: org.apache.spark.sql.DataFrame = [emp_id: int, name: string, deptid:
>> int]
>> scala> val d = HiveContext.table("dept")
>>
>> d: org.apache.spark.sql.DataFrame = [deptid: int, dept_name: string]
>> scala> val rs = e.join(d,e("deptid")===d("deptid"),
>> "fullouter").collect.foreach(println)
>> [1001,aba,10,10,DEV]
>> [1003,abd,10,10,DEV]
>> [1001,aba,10,10,DEV]
>> [1003,abd,10,10,DEV]
>> 

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread rss rss
Ok, thanks. But really this is not a full solution. If processing time grows, I
will have a growing window time. That is, with Spark I have 2 points where
latency grows. The first is the delay in processing messages from the Kafka
queue due to the physical limitations of the computer system. The second is the
growth of Spark's processing window time. In the case of Flink there is only the
first point of delay, but the time intervals of the output data are fixed. It
really looks like a limitation of Spark. That is, if the dataflow is stable, all
is ok. If there are peaks of load beyond the capacity of the computational
system, or *data dependent time of calculation*, Spark is not able to provide a
periodically stable results output. Sometimes this is appropriate but sometimes
it is not.

2016-07-06 18:11 GMT+02:00 Cody Koeninger :

> Then double the upper limit you have set until the processing time
> approaches the batch time.
>
> On Wed, Jul 6, 2016 at 11:06 AM, rss rss  wrote:
> > Ok, with:
> >
> > .set("spark.streaming.backpressure.enabled","true")
> > .set("spark.streaming.receiver.maxRate", "1")
> > .set("spark.streaming.kafka.maxRatePerPartition", "1")
> >
> > I have something like
> >
> >
> ***
> > Processing time: 5626
> > Expected time: 1
> > Processed messages: 10
> > Message example: {"message": 950002,
> > "uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
> > Recovered json:
> > {"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
> >
> > That is yes, it works but throughput is much less than without
> limitations
> > because of this is an absolute upper limit. And time of processing is
> half
> > of available.
> >
> > Regarding Spark 2.0 structured streaming I will look it some later. Now I
> > don't know how to strictly measure throughput and latency of this high
> level
> > API. My aim now is to compare streaming processors.
> >
> >
> > 2016-07-06 17:41 GMT+02:00 Cody Koeninger :
> >>
> >> The configuration you set is spark.streaming.receiver.maxRate.  The
> >> direct stream is not a receiver.  As I said in my first message in
> >> this thread, and as the pages at
> >>
> >>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
> >> and
> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
> >> also say, use maxRatePerPartition for the direct stream.
> >>
> >> Bottom line, if you have more information than your system can process
> >> in X amount of time, after X amount of time you can either give the
> >> wrong answer, or take longer to process.  Flink can't violate the laws
> >> of physics.  If the tradeoffs that Flink make are better for your use
> >> case than the tradeoffs that DStreams make, you may be better off
> >> using Flink (or testing out spark 2.0 structured streaming, although
> >> there's no kafka integration available for that yet)
> >>
> >> On Wed, Jul 6, 2016 at 10:25 AM, rss rss  wrote:
> >> > ok, thanks. I tried  to set minimum max rate for beginning. However in
> >> > general I don't know initial throughput. BTW it would be very useful
> to
> >> > explain it in
> >> >
> >> >
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
> >> >
> >> > And really with
> >> >
> >> > .set("spark.streaming.backpressure.enabled","true")
> >> > .set("spark.streaming.receiver.maxRate", "1")
> >> >
> >> > I have same problem:
> >> >
> >> >
> ***
> >> > Processing time: 36994
> >> > Expected time: 1
> >> > Processed messages: 3015830
> >> > Message example: {"message": 1,
> >> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> >> > Recovered json:
> >> > {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> >> >
> >> >
> >> > Regarding auto.offset.reset smallest, now it is because of a test and
> I
> >> > want
> >> > to get same messages for each run. But in any case I expect to read
> all
> >> > new
> >> > messages from queue.
> >> >
> >> > Regarding backpressure detection. What is to do then a process time is
> >> > much
> >> > more then input rate? Now I see growing time of processing instead of
> >> > stable
> >> > 10 second and decreasing number of processed messages. Where is a
> limit
> >> > of
> >> > of backpressure algorithm?
> >> >
> >> > Regarding Flink. I don't know how works core of Flink but you can
> check
> >> > self
> >> > that Flink will strictly terminate processing of messages by time.
> >> > Deviation
> >> > of the time window from 10 seconds to several minutes is impossible.
> >> >
> >> > PS: I prepared this example to make possible easy observe the problem
> >> > and
> >> > fix it if it is a bug. For me it is obvious. May I ask you to be near
> to
> >> > this simple source code? In other case I have 

Re: Spark Left outer Join issue using programmatic sql joins

2016-07-06 Thread Mich Talebzadeh
Hive query is

hive> SELECT *  FROM emp e LEFT OUTER JOIN dept d ON e.deptid = d.deptid;
Status: Finished successfully in 2.02 seconds
OK
1001aba 10  10  DEV
1002abs 20  20  TEST
1003abd 10  10  DEV
1001aba 10  10  DEV
1002abs 20  20  TEST
1003abd 10  10  DEV
1004abf 30  30  IT
1005abg 10  10  DEV
1004abf 30  30  IT
1005abg 10  10  DEV
1006abh 20  20  TEST
1007abj 10  10  DEV
1006abh 20  20  TEST
1007abj 10  10  DEV
1008abk 30  30  IT
1009abl 20  20  TEST
1010abq 10  10  DEV
1008abk 30  30  IT
1009abl 20  20  TEST
1010abq 10  10  DEV
Time taken: 44.608 seconds, Fetched: 20 row(s)




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 6 July 2016 at 17:28, Mich Talebzadeh  wrote:

> This is very simple
>
> in Hive
>
> Status: Running (Hive on Spark job[1])
> Job Progress Format
> CurrentTime StageId_StageAttemptId:
> SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
> [StageCost]
> 2016-07-06 17:17:16,006 Stage-1_0: 0(+1)/1
> 2016-07-06 17:17:17,011 Stage-1_0: 1/1 Finished
> Status: Finished successfully in 2.02 seconds
> OK
> 1001aba 10  10  DEV
> 1002abs 20  20  TEST
> 1003abd 10  10  DEV
> 1001aba 10  10  DEV
> 1002abs 20  20  TEST
> 1003abd 10  10  DEV
> 1004abf 30  30  IT
> 1005abg 10  10  DEV
> 1004abf 30  30  IT
> 1005abg 10  10  DEV
> 1006abh 20  20  TEST
> 1007abj 10  10  DEV
> 1006abh 20  20  TEST
> 1007abj 10  10  DEV
> 1008abk 30  30  IT
> 1009abl 20  20  TEST
> 1010abq 10  10  DEV
> 1008abk 30  30  IT
> 1009abl 20  20  TEST
> 1010abq 10  10  DEV
>
>
> In Spark
>
> scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> HiveContext: org.apache.spark.sql.hive.HiveContext =
> org.apache.spark.sql.hive.HiveContext@f9402c2
>
> scala> HiveContext.sql("use test")
> res1: org.apache.spark.sql.DataFrame = [result: string]
>
> scala> val e = HiveContext.table("emp")
> e: org.apache.spark.sql.DataFrame = [emp_id: int, name: string, deptid:
> int]
> scala> val d = HiveContext.table("dept")
>
> d: org.apache.spark.sql.DataFrame = [deptid: int, dept_name: string]
> scala> val rs = e.join(d,e("deptid")===d("deptid"),
> "fullouter").collect.foreach(println)
> [1001,aba,10,10,DEV]
> [1003,abd,10,10,DEV]
> [1001,aba,10,10,DEV]
> [1003,abd,10,10,DEV]
> [1005,abg,10,10,DEV]
> [1005,abg,10,10,DEV]
> [1007,abj,10,10,DEV]
> [1007,abj,10,10,DEV]
> [1010,abq,10,10,DEV]
> [1010,abq,10,10,DEV]
> [1002,abs,20,20,TEST]
> [1002,abs,20,20,TEST]
> [1006,abh,20,20,TEST]
> [1006,abh,20,20,TEST]
> [1009,abl,20,20,TEST]
> [1009,abl,20,20,TEST]
> [1004,abf,30,30,IT]
> [1004,abf,30,30,IT]
> [1008,abk,30,30,IT]
> [1008,abk,30,30,IT]
>
>
>
> Note that you need to enforce ordering
>
> HTH
>
>
>
>
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 6 July 2016 at 14:00, ayan guha  wrote:
>
>> looks like a data issue to me. Either EMP or DEPT has spaces in dept id
>> for deptid=20,30.
>>
>> Did you check in hive cli?
>>
>> On Wed, Jul 6, 2016 at 10:33 PM, radha  wrote:
>>
>>> Hi All,
>>>
>>> Please check below for the code and input and output, i think the output
>>> is
>>> not correct, i  am missing any thing? pls guide
>>>
>>> Code
>>>
>>> public class Test {
>>> private static JavaSparkContext jsc = null;
>>> 

Re: Spark Left outer Join issue using programmatic sql joins

2016-07-06 Thread Mich Talebzadeh
This is very simple

in Hive

Status: Running (Hive on Spark job[1])
Job Progress Format
CurrentTime StageId_StageAttemptId:
SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
[StageCost]
2016-07-06 17:17:16,006 Stage-1_0: 0(+1)/1
2016-07-06 17:17:17,011 Stage-1_0: 1/1 Finished
Status: Finished successfully in 2.02 seconds
OK
1001aba 10  10  DEV
1002abs 20  20  TEST
1003abd 10  10  DEV
1001aba 10  10  DEV
1002abs 20  20  TEST
1003abd 10  10  DEV
1004abf 30  30  IT
1005abg 10  10  DEV
1004abf 30  30  IT
1005abg 10  10  DEV
1006abh 20  20  TEST
1007abj 10  10  DEV
1006abh 20  20  TEST
1007abj 10  10  DEV
1008abk 30  30  IT
1009abl 20  20  TEST
1010abq 10  10  DEV
1008abk 30  30  IT
1009abl 20  20  TEST
1010abq 10  10  DEV


In Spark

scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
HiveContext: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@f9402c2

scala> HiveContext.sql("use test")
res1: org.apache.spark.sql.DataFrame = [result: string]

scala> val e = HiveContext.table("emp")
e: org.apache.spark.sql.DataFrame = [emp_id: int, name: string, deptid: int]
scala> val d = HiveContext.table("dept")

d: org.apache.spark.sql.DataFrame = [deptid: int, dept_name: string]
scala> val rs = e.join(d,e("deptid")===d("deptid"),
"fullouter").collect.foreach(println)
[1001,aba,10,10,DEV]
[1003,abd,10,10,DEV]
[1001,aba,10,10,DEV]
[1003,abd,10,10,DEV]
[1005,abg,10,10,DEV]
[1005,abg,10,10,DEV]
[1007,abj,10,10,DEV]
[1007,abj,10,10,DEV]
[1010,abq,10,10,DEV]
[1010,abq,10,10,DEV]
[1002,abs,20,20,TEST]
[1002,abs,20,20,TEST]
[1006,abh,20,20,TEST]
[1006,abh,20,20,TEST]
[1009,abl,20,20,TEST]
[1009,abl,20,20,TEST]
[1004,abf,30,30,IT]
[1004,abf,30,30,IT]
[1008,abk,30,30,IT]
[1008,abk,30,30,IT]



Note that you need to enforce ordering

HTH











Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 6 July 2016 at 14:00, ayan guha  wrote:

> looks like a data issue to me. Either EMP or DEPT has spaces in dept id
> for deptid=20,30.
>
> Did you check in hive cli?
>
> On Wed, Jul 6, 2016 at 10:33 PM, radha  wrote:
>
>> Hi All,
>>
>> Please check below for the code and input and output, i think the output
>> is
>> not correct, i  am missing any thing? pls guide
>>
>> Code
>>
>> public class Test {
>> private static JavaSparkContext jsc = null;
>> private static SQLContext sqlContext = null;
>> private static Configuration hadoopConf = null;
>> public static void main(String[] args) {
>>
>> jsc = GlobalSparkContext.getJavaSparkContext();
>> sqlContext = GlobalSparkContext.getSQLContext(jsc);
>>
>> hadoopConf = new Configuration(jsc.hadoopConfiguration());
>>
>>
>> hadoopConf.set("textinputformat.record.delimiter",GlobalSparkContext.lineSeparator);
>>
>> try {
>> final Emp emp = new Emp();
>> final Dept dept = new Dept();
>>
>> JavaPairRDD<LongWritable, Text> deptinputLines =
>> jsc.newAPIHadoopFile(args[0], TextInputFormat.class, LongWritable.class,
>> Text.class, hadoopConf);
>> JavaRDD<Dept> deptRDD = deptinputLines.map(new
>> Function<Tuple2<LongWritable, Text>, String>() {
>> @Override
>> public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
>> return arg0._2.toString();
>> }
>> }).map(new Function<String, Dept>() {
>> public Dept call(String recordLine)
>> throws Exception {
>> String[] parts =
>> recordLine.split(GlobalSparkContext.recordSeparator);
>> return getInstanceDept(parts, dept);
>> }
>> });
>>
>> DataFrame deptDF =
>> 

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread Cody Koeninger
Then double the upper limit you have set until the processing time
approaches the batch time.

On Wed, Jul 6, 2016 at 11:06 AM, rss rss  wrote:
> Ok, with:
>
> .set("spark.streaming.backpressure.enabled","true")
> .set("spark.streaming.receiver.maxRate", "1")
> .set("spark.streaming.kafka.maxRatePerPartition", "1")
>
> I have something like
>
> ***
> Processing time: 5626
> Expected time: 1
> Processed messages: 10
> Message example: {"message": 950002,
> "uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
> Recovered json:
> {"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>
> That is yes, it works but throughput is much less than without limitations
> because of this is an absolute upper limit. And time of processing is half
> of available.
>
> Regarding Spark 2.0 structured streaming I will look it some later. Now I
> don't know how to strictly measure throughput and latency of this high level
> API. My aim now is to compare streaming processors.
>
>
> 2016-07-06 17:41 GMT+02:00 Cody Koeninger :
>>
>> The configuration you set is spark.streaming.receiver.maxRate.  The
>> direct stream is not a receiver.  As I said in my first message in
>> this thread, and as the pages at
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>> and http://spark.apache.org/docs/latest/configuration.html#spark-streaming
>> also say, use maxRatePerPartition for the direct stream.
>>
>> Bottom line, if you have more information than your system can process
>> in X amount of time, after X amount of time you can either give the
>> wrong answer, or take longer to process.  Flink can't violate the laws
>> of physics.  If the tradeoffs that Flink make are better for your use
>> case than the tradeoffs that DStreams make, you may be better off
>> using Flink (or testing out spark 2.0 structured streaming, although
>> there's no kafka integration available for that yet)
>>
>> On Wed, Jul 6, 2016 at 10:25 AM, rss rss  wrote:
>> > ok, thanks. I tried  to set minimum max rate for beginning. However in
>> > general I don't know initial throughput. BTW it would be very useful to
>> > explain it in
>> >
>> > https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
>> >
>> > And really with
>> >
>> > .set("spark.streaming.backpressure.enabled","true")
>> > .set("spark.streaming.receiver.maxRate", "1")
>> >
>> > I have same problem:
>> >
>> > ***
>> > Processing time: 36994
>> > Expected time: 1
>> > Processed messages: 3015830
>> > Message example: {"message": 1,
>> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> > Recovered json:
>> > {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >
>> >
>> > Regarding auto.offset.reset smallest, now it is because of a test and I
>> > want
>> > to get same messages for each run. But in any case I expect to read all
>> > new
>> > messages from queue.
>> >
>> > Regarding backpressure detection. What is to do then a process time is
>> > much
>> > more then input rate? Now I see growing time of processing instead of
>> > stable
>> > 10 second and decreasing number of processed messages. Where is a limit
>> > of
>> > of backpressure algorithm?
>> >
>> > Regarding Flink. I don't know how works core of Flink but you can check
>> > self
>> > that Flink will strictly terminate processing of messages by time.
>> > Deviation
>> > of the time window from 10 seconds to several minutes is impossible.
>> >
>> > PS: I prepared this example to make possible easy observe the problem
>> > and
>> > fix it if it is a bug. For me it is obvious. May I ask you to be near to
>> > this simple source code? In other case I have to think that this is a
>> > technical limitation of Spark to work with unstable data flows.
>> >
>> > Cheers
>> >
>> > 2016-07-06 16:40 GMT+02:00 Cody Koeninger :
>> >>
>> >> The direct stream determines batch sizes on the driver, in advance of
>> >> processing.  If you haven't specified a maximum batch size, how would
>> >> you suggest the backpressure code determine how to limit the first
>> >> batch?  It has no data on throughput until at least one batch is
>> >> completed.
>> >>
>> >> Again, this is why I'm saying test by producing messages into kafka at
>> >> a rate comparable to production, not loading a ton of messages in and
>> >> starting from auto.offset.reset smallest.
>> >>
>> >> Even if you're unwilling to take that advice for some reason, and
>> >> unwilling to empirically determine a reasonable maximum partition
>> >> size, you should be able to estimate an upper bound such that the
>> >> first batch does not encompass your entire kafka retention.
>> >> Backpressure will kick in once it has some information to work with.
>> >>
>> 

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread rss rss
Ok, with:

.set("spark.streaming.backpressure.enabled","true")
.set("spark.streaming.receiver.maxRate", "1")
.set("spark.streaming.kafka.maxRatePerPartition", "1")

I have something like

***
Processing time: 5626
Expected time: 1
Processed messages: 10
Message example: {"message": 950002,
"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
Recovered json:
{"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}

So yes, it works, but throughput is much lower than without limits because
this is an absolute upper bound. And the processing time is only half of the
available batch time.

Regarding Spark 2.0 structured streaming, I will look at it later. For now I
don't know how to strictly measure the throughput and latency of this
high-level API. My current aim is to compare streaming processors.


2016-07-06 17:41 GMT+02:00 Cody Koeninger :

> The configuration you set is spark.streaming.receiver.maxRate.  The
> direct stream is not a receiver.  As I said in my first message in
> this thread, and as the pages at
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
> and http://spark.apache.org/docs/latest/configuration.html#spark-streaming
> also say, use maxRatePerPartition for the direct stream.
>
> Bottom line, if you have more information than your system can process
> in X amount of time, after X amount of time you can either give the
> wrong answer, or take longer to process.  Flink can't violate the laws
> of physics.  If the tradeoffs that Flink make are better for your use
> case than the tradeoffs that DStreams make, you may be better off
> using Flink (or testing out spark 2.0 structured streaming, although
> there's no kafka integration available for that yet)
>
> On Wed, Jul 6, 2016 at 10:25 AM, rss rss  wrote:
> > ok, thanks. I tried  to set minimum max rate for beginning. However in
> > general I don't know initial throughput. BTW it would be very useful to
> > explain it in
> >
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
> >
> > And really with
> >
> > .set("spark.streaming.backpressure.enabled","true")
> > .set("spark.streaming.receiver.maxRate", "1")
> >
> > I have same problem:
> >
> ***
> > Processing time: 36994
> > Expected time: 1
> > Processed messages: 3015830
> > Message example: {"message": 1,
> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> > Recovered json:
> {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> >
> >
> > Regarding auto.offset.reset smallest, now it is because of a test and I
> want
> > to get same messages for each run. But in any case I expect to read all
> new
> > messages from queue.
> >
> > Regarding backpressure detection. What is to do then a process time is
> much
> > more then input rate? Now I see growing time of processing instead of
> stable
> > 10 second and decreasing number of processed messages. Where is a limit
> of
> > of backpressure algorithm?
> >
> > Regarding Flink. I don't know how works core of Flink but you can check
> self
> > that Flink will strictly terminate processing of messages by time.
> Deviation
> > of the time window from 10 seconds to several minutes is impossible.
> >
> > PS: I prepared this example to make possible easy observe the problem and
> > fix it if it is a bug. For me it is obvious. May I ask you to be near to
> > this simple source code? In other case I have to think that this is a
> > technical limitation of Spark to work with unstable data flows.
> >
> > Cheers
> >
> > 2016-07-06 16:40 GMT+02:00 Cody Koeninger :
> >>
> >> The direct stream determines batch sizes on the driver, in advance of
> >> processing.  If you haven't specified a maximum batch size, how would
> >> you suggest the backpressure code determine how to limit the first
> >> batch?  It has no data on throughput until at least one batch is
> >> completed.
> >>
> >> Again, this is why I'm saying test by producing messages into kafka at
> >> a rate comparable to production, not loading a ton of messages in and
> >> starting from auto.offset.reset smallest.
> >>
> >> Even if you're unwilling to take that advice for some reason, and
> >> unwilling to empirically determine a reasonable maximum partition
> >> size, you should be able to estimate an upper bound such that the
> >> first batch does not encompass your entire kafka retention.
> >> Backpressure will kick in once it has some information to work with.
> >>
> >> On Wed, Jul 6, 2016 at 8:45 AM, rss rss  wrote:
> >> > Hello,
> >> >
> >> >   thanks, I tried to
> .set("spark.streaming.backpressure.enabled","true")
> >> > but
> >> > result is negative. Therefore I have prepared small test
> >> > https://github.com/rssdev10/spark-kafka-streaming
> >> >
> >> >   

Re: SnappyData and Structured Streaming

2016-07-06 Thread Benjamin Kim
Jags,

Thanks for the details. This makes things much clearer. I saw in the Spark 
roadmap that version 2.1 will add the SQL capabilities mentioned here. It looks 
like, gradually, the Spark community is coming to the same conclusions that the 
SnappyData folks have come to a while back in terms of Streaming. But, there is 
always the need for a better way to store data underlying Spark. The State 
Store information was informative too. I can envision that it can use this data 
store too if need be.

Thanks again,
Ben

> On Jul 6, 2016, at 8:52 AM, Jags Ramnarayan  wrote:
> 
> The plan is to fully integrate with the new structured streaming API and 
> implementation in an upcoming release. But, we will continue offering several 
> extensions. Few noted below ...
> 
> - the store (streaming sink) will offer a lot more capabilities like 
> transactions, replicated tables, partitioned row and column oriented tables 
> to suit different types of workloads. 
> - While streaming API(scala) in snappydata itself will change a bit to become 
> fully compatible with structured streaming(SchemaDStream will go away), we 
> will continue to offer SQL support for streams so they can be managed from 
> external clients (JDBC, ODBC), their partitions can share the same 
> partitioning strategy as the underlying table where it might be stored, and 
> even registrations of continuous queries from remote clients. 
> 
> While building streaming apps using the Spark APi offers tremendous 
> flexibility we also want to make it simple for apps to work with streams just 
> using SQL. For instance, you should be able to declaratively specify a table 
> as a sink to a stream(i.e. using SQL). For example, you can specify a "TopK 
> Table" (a built in special table for topK analytics using probabilistic data 
> structures) as a sink for a high velocity time series stream like this - 
> "create topK table MostPopularTweets on tweetStreamTable " +
> "options(key 'hashtag', frequencyCol 'retweets', timeSeriesColumn 
> 'tweetTime' )" 
> where 'tweetStreamTable' is created using the 'create stream table ...' SQL 
> syntax. 
> 
> 
> -
> Jags
> SnappyData blog 
> Download binary, source 
> 
> 
> On Wed, Jul 6, 2016 at 8:02 PM, Benjamin Kim  > wrote:
> Jags,
> 
> I should have been more specific. I am referring to what I read at 
> http://snappydatainc.github.io/snappydata/streamingWithSQL/ 
> , especially the 
> Streaming Tables part. It roughly coincides with the Streaming DataFrames 
> outlined here 
> https://docs.google.com/document/d/1NHKdRSNCbCmJbinLmZuqNA1Pt6CGpFnLVRbzuDUcZVM/edit#heading=h.ff0opfdo6q1h
>  
> .
>  I don’t if I’m wrong, but they both sound very similar. That’s why I posed 
> this question.
> 
> Thanks,
> Ben
> 
>> On Jul 6, 2016, at 7:03 AM, Jags Ramnarayan > > wrote:
>> 
>> Ben,
>>Note that Snappydata's primary objective is to be a distributed in-memory 
>> DB for mixed workloads (i.e. streaming with transactions and analytic 
>> queries). On the other hand, Spark, till date, is primarily designed as a 
>> processing engine over myriad storage engines (SnappyData being one). So, 
>> the marriage is quite complementary. The difference compared to other stores 
>> is that SnappyData realizes its solution by deeply integrating and 
>> collocating with Spark (i.e. share spark executor memory/resources with the 
>> store) avoiding serializations and shuffle in many situations.
>> 
>> On your specific thought about being similar to Structured streaming, a 
>> better discussion could be a comparison to the recently introduced State 
>> store 
>> 
>>  (perhaps this is what you meant). 
>> It proposes a KV store for streaming aggregations with support for updates. 
>> The proposed API will, at some point, be pluggable so vendors can easily 
>> support alternate implementations to storage, not just HDFS(default store in 
>> proposed State store). 
>> 
>> 
>> -
>> Jags
>> SnappyData blog 
>> Download binary, source 
>> 
>> 
>> On Wed, Jul 6, 2016 at 12:49 AM, Benjamin Kim > > wrote:
>> I recently got a sales email from SnappyData, and after reading the 
>> documentation about what they offer, it sounds very similar to what 
>> Structured Streaming will offer w/o the underlying in-memory, spill-to-disk, 
>> CRUD compliant data storage in SnappyData. I was wondering if Structured 
>> Streaming is 

Re: SnappyData and Structured Streaming

2016-07-06 Thread Jags Ramnarayan
The plan is to fully integrate with the new structured streaming API and
implementation in an upcoming release. But, we will continue offering
several extensions. A few are noted below ...

- the store (streaming sink) will offer a lot more capabilities like
transactions, replicated tables, partitioned row and column oriented tables
to suit different types of workloads.
- While streaming API(scala) in snappydata itself will change a bit to
become fully compatible with structured streaming(SchemaDStream will go
away), we will continue to offer SQL support for streams so they can be
managed from external clients (JDBC, ODBC), their partitions can share the
same partitioning strategy as the underlying table where it might be
stored, and even registrations of continuous queries from remote clients.

While building streaming apps using the Spark APi offers tremendous
flexibility we also want to make it simple for apps to work with streams
just using SQL. For instance, you should be able to declaratively specify a
table as a sink to a stream(i.e. using SQL). For example, you can specify a
"TopK Table" (a built in special table for topK analytics using
probabilistic data structures) as a sink for a high velocity time series
stream like this - "create topK table MostPopularTweets on tweetStreamTable "
+
"options(key 'hashtag', frequencyCol 'retweets', timeSeriesColumn
'tweetTime' )"
where 'tweetStreamTable' is created using the 'create stream table ...' SQL
syntax.


-
Jags
SnappyData blog 
Download binary, source 


On Wed, Jul 6, 2016 at 8:02 PM, Benjamin Kim  wrote:

> Jags,
>
> I should have been more specific. I am referring to what I read at
> http://snappydatainc.github.io/snappydata/streamingWithSQL/, especially
> the Streaming Tables part. It roughly coincides with the Streaming
> DataFrames outlined here
> https://docs.google.com/document/d/1NHKdRSNCbCmJbinLmZuqNA1Pt6CGpFnLVRbzuDUcZVM/edit#heading=h.ff0opfdo6q1h.
> I don’t if I’m wrong, but they both sound very similar. That’s why I posed
> this question.
>
> Thanks,
> Ben
>
> On Jul 6, 2016, at 7:03 AM, Jags Ramnarayan 
> wrote:
>
> Ben,
>Note that Snappydata's primary objective is to be a distributed
> in-memory DB for mixed workloads (i.e. streaming with transactions and
> analytic queries). On the other hand, Spark, till date, is primarily
> designed as a processing engine over myriad storage engines (SnappyData
> being one). So, the marriage is quite complementary. The difference
> compared to other stores is that SnappyData realizes its solution by deeply
> integrating and collocating with Spark (i.e. share spark executor
> memory/resources with the store) avoiding serializations and shuffle in
> many situations.
>
> On your specific thought about being similar to Structured streaming, a
> better discussion could be a comparison to the recently introduced State
> store
> 
>  (perhaps
> this is what you meant).
> It proposes a KV store for streaming aggregations with support for
> updates. The proposed API will, at some point, be pluggable so vendors can
> easily support alternate implementations to storage, not just HDFS(default
> store in proposed State store).
>
>
> -
> Jags
> SnappyData blog 
> Download binary, source 
>
>
> On Wed, Jul 6, 2016 at 12:49 AM, Benjamin Kim  wrote:
>
>> I recently got a sales email from SnappyData, and after reading the
>> documentation about what they offer, it sounds very similar to what
>> Structured Streaming will offer w/o the underlying in-memory,
>> spill-to-disk, CRUD compliant data storage in SnappyData. I was wondering
>> if Structured Streaming is trying to achieve the same on its own or is
>> SnappyData contributing Streaming extensions that they built to the Spark
>> project. Lastly, what does the Spark community think of this so-called
>> “Spark Data Store”?
>>
>> Thanks,
>> Ben
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>


Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread Cody Koeninger
The configuration you set is spark.streaming.receiver.maxRate.  The
direct stream is not a receiver.  As I said in my first message in
this thread, and as the pages at
http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
and http://spark.apache.org/docs/latest/configuration.html#spark-streaming
also say, use maxRatePerPartition for the direct stream.
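
For reference, a minimal sketch of how those settings attach to the 0.8 direct
stream (the broker address, topic name and the 1000 records/partition/second
cap below are illustrative placeholders, not values from this thread):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("DirectStreamRateLimit")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // records per partition per second

val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))

// the first batch is capped at maxRatePerPartition * number of partitions * 10s;
// after that, backpressure adjusts the rate from observed batch timings
stream.map(_._2).count().print()
ssc.start()
ssc.awaitTermination()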

Bottom line, if you have more information than your system can process
in X amount of time, after X amount of time you can either give the
wrong answer, or take longer to process.  Flink can't violate the laws
of physics.  If the tradeoffs that Flink makes are better for your use
case than the tradeoffs that DStreams make, you may be better off
using Flink (or testing out spark 2.0 structured streaming, although
there's no kafka integration available for that yet)

On Wed, Jul 6, 2016 at 10:25 AM, rss rss  wrote:
> ok, thanks. I tried  to set minimum max rate for beginning. However in
> general I don't know initial throughput. BTW it would be very useful to
> explain it in
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
>
> And really with
>
> .set("spark.streaming.backpressure.enabled","true")
> .set("spark.streaming.receiver.maxRate", "1")
>
> I have same problem:
> ***
> Processing time: 36994
> Expected time: 1
> Processed messages: 3015830
> Message example: {"message": 1,
> "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>
>
> Regarding auto.offset.reset smallest, now it is because of a test and I want
> to get same messages for each run. But in any case I expect to read all new
> messages from queue.
>
> Regarding backpressure detection. What is to do then a process time is much
> more then input rate? Now I see growing time of processing instead of stable
> 10 second and decreasing number of processed messages. Where is a limit of
> of backpressure algorithm?
>
> Regarding Flink. I don't know how works core of Flink but you can check self
> that Flink will strictly terminate processing of messages by time. Deviation
> of the time window from 10 seconds to several minutes is impossible.
>
> PS: I prepared this example to make possible easy observe the problem and
> fix it if it is a bug. For me it is obvious. May I ask you to be near to
> this simple source code? In other case I have to think that this is a
> technical limitation of Spark to work with unstable data flows.
>
> Cheers
>
> 2016-07-06 16:40 GMT+02:00 Cody Koeninger :
>>
>> The direct stream determines batch sizes on the driver, in advance of
>> processing.  If you haven't specified a maximum batch size, how would
>> you suggest the backpressure code determine how to limit the first
>> batch?  It has no data on throughput until at least one batch is
>> completed.
>>
>> Again, this is why I'm saying test by producing messages into kafka at
>> a rate comparable to production, not loading a ton of messages in and
>> starting from auto.offset.reset smallest.
>>
>> Even if you're unwilling to take that advice for some reason, and
>> unwilling to empirically determine a reasonable maximum partition
>> size, you should be able to estimate an upper bound such that the
>> first batch does not encompass your entire kafka retention.
>> Backpressure will kick in once it has some information to work with.
>>
>> On Wed, Jul 6, 2016 at 8:45 AM, rss rss  wrote:
>> > Hello,
>> >
>> >   thanks, I tried to .set("spark.streaming.backpressure.enabled","true")
>> > but
>> > result is negative. Therefore I have prepared small test
>> > https://github.com/rssdev10/spark-kafka-streaming
>> >
>> >   How to run:
>> >   git clone https://github.com/rssdev10/spark-kafka-streaming.git
>> >   cd spark-kafka-streaming
>> >
>> >   # downloads kafka and zookeeper
>> >   ./gradlew setup
>> >
>> >   # run zookeeper, kafka, and run messages generation
>> >   ./gradlew test_data_prepare
>> >
>> > And in other console just run:
>> >./gradlew test_spark
>> >
>> > It is easy to break data generation by CTRL-C. And continue by same
>> > command
>> > ./gradlew test_data_prepare
>> >
>> > To stop all run:
>> >   ./gradlew stop_all
>> >
>> > Spark test must generate messages each 10 seconds like:
>> >
>> > ***
>> > Processing time: 33477
>> > Expected time: 1
>> > Processed messages: 2897866
>> > Message example: {"message": 1,
>> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> > Recovered json:
>> > {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >
>> >
>> > message is number of fist message in the window. Time values are in
>> > milliseconds.
>> >
>> > Brief results:
>> >
>> > Spark always reads all messaged from Kafka after 

Re: Spark Left outer Join issue using programmatic sql joins

2016-07-06 Thread Rabin Banerjee
Checked in spark-shell with spark 1.5.0

scala> val emmpdat = sc.textFile("empfile");
emmpdat: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[23] at
textFile at :21

scala> case class EMP (id:Int , name : String , deptId: Int)
defined class EMP

scala> val empdf = emmpdat.map((f) => {val ff=f.split(" ");new
EMP(ff(0).toInt,ff(1),ff(2).toInt)}).toDF
empdf: org.apache.spark.sql.DataFrame = [id: int, name: string, deptId: int]

scala> empdf.registerTempTable("myemp");

scala>

scala> val deptdat = sc.textFile("deptfile");
deptdat: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[27] at
textFile at :21

scala> case class DEPT (deptId:Int , deptName : String )
defined class DEPT

scala>

scala> val deptdf = deptdat.map((f) => {val ff=f.split(" ");new
DEPT(ff(0).toInt,ff(1))}).toDF
deptdf: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string]

scala> deptdf.registerTempTable("mydept");

scala>

scala> sqlContext.sql("SELECT * FROM myemp e LEFT OUTER JOIN mydept d ON
e.deptid = d.deptid");
16/07/06 21:00:08 INFO parse.ParseDriver: Parsing command: SELECT * FROM
myemp e LEFT OUTER JOIN mydept d ON e.deptid = d.deptid
16/07/06 21:00:08 INFO parse.ParseDriver: Parse Completed
res9: org.apache.spark.sql.DataFrame = [id: int, name: string, deptId: int,
deptId: int, deptName: string]

scala> sqlContext.sql("SELECT * FROM myemp e LEFT OUTER JOIN mydept d ON
e.deptid = d.deptid").show
16/07/06 21:00:15 INFO parse.ParseDriver: Parsing command: SELECT * FROM
myemp e LEFT OUTER JOIN mydept d ON e.deptid = d.deptid
16/07/06 21:00:15 INFO parse.ParseDriver: Parse Completed
+----+----+------+------+--------+
|  id|name|deptId|deptId|deptName|
+----+----+------+------+--------+
|1001| aba|    10|    10|     dev|
|1003| abd|    10|    10|     dev|
|1005| abg|    10|    10|     dev|
|1007| abj|    10|    10|     dev|
|1010| abq|    10|    10|     dev|
|1002| abs|    20|    20|    Test|
|1006| abh|    20|    20|    Test|
|1009| abl|    20|    20|    Test|
|1004| abf|    30|    30|      IT|
|1008| abk|    30|    30|      IT|
+----+----+------+------+--------+


scala>



On Wed, Jul 6, 2016 at 6:30 PM, ayan guha  wrote:

> looks like a data issue to me. Either EMP or DEPT has spaces in dept id
> for deptid=20,30.
>
> Did you check in hive cli?
>
> On Wed, Jul 6, 2016 at 10:33 PM, radha  wrote:
>
>> Hi All,
>>
>> Please check below for the code and input and output, i think the output
>> is
>> not correct, i  am missing any thing? pls guide
>>
>> Code
>>
>> public class Test {
>> private static JavaSparkContext jsc = null;
>> private static SQLContext sqlContext = null;
>> private static Configuration hadoopConf = null;
>> public static void main(String[] args) {
>>
>> jsc = GlobalSparkContext.getJavaSparkContext();
>> sqlContext = GlobalSparkContext.getSQLContext(jsc);
>>
>> hadoopConf = new Configuration(jsc.hadoopConfiguration());
>>
>>
>> hadoopConf.set("textinputformat.record.delimiter",GlobalSparkContext.lineSeparator);
>>
>> try {
>> final Emp emp = new Emp();
>> final Dept dept = new Dept();
>>
>> JavaPairRDD deptinputLines =
>> jsc.newAPIHadoopFile(args[0], TextInputFormat.class,LongWritable.class,
>> Text.class, hadoopConf);
>> JavaRDD deptRDD = deptinputLines.map(new
>> Function, String>() {
>> @Override
>> public String
>> call(Tuple2 arg0) throws Exception {
>> return
>> arg0._2.toString();
>> }
>>
>> }).map(new Function> Dept>() {
>>
>> public Dept call(String recordLine)
>> throws Exception {
>> String[] parts =
>> recordLine.split(GlobalSparkContext.recordSeparator);
>> return getInstanceDept(parts,
>> dept);
>> }
>> });
>>
>> DataFrame deptDF =
>> sqlContext.createDataFrame(deptRDD, Dept.class);
>> deptDF.registerTempTable("DEPT");
>> //deptDF.show();
>>
>> JavaPairRDD inputLines =
>> jsc.newAPIHadoopFile(args[1], TextInputFormat.class, LongWritable.class,
>> Text.class, hadoopConf);
>> JavaRDD empRDD = inputLines.map(new
>> Function> Text>, String>() {
>>
>> private static final long
>> serialVersionUID = 3371707560417405016L;
>>
>> 

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread rss rss
OK, thanks. I tried to set a minimal max rate to begin with. However, in
general I don't know the initial throughput. BTW, it would be very useful to
explain this in
https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

And indeed, with

.set("spark.streaming.backpressure.enabled","true")
.set("spark.streaming.receiver.maxRate", "1")

I still have the same problem:
***
Processing time: *36994*
Expected time: 1
Processed messages: *3015830*
Message example: {"message": 1,
"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}


Regarding auto.offset.reset smallest: it is set that way only for the test,
because I want to get the same messages on each run. But in any case I expect
to read all new messages from the queue.

Regarding backpressure detection: what should happen when the processing time
is much greater than what the input rate allows? Right now I see the
processing time growing instead of staying at a stable 10 seconds, and the
number of processed messages decreasing. Where is the limit of the
backpressure algorithm?

Regarding Flink: I don't know how Flink's core works, but you can check for
yourself that Flink strictly terminates processing of messages by time. A
deviation of the time window from 10 seconds to several minutes is impossible.

PS: I prepared this example to make it easy to observe the problem and fix it
if it is a bug. To me it is obvious. May I ask you to stay close to this
simple source code? Otherwise I have to conclude that this is a technical
limitation of Spark when working with unstable data flows.

Cheers

2016-07-06 16:40 GMT+02:00 Cody Koeninger :

> The direct stream determines batch sizes on the driver, in advance of
> processing.  If you haven't specified a maximum batch size, how would
> you suggest the backpressure code determine how to limit the first
> batch?  It has no data on throughput until at least one batch is
> completed.
>
> Again, this is why I'm saying test by producing messages into kafka at
> a rate comparable to production, not loading a ton of messages in and
> starting from auto.offset.reset smallest.
>
> Even if you're unwilling to take that advice for some reason, and
> unwilling to empirically determine a reasonable maximum partition
> size, you should be able to estimate an upper bound such that the
> first batch does not encompass your entire kafka retention.
> Backpressure will kick in once it has some information to work with.
>
> On Wed, Jul 6, 2016 at 8:45 AM, rss rss  wrote:
> > Hello,
> >
> >   thanks, I tried to .set("spark.streaming.backpressure.enabled","true")
> but
> > result is negative. Therefore I have prepared small test
> > https://github.com/rssdev10/spark-kafka-streaming
> >
> >   How to run:
> >   git clone https://github.com/rssdev10/spark-kafka-streaming.git
> >   cd spark-kafka-streaming
> >
> >   # downloads kafka and zookeeper
> >   ./gradlew setup
> >
> >   # run zookeeper, kafka, and run messages generation
> >   ./gradlew test_data_prepare
> >
> > And in other console just run:
> >./gradlew test_spark
> >
> > It is easy to break data generation by CTRL-C. And continue by same
> command
> > ./gradlew test_data_prepare
> >
> > To stop all run:
> >   ./gradlew stop_all
> >
> > Spark test must generate messages each 10 seconds like:
> >
> ***
> > Processing time: 33477
> > Expected time: 1
> > Processed messages: 2897866
> > Message example: {"message": 1,
> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> > Recovered json:
> {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> >
> >
> > message is number of fist message in the window. Time values are in
> > milliseconds.
> >
> > Brief results:
> >
> > Spark always reads all messaged from Kafka after first connection
> > independently on dstream or window size time. It looks like a bug.
> > When processing speed in Spark's app is near to speed of data generation
> all
> > is ok.
> > I added delayFactor in
> >
> https://github.com/rssdev10/spark-kafka-streaming/blob/master/src/main/java/SparkStreamingConsumer.java
> > to emulate slow processing. And streaming process is in degradation. When
> > delayFactor=0 it looks like stable process.
> >
> >
> > Cheers
> >
> >
> > 2016-07-05 17:51 GMT+02:00 Cody Koeninger :
> >>
> >> Test by producing messages into kafka at a rate comparable to what you
> >> expect in production.
> >>
> >> Test with backpressure turned on, it doesn't require you to specify a
> >> fixed limit on number of messages and will do its best to maintain
> >> batch timing.  Or you could empirically determine a reasonable fixed
> >> limit.
> >>
> >> Setting up a kafka topic with way more static messages in it than your
> >> system can handle in one batch, and then starting a stream from the
> >> beginning of it without turning on backpressure 

Re: how to select first 50 value of each group after group by?

2016-07-06 Thread Anton Okolnychyi
The following resources should be useful:

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-windows.html

The last link should have the exact solution

2016-07-06 16:55 GMT+02:00 Tal Grynbaum :

> You can use rank window function to rank each row in the group,  and then
> filter the rowz with rank < 50
>
> On Wed, Jul 6, 2016, 14:07  wrote:
>
>> hi there
>> I have a DF with 3 columns: id , pv, location.(the rows are already
>> grouped by location and sort by pv in des)  I wanna get the first 50 id
>> values grouped by location. I checked the API of
>> dataframe,groupeddata,pairRDD, and found no match.
>>   is there a way to do this naturally?
>>   any info will be appreciated.
>>
>>
>>
>> 
>>
>> ThanksBest regards!
>> San.Luo
>>
>


Re: how to select first 50 value of each group after group by?

2016-07-06 Thread Tal Grynbaum
You can use the rank window function to rank each row within the group, and
then filter the rows with rank <= 50.
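
For example, a minimal sketch against the Spark 1.6 DataFrame API (df and the
column names id, pv, location come from the question; row_number is used here
instead of rank so that ties cannot inflate the count past 50):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// rank rows within each location by pv, highest first
val w = Window.partitionBy("location").orderBy(col("pv").desc)

// keep only the first 50 ids per location
val top50 = df.withColumn("rn", row_number().over(w))
  .filter(col("rn") <= 50)
  .select("id", "pv", "location")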

On Wed, Jul 6, 2016, 14:07  wrote:

> hi there
> I have a DF with 3 columns: id , pv, location.(the rows are already
> grouped by location and sort by pv in des)  I wanna get the first 50 id
> values grouped by location. I checked the API of
> dataframe,groupeddata,pairRDD, and found no match.
>   is there a way to do this naturally?
>   any info will be appreciated.
>
>
>
> 
>
> ThanksBest regards!
> San.Luo
>


Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread Cody Koeninger
The direct stream determines batch sizes on the driver, in advance of
processing.  If you haven't specified a maximum batch size, how would
you suggest the backpressure code determine how to limit the first
batch?  It has no data on throughput until at least one batch is
completed.

Again, this is why I'm saying test by producing messages into kafka at
a rate comparable to production, not loading a ton of messages in and
starting from auto.offset.reset smallest.

Even if you're unwilling to take that advice for some reason, and
unwilling to empirically determine a reasonable maximum partition
size, you should be able to estimate an upper bound such that the
first batch does not encompass your entire kafka retention.
Backpressure will kick in once it has some information to work with.

On Wed, Jul 6, 2016 at 8:45 AM, rss rss  wrote:
> Hello,
>
>   thanks, I tried to .set("spark.streaming.backpressure.enabled","true") but
> result is negative. Therefore I have prepared small test
> https://github.com/rssdev10/spark-kafka-streaming
>
>   How to run:
>   git clone https://github.com/rssdev10/spark-kafka-streaming.git
>   cd spark-kafka-streaming
>
>   # downloads kafka and zookeeper
>   ./gradlew setup
>
>   # run zookeeper, kafka, and run messages generation
>   ./gradlew test_data_prepare
>
> And in other console just run:
>./gradlew test_spark
>
> It is easy to break data generation by CTRL-C. And continue by same command
> ./gradlew test_data_prepare
>
> To stop all run:
>   ./gradlew stop_all
>
> Spark test must generate messages each 10 seconds like:
> ***
> Processing time: 33477
> Expected time: 1
> Processed messages: 2897866
> Message example: {"message": 1,
> "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>
>
> message is number of fist message in the window. Time values are in
> milliseconds.
>
> Brief results:
>
> Spark always reads all messaged from Kafka after first connection
> independently on dstream or window size time. It looks like a bug.
> When processing speed in Spark's app is near to speed of data generation all
> is ok.
> I added delayFactor in
> https://github.com/rssdev10/spark-kafka-streaming/blob/master/src/main/java/SparkStreamingConsumer.java
> to emulate slow processing. And streaming process is in degradation. When
> delayFactor=0 it looks like stable process.
>
>
> Cheers
>
>
> 2016-07-05 17:51 GMT+02:00 Cody Koeninger :
>>
>> Test by producing messages into kafka at a rate comparable to what you
>> expect in production.
>>
>> Test with backpressure turned on, it doesn't require you to specify a
>> fixed limit on number of messages and will do its best to maintain
>> batch timing.  Or you could empirically determine a reasonable fixed
>> limit.
>>
>> Setting up a kafka topic with way more static messages in it than your
>> system can handle in one batch, and then starting a stream from the
>> beginning of it without turning on backpressure or limiting the number
>> of messages... isn't a reasonable way to test steady state
>> performance.  Flink can't magically give you a correct answer under
>> those circumstances either.
>>
>> On Tue, Jul 5, 2016 at 10:41 AM, rss rss  wrote:
>> > Hi, thanks.
>> >
>> >I know about possibility to limit number of messages. But the problem
>> > is
>> > I don't know number of messages which the system able to process. It
>> > depends
>> > on data. The example is very simple. I need a strict response after
>> > specified time. Something like soft real time. In case of Flink I able
>> > to
>> > setup strict time of processing like this:
>> >
>> > KeyedStream keyed =
>> > eventStream.keyBy(event.userId.getBytes()[0] % partNum);
>> > WindowedStream uniqUsersWin =
>> > keyed.timeWindow(
>> > Time.seconds(10) );
>> > DataStream uniqUsers =
>> > uniq.trigger(ProcessingTimeTrigger.create())
>> > .fold(new Aggregator(), new FoldFunction() {
>> > @Override
>> > public Aggregator fold(Aggregator accumulator, Event value)
>> > throws Exception {
>> > accumulator.add(event.userId);
>> > return accumulator;
>> > }
>> > });
>> >
>> > uniq.print();
>> >
>> > And I can see results every 10 seconds independently on input data
>> > stream.
>> > Is it possible something in Spark?
>> >
>> > Regarding zeros in my example the reason I have prepared message queue
>> > in
>> > Kafka for the tests. If I add some messages after I able to see new
>> > messages. But in any case I need first response after 10 second. Not
>> > minutes
>> > or hours after.
>> >
>> > Thanks.
>> >
>> >
>> >
>> > 2016-07-05 17:12 GMT+02:00 Cody Koeninger :
>> >>
>> >> If you're talking about limiting the number of 

Re: SnappyData and Structured Streaming

2016-07-06 Thread Benjamin Kim
Jags,

I should have been more specific. I am referring to what I read at 
http://snappydatainc.github.io/snappydata/streamingWithSQL/, especially the 
Streaming Tables part. It roughly coincides with the Streaming DataFrames 
outlined here 
https://docs.google.com/document/d/1NHKdRSNCbCmJbinLmZuqNA1Pt6CGpFnLVRbzuDUcZVM/edit#heading=h.ff0opfdo6q1h.
 I don’t know if I’m wrong, but they both sound very similar. That’s why I posed
this question.

Thanks,
Ben

> On Jul 6, 2016, at 7:03 AM, Jags Ramnarayan  wrote:
> 
> Ben,
>Note that Snappydata's primary objective is to be a distributed in-memory 
> DB for mixed workloads (i.e. streaming with transactions and analytic 
> queries). On the other hand, Spark, till date, is primarily designed as a 
> processing engine over myriad storage engines (SnappyData being one). So, the 
> marriage is quite complementary. The difference compared to other stores is 
> that SnappyData realizes its solution by deeply integrating and collocating 
> with Spark (i.e. share spark executor memory/resources with the store) 
> avoiding serializations and shuffle in many situations.
> 
> On your specific thought about being similar to Structured streaming, a 
> better discussion could be a comparison to the recently introduced State 
> store 
> 
>  (perhaps this is what you meant). 
> It proposes a KV store for streaming aggregations with support for updates. 
> The proposed API will, at some point, be pluggable so vendors can easily 
> support alternate implementations to storage, not just HDFS(default store in 
> proposed State store). 
> 
> 
> -
> Jags
> SnappyData blog 
> Download binary, source 
> 
> 
> On Wed, Jul 6, 2016 at 12:49 AM, Benjamin Kim  > wrote:
> I recently got a sales email from SnappyData, and after reading the 
> documentation about what they offer, it sounds very similar to what 
> Structured Streaming will offer w/o the underlying in-memory, spill-to-disk, 
> CRUD compliant data storage in SnappyData. I was wondering if Structured 
> Streaming is trying to achieve the same on its own or is SnappyData 
> contributing Streaming extensions that they built to the Spark project. 
> Lastly, what does the Spark community think of this so-called “Spark Data 
> Store”?
> 
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 



SparkR | Exception in invokeJava: SparkR + Windows standalone cluster

2016-07-06 Thread AC24
I am trying to set up a Spark stand-alone cluster and run SparkR. I have one
master and 2 slaves set up on a Windows Server 2012. I have tried running the
sparkR + RStudio example as shown in this blog:
http://blog.danielemaasit.com/2015/07/26/installing-and-starting-sparkr-locally-on-windows-8-1-and-rstudio/

I was able to execute this seamlessly in RStudio (on the server), but what I
want to achieve is firing the sparkR driver program remotely (on my laptop),
so that it connects to my master's IP address and the execution part runs on
my standalone Spark installation. Is this possible via SparkR? I found an
article online where a similar situation is discussed:
https://qnalist.com/questions/5006092/cannot-submit-to-a-spark-application-to-a-remote-cluster-spark-1-0

I'm not sure where and how to include "spark.home" in my case though. Is this
the right way to proceed? My code & error below, from RStudio running on the
laptop:

Sys.setenv(SPARK_HOME = "C:/spark-1.6.2-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))

# load the SparkR library
# library(SparkR)
library(SparkR, lib.loc = "C:/spark-1.6.2-bin-hadoop2.6/R/lib")

# Create a spark context and a SQL context
sc <- sparkR.init(master = "spark://Master's IP:7077", appName = "HelloWorld")
sqlContext <- sparkRSQL.init(sc)

# create a sparkR DataFrame
DF <- createDataFrame(sqlContext, faithful)
head(DF)

# Create a simple local data.frame
localDF <- data.frame(name = c("John", "Smith", "Sarah"), age = c(19, 23, 18))

# Convert local data frame to a SparkR DataFrame
df <- createDataFrame(sqlContext, localDF)

# Print its schema
printSchema(df)
# root
#  |-- name: string (nullable = true)
#  |-- age: double (nullable = true)

# Create a DataFrame from a JSON file
path <- file.path(Sys.getenv("SPARK_HOME"), "examples/src/main/resources/people.json")
peopleDF <- jsonFile(sqlContext, path)
printSchema(peopleDF)

# Register this DataFrame as a table.
registerTempTable(peopleDF, "people")

# SQL statements can be run by using the sql methods provided by sqlContext
teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")

# Call collect to get a local data.frame
teenagersLocalDF <- collect(teenagers)

# Print the teenagers in our dataset
print(teenagersLocalDF)

# Stop the SparkContext now
sparkR.stop()

ERROR:

> sc <- sparkR.init(master = "spark://Master's IP:7077", appName = "HelloWorld")
Launching java with spark-submit command C:/spark-1.6.2-bin-hadoop2.6/bin/spark-submit.cmd   sparkr-shell C:\Users\admin\AppData\Local\Temp\RtmpOQea5v\backend_port31d417b617d8
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  java.lang.NullPointerException
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:583)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
    at org.apache.spark.api.r.RRDD$.createSparkContext(RRDD.scala:376)
    at org.apache.spark.api.r.RRDD.createSparkContext(RRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:141)
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:86)
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractC



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Exception-in-invokeJava-SparkR-Windows-standalone-cluster-tp27298.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

2016-07-06 Thread Yu Wei
Actually, the Time line was always printed out.

Is there a better way to debug the problem? I want to update the spark/mqtt
code and rebuild it to debug further.


Thanks,

Jared



From: Saisai Shao 
Sent: Wednesday, July 6, 2016 9:24 PM
To: Yu Wei
Cc: Sean Owen; Rabin Banerjee; user@spark.apache.org
Subject: Re: It seemed JavaDStream.print() did not work when launching via yarn 
on a single node

DStream.print() will collect some of the data to driver and display, please see 
the implementation of DStream.print()

RDD.take() will collect some of the data to driver.

Normally the behavior should be consistent between cluster and local mode, 
please find out the root cause of this problem, like MQTT connection or 
something else.


def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
  val firstNum = rdd.take(num + 1)
  // scalastyle:off println
  println("---")
  println(s"Time: $time")
  println("---")
  firstNum.take(num).foreach(println)
  if (firstNum.length > num) println("...")
  println()
  // scalastyle:on println
}
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = 
false)
}

On Wed, Jul 6, 2016 at 9:17 PM, Yu Wei 
> wrote:

How about DStream.print().

Does it invoke collect before print on driver?


From: Sean Owen >
Sent: Wednesday, July 6, 2016 8:20:36 PM
To: Rabin Banerjee
Cc: Yu Wei; user@spark.apache.org
Subject: Re: It seemed JavaDStream.print() did not work when launching via yarn 
on a single node

dstream.foreachRDD(_.collect.foreach(println))

On Wed, Jul 6, 2016 at 1:19 PM, Rabin Banerjee
> wrote:
> Collect will help then . May be something like this,
> foreachRDD( rdd => { for(item <- rdd.collect().toArray) { println(item); }
> })
>



Re: SnappyData and Structured Streaming

2016-07-06 Thread Jags Ramnarayan
Ben,
   Note that Snappydata's primary objective is to be a distributed
in-memory DB for mixed workloads (i.e. streaming with transactions and
analytic queries). On the other hand, Spark, till date, is primarily
designed as a processing engine over myriad storage engines (SnappyData
being one). So, the marriage is quite complementary. The difference
compared to other stores is that SnappyData realizes its solution by deeply
integrating and collocating with Spark (i.e. share spark executor
memory/resources with the store) avoiding serializations and shuffle in
many situations.

On your specific thought about being similar to Structured streaming, a
better discussion could be a comparison to the recently introduced State
store

(perhaps
this is what you meant).
It proposes a KV store for streaming aggregations with support for updates.
The proposed API will, at some point, be pluggable so vendors can easily
support alternate implementations to storage, not just HDFS(default store
in proposed State store).


-
Jags
SnappyData blog 
Download binary, source 


On Wed, Jul 6, 2016 at 12:49 AM, Benjamin Kim  wrote:

> I recently got a sales email from SnappyData, and after reading the
> documentation about what they offer, it sounds very similar to what
> Structured Streaming will offer w/o the underlying in-memory,
> spill-to-disk, CRUD compliant data storage in SnappyData. I was wondering
> if Structured Streaming is trying to achieve the same on its own or is
> SnappyData contributing Streaming extensions that they built to the Spark
> project. Lastly, what does the Spark community think of this so-called
> “Spark Data Store”?
>
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread rss rss
Hello,

  thanks, I tried to .set("spark.streaming.backpressure.enabled","true") but the
result is negative. Therefore I have prepared a small test:
https://github.com/rssdev10/spark-kafka-streaming

  How to run:

  git clone https://github.com/rssdev10/spark-kafka-streaming.git
  cd spark-kafka-streaming

  # downloads kafka and zookeeper
  ./gradlew setup

  # run zookeeper and kafka, and start message generation
  ./gradlew test_data_prepare

And in another console just run:
  ./gradlew test_spark

It is easy to interrupt data generation with CTRL-C and continue with the same
command: ./gradlew test_data_prepare

To stop everything:
  ./gradlew stop_all

The Spark test must produce output like this every 10 seconds:
***
Processing time: 33477
Expected time: 1
Processed messages: 2897866
Message example: {"message": 1,
"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}


*message* is the number of the first message in the window. Time values are in
milliseconds.

Brief results:

   1. Spark always reads all messages from Kafka after the first connection,
   independently of the dstream or window size time. It looks like a bug.
   2. When the processing speed of the Spark app is close to the speed of data
   generation, all is OK.
   3. I added delayFactor in
   https://github.com/rssdev10/spark-kafka-streaming/blob/master/src/main/java/SparkStreamingConsumer.java
   to emulate slow processing, and the streaming process degrades. When
   delayFactor=0 it looks like a stable process.


Cheers


2016-07-05 17:51 GMT+02:00 Cody Koeninger :

> Test by producing messages into kafka at a rate comparable to what you
> expect in production.
>
> Test with backpressure turned on, it doesn't require you to specify a
> fixed limit on number of messages and will do its best to maintain
> batch timing.  Or you could empirically determine a reasonable fixed
> limit.
>
> Setting up a kafka topic with way more static messages in it than your
> system can handle in one batch, and then starting a stream from the
> beginning of it without turning on backpressure or limiting the number
> of messages... isn't a reasonable way to test steady state
> performance.  Flink can't magically give you a correct answer under
> those circumstances either.
>
> On Tue, Jul 5, 2016 at 10:41 AM, rss rss  wrote:
> > Hi, thanks.
> >
> >I know about possibility to limit number of messages. But the problem
> is
> > I don't know number of messages which the system able to process. It
> depends
> > on data. The example is very simple. I need a strict response after
> > specified time. Something like soft real time. In case of Flink I able to
> > setup strict time of processing like this:
> >
> > KeyedStream keyed =
> > eventStream.keyBy(event.userId.getBytes()[0] % partNum);
> > WindowedStream uniqUsersWin =
> keyed.timeWindow(
> > Time.seconds(10) );
> > DataStream uniqUsers =
> > uniq.trigger(ProcessingTimeTrigger.create())
> > .fold(new Aggregator(), new FoldFunction() {
> > @Override
> > public Aggregator fold(Aggregator accumulator, Event value)
> > throws Exception {
> > accumulator.add(event.userId);
> > return accumulator;
> > }
> > });
> >
> > uniq.print();
> >
> > And I can see results every 10 seconds independently on input data
> stream.
> > Is it possible something in Spark?
> >
> > Regarding zeros in my example the reason I have prepared message queue in
> > Kafka for the tests. If I add some messages after I able to see new
> > messages. But in any case I need first response after 10 second. Not
> minutes
> > or hours after.
> >
> > Thanks.
> >
> >
> >
> > 2016-07-05 17:12 GMT+02:00 Cody Koeninger :
> >>
> >> If you're talking about limiting the number of messages per batch to
> >> try and keep from exceeding batch time, see
> >>
> >> http://spark.apache.org/docs/latest/configuration.html
> >>
> >> look for backpressure and maxRatePerParition
> >>
> >>
> >> But if you're only seeing zeros after your job runs for a minute, it
> >> sounds like something else is wrong.
> >>
> >>
> >> On Tue, Jul 5, 2016 at 10:02 AM, rss rss  wrote:
> >> > Hello,
> >> >
> >> >   I'm trying to organize processing of messages from Kafka. And there
> is
> >> > a
> >> > typical case when a number of messages in kafka's queue is more then
> >> > Spark
> >> > app's possibilities to process. But I need a strong time limit to
> >> > prepare
> >> > result for at least for a part of data.
> >> >
> >> > Code example:
> >> >
> >> > SparkConf sparkConf = new SparkConf()
> >> > .setAppName("Spark")
> >> > .setMaster("local");
> >> >
> >> > 

Re: Is that possible to launch spark streaming application on yarn with only one machine?

2016-07-06 Thread Mich Talebzadeh
Deploy-mode cluster don't think will work.

Try --master yarn --deploy-mode client

FYI


   -

   *Spark Local* - Spark runs on the local host. This is the simplest set
   up and best suited for learners who want to understand different concepts
   of Spark and those performing unit testing.
   -

   *Spark Standalone *– a simple cluster manager included with Spark that
   makes it easy to set up a cluster.
   -

   *YARN Cluster Mode,* the Spark driver runs inside an application master
   process which is managed by YARN on the cluster, and the client can go away
   after initiating the application. This is invoked with –master yarn
and --deploy-mode
   cluster
   -

   *YARN Client Mode*, the driver runs in the client process, and the
   application master is only used for requesting resources from YARN.
Unlike Spark
   standalone mode, in which the master’s address is specified in the
   --master parameter, in YARN mode the ResourceManager’s address is picked
   up from the Hadoop configuration. Thus, the --master parameter is yarn. This
   is invoked with --deploy-mode client

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 6 July 2016 at 12:31, Yu Wei  wrote:

> Hi Deng,
>
> I tried the same code again.
>
> It seemed that when launching application via yarn on single node,
> JavaDStream.print() did not work. However, occasionally it worked.
>
> If launch the same application in local mode, it always worked.
>
>
> The code is as below,
>
> SparkConf conf = new SparkConf().setAppName("Monitor");
> JavaStreamingContext jssc = new JavaStreamingContext(conf,
> Durations.seconds(1));
> JavaReceiverInputDStream inputDS =
> MQTTUtils.createStream(jssc, "tcp://114.55.145.185:1883", "Control");
> inputDS.print();
> jssc.start();
> jssc.awaitTermination();
>
>
> Command for launching via yarn, (did not work)
>
> spark-submit --master yarn --deploy-mode cluster --driver-memory 4g
> --executor-memory 2g target/CollAna-1.0-SNAPSHOT.jar
>  Command for launching via local mode (works)
>spark-submit --master local[4] --driver-memory 4g --executor-memory 2g
> --num-executors 4 target/CollAna-1.0-SNAPSHOT.jar
>
>
>
> Any advice?
>
>
> Thanks,
>
> Jared
>
>
>
> --
> *From:* Yu Wei 
> *Sent:* Tuesday, July 5, 2016 4:41 PM
> *To:* Deng Ching-Mallete
>
> *Cc:* user@spark.apache.org
> *Subject:* Re: Is that possible to launch spark streaming application on
> yarn with only one machine?
>
>
> Hi Deng,
>
>
> Thanks for the help. Actually I need pay more attention to memory usage.
>
> I found the root cause in my problem. It seemed that it existed in spark
> streaming MQTTUtils module.
>
> When I use "localhost" in brokerURL, it doesn't work.
>
> After change it to "127.0.0.1", it works now.
>
>
> Thanks again,
>
> Jared
>
>
>
> --
> *From:* odeach...@gmail.com  on behalf of Deng
> Ching-Mallete 
> *Sent:* Tuesday, July 5, 2016 4:03:28 PM
> *To:* Yu Wei
> *Cc:* user@spark.apache.org
> *Subject:* Re: Is that possible to launch spark streaming application on
> yarn with only one machine?
>
> Hi Jared,
>
> You can launch a Spark application even with just a single node in YARN,
> provided that the node has enough resources to run the job.
>
> It might also be good to note that when YARN calculates the memory
> allocation for the driver and the executors, there is an additional memory
> overhead that is added for each executor then it gets rounded up to the
> nearest GB, IIRC. So the 4G driver-memory + 4x2G executor memory do not
> necessarily translate to a total of 12G memory allocation. It would be more
> than that, so the node would need to have more than 12G of memory for the
> job to execute in YARN. You should be able to see something like "No
> resources available in cluster.." in the application master logs in YARN if
> that is the case.
>
> HTH,
> Deng
>
> On Tue, Jul 5, 2016 at 4:31 PM, Yu Wei  wrote:
>
>> Hi guys,
>>
>> I set up pseudo hadoop/yarn cluster on my labtop.
>>
>> I wrote a simple spark streaming program as below to receive messages
>> with MQTTUtils.
>> conf = new SparkConf().setAppName("Monitor");
>> jssc = new JavaStreamingContext(conf, Durations.seconds(1));
>> JavaReceiverInputDStream inputDS = MQTTUtils.createStream(jssc,
>> brokerUrl, topic);
>>
>> inputDS.print();
>> 

Re: Get both feature importance and ROC curve from a random forest classifier

2016-07-06 Thread Mathieu D
well, sounds trivial now ... !
thanks ;-)
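
For the feature-importance half of the question, which the quoted snippet
below does not show, a minimal sketch assuming the same fitted model (a
RandomForestClassificationModel from the ml package):

val importances = model.featureImportances   // one weight per input feature
importances.toArray.zipWithIndex
  .sortBy { case (imp, _) => -imp }
  .foreach { case (imp, idx) => println(s"feature $idx -> importance $imp") }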

2016-07-02 10:04 GMT+02:00 Yanbo Liang :

> Hi Mathieu,
>
> Using the new ml package to train a RandomForestClassificationModel, you
> can get feature importance. Then you can convert the prediction result to
> RDD and feed it into BinaryClassificationEvaluator for ROC curve. You can
> refer the following code snippet:
>
> val rf = new RandomForestClassifier()
> val model = rf.fit(trainingData)
>
> val predictions = model.transform(testData)
>
> val scoreAndLabels =
>   predictions.select(model.getRawPredictionCol, model.getLabelCol).rdd.map
> {
> case Row(rawPrediction: Vector, label: Double) => (rawPrediction(1),
> label)
> case Row(rawPrediction: Double, label: Double) => (rawPrediction,
> label)
>   }
> val metrics = new BinaryClassificationMetrics(scoreAndLabels)
> metrics.roc()
>
>
> Thanks
> Yanbo
>
> 2016-06-15 7:13 GMT-07:00 matd :
>
>> Hi ml folks !
>>
>> I'm using a Random Forest for a binary classification.
>> I'm interested in getting both the ROC *curve* and the feature importance
>> from the trained model.
>>
>> If I'm not missing something obvious, the ROC curve is only available in
>> the
>> old mllib world, via BinaryClassificationMetrics. In the new ml package,
>> only the areaUnderROC and areaUnderPR are available through
>> BinaryClassificationEvaluator.
>>
>> The feature importance is only available in ml package, through
>> RandomForestClassificationModel.
>>
>> Any idea to get both ?
>>
>> Mathieu
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Get-both-feature-importance-and-ROC-curve-from-a-random-forest-classifier-tp27175.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

2016-07-06 Thread Saisai Shao
DStream.print() will collect some of the data to the driver and display it; please
see the implementation of DStream.print() below.

RDD.take() will collect some of the data to the driver.

Normally the behavior should be consistent between cluster and local mode,
please find out the root cause of this problem, like MQTT connection or
something else.

def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("---")
      println(s"Time: $time")
      println("---")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}


On Wed, Jul 6, 2016 at 9:17 PM, Yu Wei  wrote:

> How about DStream.print().
>
> Does it invoke collect before print on driver?
> --
> *From:* Sean Owen 
> *Sent:* Wednesday, July 6, 2016 8:20:36 PM
> *To:* Rabin Banerjee
> *Cc:* Yu Wei; user@spark.apache.org
> *Subject:* Re: It seemed JavaDStream.print() did not work when launching
> via yarn on a single node
>
> dstream.foreachRDD(_.collect.foreach(println))
>
> On Wed, Jul 6, 2016 at 1:19 PM, Rabin Banerjee
>  wrote:
> > Collect will help then . May be something like this,
> > foreachRDD( rdd => { for(item <- rdd.collect().toArray) { println(item);
> }
> > })
> >
>


spark 2.0 bloom filters

2016-07-06 Thread matd
A question for Spark developers

I see that Bloom filters have been integrated in Spark 2.0.

Hadoop already has some Bloom filter implementations, especially a dynamic
one, which is very interesting when the number of keys largely exceeds what was
imagined.

Is there any rationale (performance, implementation...) for this implementation in
Spark instead of re-using the one from Hadoop?

Thanks !
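
For reference, a minimal sketch of how the integrated version is exposed on the DataFrame side, assuming the Spark 2.0 DataFrameStatFunctions.bloomFilter(col, expectedNumItems, fpp) signature and a SparkSession named spark (the column name and sizes are made up):

val df = spark.range(0L, 1000000L).toDF("id")
// build a Bloom filter over "id", expecting ~1M distinct items with a 3% false-positive rate
val bf = df.stat.bloomFilter("id", 1000000L, 0.03)
bf.mightContain(42L)    // true for every inserted key
bf.mightContain(-1L)    // false with high probability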



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-2-0-bloom-filters-tp27297.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

2016-07-06 Thread Yu Wei
How about DStream.print().

Does it invoke collect before print on driver?


From: Sean Owen 
Sent: Wednesday, July 6, 2016 8:20:36 PM
To: Rabin Banerjee
Cc: Yu Wei; user@spark.apache.org
Subject: Re: It seemed JavaDStream.print() did not work when launching via yarn 
on a single node

dstream.foreachRDD(_.collect.foreach(println))

On Wed, Jul 6, 2016 at 1:19 PM, Rabin Banerjee
 wrote:
> Collect will help then . May be something like this,
> foreachRDD( rdd => { for(item <- rdd.collect().toArray) { println(item); }
> })
>


Re: Spark Left outer Join issue using programmatic sql joins

2016-07-06 Thread ayan guha
Looks like a data issue to me. Either EMP or DEPT has spaces in the dept id
values for deptid=20,30.

Did you check in the Hive CLI?
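
One quick way to test that hypothesis: a hedged sketch that simply trims both keys in the posted query (using the EMP and DEPT temp tables registered in the code below):

sqlContext.sql(
  """SELECT *
    |FROM EMP e
    |LEFT OUTER JOIN DEPT d
    |  ON trim(e.deptid) = trim(d.deptid)""".stripMargin).show()

// and to spot stray whitespace directly:
sqlContext.sql("SELECT DISTINCT deptid, length(deptid) AS len FROM EMP").show()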

On Wed, Jul 6, 2016 at 10:33 PM, radha  wrote:

> Hi All,
>
> Please check below for the code, input and output. I think the output is
> not correct; am I missing anything? Please guide.
>
> Code
>
> public class Test {
>     private static JavaSparkContext jsc = null;
>     private static SQLContext sqlContext = null;
>     private static Configuration hadoopConf = null;
>
>     public static void main(String[] args) {
>         jsc = GlobalSparkContext.getJavaSparkContext();
>         sqlContext = GlobalSparkContext.getSQLContext(jsc);
>
>         hadoopConf = new Configuration(jsc.hadoopConfiguration());
>         hadoopConf.set("textinputformat.record.delimiter", GlobalSparkContext.lineSeparator);
>
>         try {
>             final Emp emp = new Emp();
>             final Dept dept = new Dept();
>
>             JavaPairRDD<LongWritable, Text> deptinputLines =
>                     jsc.newAPIHadoopFile(args[0], TextInputFormat.class, LongWritable.class, Text.class, hadoopConf);
>             JavaRDD<Dept> deptRDD = deptinputLines.map(new Function<Tuple2<LongWritable, Text>, String>() {
>                 @Override
>                 public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
>                     return arg0._2.toString();
>                 }
>             }).map(new Function<String, Dept>() {
>                 public Dept call(String recordLine) throws Exception {
>                     String[] parts = recordLine.split(GlobalSparkContext.recordSeparator);
>                     return getInstanceDept(parts, dept);
>                 }
>             });
>
>             DataFrame deptDF = sqlContext.createDataFrame(deptRDD, Dept.class);
>             deptDF.registerTempTable("DEPT");
>             //deptDF.show();
>
>             JavaPairRDD<LongWritable, Text> inputLines =
>                     jsc.newAPIHadoopFile(args[1], TextInputFormat.class, LongWritable.class, Text.class, hadoopConf);
>             JavaRDD<Emp> empRDD = inputLines.map(new Function<Tuple2<LongWritable, Text>, String>() {
>
>                 private static final long serialVersionUID = 3371707560417405016L;
>
>                 @Override
>                 public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
>                     return arg0._2.toString();
>                 }
>             }).map(new Function<String, Emp>() {
>
>                 private static final long serialVersionUID = 7656942162815285622L;
>
>                 public Emp call(String recordLine) throws Exception {
>                     String[] parts = recordLine.split(GlobalSparkContext.recordSeparator);
>                     return getInstance(parts, emp);
>                 }
>             });
>             DataFrame empDF = sqlContext.createDataFrame(empRDD, Emp.class);
>             empDF.registerTempTable("EMP");
>
>             sqlContext.sql("SELECT * FROM EMP e LEFT OUTER JOIN DEPT d ON e.deptid = d.deptid").show();
>
>             //empDF.join(deptDF, empDF.col("deptid").equalTo(deptDF.col("deptid")), "leftouter").show();
>         }
>         catch (Exception e) {
>             System.out.println(e);
>         }
>     }
>
>     public static Emp getInstance(String[] parts, Emp emp) throws ParseException {
>         emp.setId(parts[0]);
>         emp.setName(parts[1]);
>         emp.setDeptid(parts[2]);
>         return emp;
>     }
>
>     public static Dept getInstanceDept(String[] parts, Dept dept) throws ParseException {
>         dept.setDeptid(parts[0]);
>         dept.setDeptname(parts[1]);
>         return dept;
>     }
> }
>
> Input
> Emp
> 1001 aba 10
> 1002 abs 20
> 1003 abd 10
> 1004 abf 30
> 1005 abg 10
> 1006 abh 20
> 1007 abj 10
> 1008 abk 30
> 1009 abl 20
> 1010 abq 10
>
> Dept
> 10 dev
> 20 Test
> 30 IT
>
> Output
> +--+--++--++
> |deptid|id|name|deptid|deptname|
> +--+--++--++
> |10|  1001| aba|10| dev|
> |10|  1003| abd|10| dev|
> |10|  1005| abg|10| dev|

Spark Left outer Join issue using programmatic sql joins

2016-07-06 Thread radha
Hi All,

Please check below for the code, input and output. I think the output is
not correct; am I missing anything? Please guide.

Code

public class Test {
    private static JavaSparkContext jsc = null;
    private static SQLContext sqlContext = null;
    private static Configuration hadoopConf = null;

    public static void main(String[] args) {
        jsc = GlobalSparkContext.getJavaSparkContext();
        sqlContext = GlobalSparkContext.getSQLContext(jsc);

        hadoopConf = new Configuration(jsc.hadoopConfiguration());
        hadoopConf.set("textinputformat.record.delimiter", GlobalSparkContext.lineSeparator);

        try {
            final Emp emp = new Emp();
            final Dept dept = new Dept();

            JavaPairRDD<LongWritable, Text> deptinputLines =
                    jsc.newAPIHadoopFile(args[0], TextInputFormat.class, LongWritable.class, Text.class, hadoopConf);
            JavaRDD<Dept> deptRDD = deptinputLines.map(new Function<Tuple2<LongWritable, Text>, String>() {
                @Override
                public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
                    return arg0._2.toString();
                }
            }).map(new Function<String, Dept>() {
                public Dept call(String recordLine) throws Exception {
                    String[] parts = recordLine.split(GlobalSparkContext.recordSeparator);
                    return getInstanceDept(parts, dept);
                }
            });

            DataFrame deptDF = sqlContext.createDataFrame(deptRDD, Dept.class);
            deptDF.registerTempTable("DEPT");
            //deptDF.show();

            JavaPairRDD<LongWritable, Text> inputLines =
                    jsc.newAPIHadoopFile(args[1], TextInputFormat.class, LongWritable.class, Text.class, hadoopConf);
            JavaRDD<Emp> empRDD = inputLines.map(new Function<Tuple2<LongWritable, Text>, String>() {

                private static final long serialVersionUID = 3371707560417405016L;

                @Override
                public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
                    return arg0._2.toString();
                }
            }).map(new Function<String, Emp>() {

                private static final long serialVersionUID = 7656942162815285622L;

                public Emp call(String recordLine) throws Exception {
                    String[] parts = recordLine.split(GlobalSparkContext.recordSeparator);
                    return getInstance(parts, emp);
                }
            });
            DataFrame empDF = sqlContext.createDataFrame(empRDD, Emp.class);
            empDF.registerTempTable("EMP");

            sqlContext.sql("SELECT * FROM EMP e LEFT OUTER JOIN DEPT d ON e.deptid = d.deptid").show();

            //empDF.join(deptDF, empDF.col("deptid").equalTo(deptDF.col("deptid")), "leftouter").show();
        }
        catch (Exception e) {
            System.out.println(e);
        }
    }

    public static Emp getInstance(String[] parts, Emp emp) throws ParseException {
        emp.setId(parts[0]);
        emp.setName(parts[1]);
        emp.setDeptid(parts[2]);
        return emp;
    }

    public static Dept getInstanceDept(String[] parts, Dept dept) throws ParseException {
        dept.setDeptid(parts[0]);
        dept.setDeptname(parts[1]);
        return dept;
    }
}

Input 
Emp
1001 aba 10
1002 abs 20
1003 abd 10
1004 abf 30
1005 abg 10
1006 abh 20
1007 abj 10
1008 abk 30
1009 abl 20
1010 abq 10

Dept
10 dev
20 Test
30 IT

Output
+--+--++--++
|deptid|id|name|deptid|deptname|
+--+--++--++
|10|  1001| aba|10| dev|
|10|  1003| abd|10| dev|
|10|  1005| abg|10| dev|
|10|  1007| abj|10| dev|
|10|  1010| abq|10| dev|
|20|  1002| abs|  null|null|
|20|  1006| abh|  null|null|
|20|  1009| abl|  null|null|
|30|  1004| abf|  null|null|
|30|  1008| abk|  null|null|
+--+--++--++




Spark Left outer Join issue using programmatic sql joins

2016-07-06 Thread Radha krishna
Hi All,

Please check below for the code, input and output. I think the output is
not correct; am I missing anything? Please guide.

Code

public class Test {
    private static JavaSparkContext jsc = null;
    private static SQLContext sqlContext = null;
    private static Configuration hadoopConf = null;

    public static void main(String[] args) {
        jsc = GlobalSparkContext.getJavaSparkContext();
        sqlContext = GlobalSparkContext.getSQLContext(jsc);

        hadoopConf = new Configuration(jsc.hadoopConfiguration());
        hadoopConf.set("textinputformat.record.delimiter", GlobalSparkContext.lineSeparator);

        try {
            final Emp emp = new Emp();
            final Dept dept = new Dept();

            JavaPairRDD<LongWritable, Text> deptinputLines =
                    jsc.newAPIHadoopFile(args[0], TextInputFormat.class, LongWritable.class, Text.class, hadoopConf);
            JavaRDD<Dept> deptRDD = deptinputLines.map(new Function<Tuple2<LongWritable, Text>, String>() {
                @Override
                public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
                    return arg0._2.toString();
                }
            }).map(new Function<String, Dept>() {
                public Dept call(String recordLine) throws Exception {
                    String[] parts = recordLine.split(GlobalSparkContext.recordSeparator);
                    return getInstanceDept(parts, dept);
                }
            });

            DataFrame deptDF = sqlContext.createDataFrame(deptRDD, Dept.class);
            deptDF.registerTempTable("DEPT");
            //deptDF.show();

            JavaPairRDD<LongWritable, Text> inputLines =
                    jsc.newAPIHadoopFile(args[1], TextInputFormat.class, LongWritable.class, Text.class, hadoopConf);
            JavaRDD<Emp> empRDD = inputLines.map(new Function<Tuple2<LongWritable, Text>, String>() {

                private static final long serialVersionUID = 3371707560417405016L;

                @Override
                public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
                    return arg0._2.toString();
                }
            }).map(new Function<String, Emp>() {

                private static final long serialVersionUID = 7656942162815285622L;

                public Emp call(String recordLine) throws Exception {
                    String[] parts = recordLine.split(GlobalSparkContext.recordSeparator);
                    return getInstance(parts, emp);
                }
            });
            DataFrame empDF = sqlContext.createDataFrame(empRDD, Emp.class);
            empDF.registerTempTable("EMP");

            sqlContext.sql("SELECT * FROM EMP e LEFT OUTER JOIN DEPT d ON e.deptid = d.deptid").show();

            //empDF.join(deptDF, empDF.col("deptid").equalTo(deptDF.col("deptid")), "leftouter").show();
        }
        catch (Exception e) {
            System.out.println(e);
        }
    }

    public static Emp getInstance(String[] parts, Emp emp) throws ParseException {
        emp.setId(parts[0]);
        emp.setName(parts[1]);
        emp.setDeptid(parts[2]);
        return emp;
    }

    public static Dept getInstanceDept(String[] parts, Dept dept) throws ParseException {
        dept.setDeptid(parts[0]);
        dept.setDeptname(parts[1]);
        return dept;
    }
}

Input
Emp
1001 aba 10
1002 abs 20
1003 abd 10
1004 abf 30
1005 abg 10
1006 abh 20
1007 abj 10
1008 abk 30
1009 abl 20
1010 abq 10

Dept
10 dev
20 Test
30 IT

Output
+--+--++--++
|deptid|id|name|deptid|deptname|
+--+--++--++
|10|  1001| aba|10| dev|
|10|  1003| abd|10| dev|
|10|  1005| abg|10| dev|
|10|  1007| abj|10| dev|
|10|  1010| abq|10| dev|
|20|  1002| abs|  null|null|
|20|  1006| abh|  null|null|
|20|  1009| abl|  null|null|
|30|  1004| abf|  null|null|
|30|  1008| abk|  null|null|
+--+--++--++

Regards
Radha


Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

2016-07-06 Thread Sean Owen
dstream.foreachRDD(_.collect.foreach(println))

On Wed, Jul 6, 2016 at 1:19 PM, Rabin Banerjee
 wrote:
> Collect will help then . May be something like this,
> foreachRDD( rdd => { for(item <- rdd.collect().toArray) { println(item); }
> })
>




Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

2016-07-06 Thread Rabin Banerjee
Collect will help then. Maybe something like this:
foreachRDD(rdd => { for (item <- rdd.collect().toArray) { println(item) } })

On Wed, Jul 6, 2016 at 5:46 PM, Sean Owen  wrote:

> That's still causing the element to be printed on the remote
> executors, not the driver. You'd have to collect the RDD and then
> println, really. Also see DStream.print()
>
> On Wed, Jul 6, 2016 at 1:07 PM, Rabin Banerjee
>  wrote:
> > It's not working because , you haven't collected the data.
> >
> > Try something like
> >
> > DStream.forEachRDD((rdd)=> {rdd.foreach(println)})
> >
> > Thanks,
> > Rabin
> >
> >
> > On Wed, Jul 6, 2016 at 5:05 PM, Yu Wei  wrote:
> >>
> >> Hi guys,
> >>
> >>
> >> It seemed that when launching application via yarn on single node,
> >> JavaDStream.print() did not work. However, occasionally it worked.
> >>
> >> If launch the same application in local mode, it always worked.
> >>
> >>
> >> The code is as below,
> >>
> >> SparkConf conf = new SparkConf().setAppName("Monitor");
> >> JavaStreamingContext jssc = new JavaStreamingContext(conf,
> >> Durations.seconds(1));
> >> JavaReceiverInputDStream inputDS = MQTTUtils.createStream(jssc,
> >> "tcp://114.55.145.185:1883", "Control");
> >> inputDS.print();
> >> jssc.start();
> >> jssc.awaitTermination();
> >>
> >>
> >> Command for launching via yarn, (did not work)
> >> spark-submit --master yarn --deploy-mode cluster --driver-memory 4g
> >> --executor-memory 2g target/CollAna-1.0-SNAPSHOT.jar
> >>
> >> Command for launching via local mode (works)
> >> spark-submit --master local[4] --driver-memory 4g --executor-memory 2g
> >> --num-executors 4 target/CollAna-1.0-SNAPSHOT.jar
> >>
> >>
> >> Any thoughts about the problem?
> >>
> >>
> >> Thanks,
> >>
> >> Jared
> >>
> >
>


Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

2016-07-06 Thread Sean Owen
That's still causing the element to be printed on the remote
executors, not the driver. You'd have to collect the RDD and then
println, really. Also see DStream.print()
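
If collecting everything back is too heavy, a small hedged variant that ships only a sample to the driver, similar to what DStream.print() does internally:

dstream.foreachRDD { rdd =>
  // take() brings at most 10 elements back to the driver
  rdd.take(10).foreach(println)
}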

On Wed, Jul 6, 2016 at 1:07 PM, Rabin Banerjee
 wrote:
> It's not working because , you haven't collected the data.
>
> Try something like
>
> DStream.forEachRDD((rdd)=> {rdd.foreach(println)})
>
> Thanks,
> Rabin
>
>
> On Wed, Jul 6, 2016 at 5:05 PM, Yu Wei  wrote:
>>
>> Hi guys,
>>
>>
>> It seemed that when launching application via yarn on single node,
>> JavaDStream.print() did not work. However, occasionally it worked.
>>
>> If launch the same application in local mode, it always worked.
>>
>>
>> The code is as below,
>>
>> SparkConf conf = new SparkConf().setAppName("Monitor");
>> JavaStreamingContext jssc = new JavaStreamingContext(conf,
>> Durations.seconds(1));
>> JavaReceiverInputDStream inputDS = MQTTUtils.createStream(jssc,
>> "tcp://114.55.145.185:1883", "Control");
>> inputDS.print();
>> jssc.start();
>> jssc.awaitTermination();
>>
>>
>> Command for launching via yarn, (did not work)
>> spark-submit --master yarn --deploy-mode cluster --driver-memory 4g
>> --executor-memory 2g target/CollAna-1.0-SNAPSHOT.jar
>>
>> Command for launching via local mode (works)
>> spark-submit --master local[4] --driver-memory 4g --executor-memory 2g
>> --num-executors 4 target/CollAna-1.0-SNAPSHOT.jar
>>
>>
>> Any thoughts about the problem?
>>
>>
>> Thanks,
>>
>> Jared
>>
>




Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

2016-07-06 Thread Rabin Banerjee
It's not working because you haven't collected the data.

Try something like

DStream.forEachRDD((rdd)=> {rdd.foreach(println)})

Thanks,
Rabin


On Wed, Jul 6, 2016 at 5:05 PM, Yu Wei  wrote:

> Hi guys,
>
>
> It seemed that when launching application via yarn on single node,
> JavaDStream.print() did not work. However, occasionally it worked.
>
> If launch the same application in local mode, it always worked.
>
>
> The code is as below,
> SparkConf conf = new SparkConf().setAppName("Monitor");
> JavaStreamingContext jssc = new JavaStreamingContext(conf,
> Durations.seconds(1));
> JavaReceiverInputDStream inputDS = MQTTUtils.createStream(jssc,
> "tcp://114.55.145.185:1883", "Control");
> inputDS.print();
> jssc.start();
> jssc.awaitTermination();
>
>
> Command for launching via yarn, (did not work)
> spark-submit --master yarn --deploy-mode cluster --driver-memory 4g
> --executor-memory 2g target/CollAna-1.0-SNAPSHOT.jar
>
> Command for launching via local mode (works)
> spark-submit --master local[4] --driver-memory 4g --executor-memory 2g
> --num-executors 4 target/CollAna-1.0-SNAPSHOT.jar
>
>
> Any thoughts about the problem?
>
>
> Thanks,
> Jared
>
>


It seemed JavaDStream.print() did not work when launching via yarn on a single node

2016-07-06 Thread Yu Wei
Hi guys,


It seemed that when launching the application via yarn on a single node,
JavaDStream.print() did not work. However, occasionally it worked.

If I launch the same application in local mode, it always works.


The code is as below,

SparkConf conf = new SparkConf().setAppName("Monitor");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> inputDS = MQTTUtils.createStream(jssc, "tcp://114.55.145.185:1883", "Control");
inputDS.print();
jssc.start();
jssc.awaitTermination();


Command for launching via yarn, (did not work)
spark-submit --master yarn --deploy-mode cluster --driver-memory 4g 
--executor-memory 2g target/CollAna-1.0-SNAPSHOT.jar

Command for launching via local mode (works)
spark-submit --master local[4] --driver-memory 4g --executor-memory 2g 
--num-executors 4 target/CollAna-1.0-SNAPSHOT.jar


Any thoughts about the problem?


Thanks,

Jared



Re: Is that possible to launch spark streaming application on yarn with only one machine?

2016-07-06 Thread Yu Wei
Hi Deng,

I tried the same code again.

It seemed that when launching the application via yarn on a single node,
JavaDStream.print() did not work. However, occasionally it worked.

If I launch the same application in local mode, it always works.


The code is as below,

SparkConf conf = new SparkConf().setAppName("Monitor");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> inputDS = MQTTUtils.createStream(jssc, "tcp://114.55.145.185:1883", "Control");
inputDS.print();
jssc.start();
jssc.awaitTermination();


Command for launching via yarn (did not work):
spark-submit --master yarn --deploy-mode cluster --driver-memory 4g --executor-memory 2g target/CollAna-1.0-SNAPSHOT.jar

Command for launching via local mode (works):
spark-submit --master local[4] --driver-memory 4g --executor-memory 2g --num-executors 4 target/CollAna-1.0-SNAPSHOT.jar



Any advice?


Thanks,

Jared



From: Yu Wei 
Sent: Tuesday, July 5, 2016 4:41 PM
To: Deng Ching-Mallete
Cc: user@spark.apache.org
Subject: Re: Is that possible to launch spark streaming application on yarn 
with only one machine?


Hi Deng,


Thanks for the help. Actually I need to pay more attention to memory usage.

I found the root cause of my problem. It seemed to be in the spark streaming
MQTTUtils module.

When I use "localhost" in the brokerURL, it doesn't work.

After changing it to "127.0.0.1", it works now.


Thanks again,

Jared




From: odeach...@gmail.com  on behalf of Deng Ching-Mallete 

Sent: Tuesday, July 5, 2016 4:03:28 PM
To: Yu Wei
Cc: user@spark.apache.org
Subject: Re: Is that possible to launch spark streaming application on yarn 
with only one machine?

Hi Jared,

You can launch a Spark application even with just a single node in YARN, 
provided that the node has enough resources to run the job.

It might also be good to note that when YARN calculates the memory allocation 
for the driver and the executors, there is an additional memory overhead that 
is added for each executor then it gets rounded up to the nearest GB, IIRC. So 
the 4G driver-memory + 4x2G executor memory do not necessarily translate to a 
total of 12G memory allocation. It would be more than that, so the node would 
need to have more than 12G of memory for the job to execute in YARN. You should 
be able to see something like "No resources available in cluster.." in the 
application master logs in YARN if that is the case.
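
To make the rounding concrete, a rough back-of-the-envelope sketch; the max(384 MB, 10% of heap) overhead default and the 1024 MB YARN minimum-allocation increment are assumptions that depend on the Spark/YARN versions and configuration in use:

// approximate YARN container size for a given heap request
def containerMb(heapMb: Int, minAllocMb: Int = 1024): Int = {
  val overhead = math.max(384, (heapMb * 0.10).toInt)
  val requested = heapMb + overhead
  ((requested + minAllocMb - 1) / minAllocMb) * minAllocMb  // round up to the allocation increment
}

containerMb(2048)                          // executor: 2432 MB requested -> 3072 MB container
containerMb(4096)                          // driver:  ~4505 MB requested -> 5120 MB container
containerMb(4096) + 4 * containerMb(2048)  // 1 driver + 4 executors ~= 17408 MB (~17 GB)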

HTH,
Deng

On Tue, Jul 5, 2016 at 4:31 PM, Yu Wei 
> wrote:

Hi guys,

I set up a pseudo hadoop/yarn cluster on my laptop.

I wrote a simple spark streaming program as below to receive messages with 
MQTTUtils.

conf = new SparkConf().setAppName("Monitor");
jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream inputDS = MQTTUtils.createStream(jssc, 
brokerUrl, topic);

inputDS.print();
jssc.start();
jssc.awaitTermination()


If I submitted the app with "--master local[2]", it works well.

spark-submit --master local[4] --driver-memory 4g --executor-memory 2g 
--num-executors 4 target/CollAna-1.0-SNAPSHOT.jar

If I submitted with "--master yarn",  no output for "inputDS.print()".

spark-submit --master yarn --deploy-mode cluster --driver-memory 4g 
--executor-memory 2g --num-executors 4 target/CollAna-1.0-SNAPSHOT.jar

Is it possible to launch spark application on yarn with only one single node?


Thanks for your advice.


Jared




how to select first 50 value of each group after group by?

2016-07-06 Thread luohui20001
hi there,
I have a DF with 3 columns: id, pv, location (the rows are already grouped by
location and sorted by pv in descending order). I want to get the first 50 id
values per location. I checked the API of DataFrame, GroupedData and PairRDD, and
found no match. Is there a way to do this naturally? Any info will be appreciated.




 

Thanks
Best regards!
San.Luo
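
A window function should do it. A hedged sketch against the DataFrame API (column names follow the post; the DF variable name df is assumed, and on releases before 1.6 the function is called rowNumber instead of row_number):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

// rank ids within each location by pv (highest first), then keep the top 50 per location
val w = Window.partitionBy("location").orderBy(desc("pv"))
val top50 = df.withColumn("rn", row_number().over(w))
  .filter(col("rn") <= 50)
  .select("id", "pv", "location")
top50.show()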


streaming new data into bigger parquet file

2016-07-06 Thread Igor Berman
Hi,
I was reading the following Databricks guide tutorial on streaming data to S3:
https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/08%20Write%20Output%20To%20S3.html

It states that sometimes I need to compact small files (e.g. from Spark
Streaming) into one big compacted file (I understand why: better read
performance, to solve the "many small files" problem, etc.).

My questions are:
1. What happens when I have a big parquet file partitioned by some field and
I want to append new small files into this big file? Does Spark overwrite the
whole data, or can it append the new data at the end?
2. While the appending process happens, how can I ensure that readers of the big
parquet files are not blocked and won't get any errors? (i.e. are the files
"available" while new data is being appended to them?)

I will highly appreciate any pointers

thanks in advance,
Igor
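
On the compaction step itself, a minimal hedged sketch; the paths, partition layout and coalesce factor are made up, and it deliberately writes to a separate compacted location so existing readers of the original files are never touched until you point them at the new path:

// read the many small files of one partition
val small = sqlContext.read.parquet("s3a://bucket/events/date=2016-07-06")

// rewrite them as a handful of larger files somewhere else
small.coalesce(4)
  .write.mode("overwrite")
  .parquet("s3a://bucket/events_compacted/date=2016-07-06")

Writing to a new location (and switching readers over afterwards) is generally safer than appending into files that are being read, since Parquet files themselves are immutable once written.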


Re: Spark Task failure with File segment length as negative

2016-07-06 Thread Priya Ch
Has anyone resolved this?


Thanks,
Padma CH

On Wed, Jun 22, 2016 at 4:39 PM, Priya Ch 
wrote:

> Hi All,
>
> I am running a Spark application with 1.8 TB of data (stored in Hive table
> format). I am reading the data using HiveContext and processing it.
> The cluster has 5 nodes total, 25 cores per machine and 250 GB per node. I
> am launching the application with 25 executors with 5 cores each and 45 GB
> per executor. I also specified the property
> spark.yarn.executor.memoryOverhead=2024.
>
> During the execution, tasks are lost and ShuffleMapTasks are re-submitted.
> I am seeing that tasks are failing with the following message -
>
> java.lang.IllegalArgumentException: requirement failed: File segment length cannot be negative (got -27045427)
>     at scala.Predef$.require(Predef.scala:233)
>     at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28)
>     at org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:220)
>     at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:184)
>     at org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:398)
>     at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:206)
>     at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> I understood that it's because the shuffle block is > 2G: the Int value goes
> negative, throwing the above exception.
>
> Can someone throw light on this? What is the fix for this?
>
> Thanks,
> Padma CH
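
A common mitigation, sketched under the assumption that the failing stage is a SQL/DataFrame shuffle and that 2000 is only a starting point to tune: raise the shuffle parallelism so that no single shuffle block gets anywhere near 2 GB.

// more shuffle partitions -> smaller blocks per partition for SQL/DataFrame shuffles
hiveContext.setConf("spark.sql.shuffle.partitions", "2000")

// or repartition an especially large DataFrame explicitly before the wide operation
val repartitioned = largeDF.repartition(2000)   // largeDF is a placeholder name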


where is open source Distributed service framework use for spark??

2016-07-06 Thread ????????
I want my server to be distributed and used with Spark. What good service
frameworks are there? They must be open source.

thanks