Re: Reading parquet files into Spark Streaming

2016-08-26 Thread Akhilesh Pathodia
Hi Renato,

Which version of Spark are you using?

If your Spark version is 1.3.0 or later, you can use SQLContext to read the
Parquet file, which will give you a DataFrame. Please see the link below:

https://spark.apache.org/docs/1.5.0/sql-programming-guide.html#loading-data-programmatically

Thanks,
Akhilesh

On Sat, Aug 27, 2016 at 3:26 AM, Renato Marroquín Mogrovejo <
renatoj.marroq...@gmail.com> wrote:

> Anybody? I think Rory also didn't get an answer from the list ...
>
> https://mail-archives.apache.org/mod_mbox/spark-user/201602.mbox/%3CCAC+fre14pv5nvqhtbvqdc+6dkxo73odazfqslbso8f94ozo...@mail.gmail.com%3E
>
>
>
> 2016-08-26 17:42 GMT+02:00 Renato Marroquín Mogrovejo <
> renatoj.marroq...@gmail.com>:
>
>> Hi all,
>>
>> I am trying to use parquet files as input for DStream operations, but I
>> can't find any documentation or example. The only thing I found was [1] but
>> I also get the same error as in the post (Class
>> parquet.avro.AvroReadSupport not found).
>> Ideally I would like to have something like this:
>>
>> val oDStream = ssc.fileStream[Void, Order, ParquetInputFormat[Order]]("data/")
>>
>> where Order is a case class and the files inside "data" are all parquet
>> files.
>> Any hints would be highly appreciated. Thanks!
>>
>>
>> Best,
>>
>> Renato M.
>>
>> [1] http://stackoverflow.com/questions/35413552/how-do-i-read-in-parquet-files-using-ssc-filestream-and-what-is-the-nature
>>
>
>


Re: Spark 2.0 - Insert/Update to a DataFrame

2016-08-26 Thread Mike Metzger
PySpark example based on the data you provided (obviously your dataframes
will come from whatever source you have, not entered directly).  This uses
an intermediary dataframe with grouped data for clarity, but you could pull
this off in other ways.

-- Code --

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import col

# Forecast data: one row per product
fcst_schema = StructType([
    StructField('Product', StringType(), nullable=False),
    StructField('fcst_qty', IntegerType(), nullable=False)
])

fcst_data = sc.parallelize([Row("A", 100),
                            Row("B", 50)])

# Sales order data: one row per order line
so_schema = StructType([
    StructField('OrderNum', IntegerType(), nullable=False),
    StructField('ItemNum', StringType(), nullable=False),
    StructField('Sales_qty', IntegerType(), nullable=False)
])
so_data = sc.parallelize([
    Row(101, "A", 10),
    Row(101, "B", 5),
    Row(102, "A", 5),
    Row(102, "B", 10)])

fcst_df = sqlContext.createDataFrame(fcst_data, fcst_schema)
so_df = sqlContext.createDataFrame(so_data, so_schema)

fcst_df.show()
so_df.show()

# Total the sales per item so the join matches one row per product
orderTotals_df = so_df.groupBy('ItemNum').sum('Sales_qty') \
    .select('ItemNum', col('sum(Sales_qty)').alias('Sales_qty'))
orderTotals_df.show()

# Subtract the summed sales from the forecast quantity
fcst_df.join(orderTotals_df, fcst_df.Product == orderTotals_df.ItemNum,
             'left_outer') \
    .select(fcst_df.Product,
            (fcst_df.fcst_qty - orderTotals_df.Sales_qty).alias('fcst_qty')) \
    .show()


-- Output examples (fcst_df, so_df, orderTotals_df, and the resultant df) --

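+-------+--------+
|Product|fcst_qty|
+-------+--------+
|      A|     100|
|      B|      50|
+-------+--------+

+--------+-------+---------+
|OrderNum|ItemNum|Sales_qty|
+--------+-------+---------+
|     101|      A|       10|
|     101|      B|        5|
|     102|      A|        5|
|     102|      B|       10|
+--------+-------+---------+

+-------+---------+
|ItemNum|Sales_qty|
+-------+---------+
|      B|       15|
|      A|       15|
+-------+---------+

+-------+--------+
|Product|fcst_qty|
+-------+--------+
|      B|      35|
|      A|      85|
+-------+--------+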

The other languages should work similarly. Honestly, I'd probably just
set up the dataframes and write it in SQL, possibly with a UDF, to keep
things a little clearer.
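
For reference, the SQL route could look roughly like the sketch below. It is
only an illustration under a few assumptions: it uses Python's built-in sqlite3
module as a stand-in so the query can be tried without a cluster, whereas in
Spark you would register the two dataframes as temporary tables and run a
similar query through the SQL interface. The table names fcst and so and the
helper name updated_forecast are made up for the example, and the COALESCE is
an addition so that products with no sales keep their forecast quantity.

```python
import sqlite3

def updated_forecast(fcst_rows, so_rows):
    """Return [(product, remaining_qty)] after subtracting total sales per item."""
    conn = sqlite3.connect(":memory:")
    # Hypothetical table names; they mirror the two dataframes in the thread.
    conn.execute("CREATE TABLE fcst (Product TEXT, fcst_qty INTEGER)")
    conn.execute("CREATE TABLE so (OrderNum INTEGER, ItemNum TEXT, Sales_qty INTEGER)")
    conn.executemany("INSERT INTO fcst VALUES (?, ?)", fcst_rows)
    conn.executemany("INSERT INTO so VALUES (?, ?, ?)", so_rows)
    # Aggregate sales per item first, then left-join so each product matches
    # at most one row (this avoids multiplying forecast rows per order line).
    query = """
        SELECT f.Product,
               f.fcst_qty - COALESCE(t.Sales_qty, 0) AS fcst_qty
        FROM fcst f
        LEFT OUTER JOIN (
            SELECT ItemNum, SUM(Sales_qty) AS Sales_qty
            FROM so
            GROUP BY ItemNum
        ) t ON f.Product = t.ItemNum
        ORDER BY f.Product
    """
    rows = conn.execute(query).fetchall()
    conn.close()
    return rows

fcst = [("A", 100), ("B", 50)]
sales = [(101, "A", 10), (101, "B", 5), (102, "A", 5), (102, "B", 10)]
print(updated_forecast(fcst, sales))  # [('A', 85), ('B', 35)]
```

The inner GROUP BY plays the same role as the intermediary grouped dataframe
in the PySpark version.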

Thanks

Mike


On Fri, Aug 26, 2016 at 4:45 PM, Subhajit Purkayastha 
wrote:

> So the data in the fcst dataframe is like this
>
>
>
> Product, fcst_qty
>
> A 100
>
> B 50
>
>
>
> Sales DF has data like this
>
>
>
> Order# Item# Sales qty
>
> 101 A 10
>
> 101 B 5
>
> 102 A 5
>
> 102 B 10
>
>
>
> I want to update the FCST DF data, based on Product=Item#
>
>
>
> So the resultant FCST DF should have data
>
> Product, fcst_qty
>
> A 85
>
> B 35
>
>
>
> Hope it helps
>
>
>
> If I join the data between the 2 DFs (based on Product# and Item#), I will
> get a Cartesian join and my result will not be what I want
>
>
>
> Thanks for your help
>
>
>
>
>
> *From:* Mike Metzger [mailto:m...@flexiblecreations.com]
> *Sent:* Friday, August 26, 2016 2:12 PM
>
> *To:* Subhajit Purkayastha 
> *Cc:* user @spark 
> *Subject:* Re: Spark 2.0 - Insert/Update to a DataFrame
>
>
>
> Without seeing exactly what you were wanting to accomplish, it's hard to
> say.  A Join is still probably the method I'd suggest using something like:
>
>
>
> select (FCST.quantity - SO.quantity) as quantity
>
> 
>
> from FCST
>
> LEFT OUTER JOIN
>
> SO ON FCST.productid = SO.productid
>
> WHERE
>
> 
>
>
>
> with specifics depending on the layout and what language you're using.
>
>
>
> Thanks
>
>
>
> Mike
>
>
>
> On Fri, Aug 26, 2016 at 3:29 PM, Subhajit Purkayastha 
> wrote:
>
> Mike,
>
>
>
> The grains of the dataFrame are different.
>
>
>
> I need to reduce the forecast qty (which is in the FCST DF)  based on the
> sales qty (coming from the sales  order DF)
>
>
>
> Hope it helps
>
>
>
> Subhajit
>
>
>
> *From:* Mike Metzger [mailto:m...@flexiblecreations.com]
> *Sent:* Friday, August 26, 2016 1:13 PM
> *To:* Subhajit Purkayastha 
> *Cc:* user @spark 
> *Subject:* Re: Spark 2.0 - Insert/Update to a DataFrame
>
>
>
> Without seeing the makeup of the Dataframes nor what your logic is for
> updating them, I'd suggest doing a join of the Forecast DF with the
> appropriate columns from the SalesOrder DF.
>
>
>
> Mike
>
>
>
> On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha 
> wrote:
>
> I am using Spark 2.0 and have 2 DataFrames, SalesOrder and Forecast. I need
> to update the Forecast DataFrame record(s) based on the SalesOrder DF
> record. What is the best way to achieve this functionality?
>
>
>
>
>


Re: Dynamically change executors settings

2016-08-26 Thread linguin . m . s
Hi,

No, currently you can't change the setting. 

// maropu



On 2016/08/27 11:40, Vadim Semenov wrote:

> Hi spark users,
> 
> I wonder if it's possible to change executor settings on the fly.
> I have the following use case: I have a lot of non-splittable, skewed files in
> a custom format that I read using a custom Hadoop RecordReader. These files
> range from small to huge, and I'd like to use only one or two cores per executor
> while they are being processed (to use the whole heap). Once they have been
> processed, I'd like to enable all cores.
> I know that I can achieve this by splitting the work into two separate jobs, but I
> wonder if it's possible to somehow achieve the behavior I described.
> 
> Thanks!

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Please assist: Building Docker image containing spark 2.0

2016-08-26 Thread Mike Metzger
I would also suggest building the container manually first and setting up
everything you specifically need.  Once done, you can then grab the history
file, pull out the invalid commands and build out the completed
Dockerfile.  Trying to troubleshoot an installation via Dockerfile is often
an exercise in futility.

Thanks

Mike


On Fri, Aug 26, 2016 at 5:14 PM, Michael Gummelt 
wrote:

> Run with "-X -e" like the error message says. See what comes out.
>
> On Fri, Aug 26, 2016 at 2:23 PM, Tal Grynbaum 
> wrote:
>
>> Did you specify -Dscala-2.10?
>> As in:
>> ./dev/change-scala-version.sh 2.10
>> ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
>> if you're building with Scala 2.10.
>>
>> On Sat, Aug 27, 2016, 00:18 Marco Mistroni  wrote:
>>
>>> Hello Michael
>>> uhm i celebrated too soon
>>> Compilation of spark on docker image went near the end and then it
>>> errored out with this message
>>>
>>> [INFO] BUILD FAILURE
>>> [INFO] 
>>> 
>>> [INFO] Total time: 01:01 h
>>> [INFO] Finished at: 2016-08-26T21:12:25+00:00
>>> [INFO] Final Memory: 69M/324M
>>> [INFO] 
>>> 
>>> [ERROR] Failed to execute goal 
>>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile
>>> (scala-compile-first) on project spark-mllib_2.11: Execution
>>> scala-compile-first of goal 
>>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile
>>> failed. CompileFailed -> [Help 1]
>>> [ERROR]
>>> [ERROR] To see the full stack trace of the errors, re-run Maven with the
>>> -e switch.
>>> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>>> [ERROR]
>>> [ERROR] For more information about the errors and possible solutions,
>>> please read the following articles:
>>> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
>>> [ERROR]
>>> [ERROR] After correcting the problems, you can resume the build with the
>>> command
>>> [ERROR]   mvn  -rf :spark-mllib_2.11
>>> The command '/bin/sh -c ./build/mvn -Pyarn -Phadoop-2.4
>>> -Dhadoop.version=2.4.0 -DskipTests clean package' returned a non-zero code:
>>> 1
>>>
>>> what am i forgetting?
>>> once again, last command i launched on the docker file is
>>>
>>>
>>> RUN ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests
>>> clean package
>>>
>>> kr
>>>
>>>
>>>
>>> On Fri, Aug 26, 2016 at 6:18 PM, Michael Gummelt wrote:
>>>
 :)

 On Thu, Aug 25, 2016 at 2:29 PM, Marco Mistroni 
 wrote:

> No i wont accept that :)
> I can't believe i have wasted 3 hrs for a space!
>
> Many thanks Michael!
>
> kr
>
> On Thu, Aug 25, 2016 at 10:01 PM, Michael Gummelt <
> mgumm...@mesosphere.io> wrote:
>
>> You have a space between "build" and "mvn"
>>
>> On Thu, Aug 25, 2016 at 1:31 PM, Marco Mistroni 
>> wrote:
>>
>>> Hi all,
>>> sorry for the partially off-topic post; I hope there's someone on the
>>> list who has tried the same and encountered similar issues.
>>>
>>> OK, so I have created a Dockerfile to build an Ubuntu container
>>> which includes Spark 2.0, but when it gets to the point where it
>>> has to kick off the ./build/mvn command, it errors out with the following:
>>>
>>> ---> Running in 8c2aa6d59842
>>> /bin/sh: 1: ./build: Permission denied
>>> The command '/bin/sh -c ./build mvn -Pyarn -Phadoop-2.4
>>> -Dhadoop.version=2.4.0 -DskipTests clean package' returned a non-zero 
>>> code:
>>> 126
>>>
>>> I am puzzled as i am root when i build the container, so i should
>>> not encounter this issue (btw, if instead of running mvn from the build
>>> directory  i use the mvn which i installed on the container, it works 
>>> fine
>>> but it's  painfully slow)
>>>
>>> here are the details of my Spark command( scala 2.10, java 1.7 , mvn
>>> 3.3.9 and git have already been installed)
>>>
>>> # Spark
>>> RUN echo "Installing Apache spark 2.0"
>>> RUN git clone git://github.com/apache/spark.git
>>> WORKDIR /spark
>>> RUN ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0
>>> -DskipTests clean package
>>>
>>>
>>> Could anyone assist pls?
>>>
>>> kindest regards
>>>  Marco
>>>
>>>
>>
>>
>> --
>> Michael Gummelt
>> Software Engineer
>> Mesosphere
>>
>
>


 --
 Michael Gummelt
 Software Engineer
 Mesosphere

>>>
>>>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>


Dynamically change executors settings

2016-08-26 Thread Vadim Semenov
Hi spark users,

I wonder if it's possible to change executor settings on the fly.
I have the following use case: I have a lot of non-splittable, skewed files
in a custom format that I read using a custom Hadoop RecordReader. These
files range from small to huge, and I'd like to use only one or two cores per
executor while they are being processed (to use the whole heap). Once they
have been processed, I'd like to enable all cores.
I know that I can achieve this by splitting the work into two separate jobs, but I
wonder if it's possible to somehow achieve the behavior I described.

Thanks!


Re: mutable.LinkedHashMap kryo serialization issues

2016-08-26 Thread Rahul Palamuttam
Hi,

I apologize, I spoke too soon.
Those transient member variables may not be the issue.

To clarify my test case I am creating a LinkedHashMap with two elements in
a map expression on an RDD.
Note that the LinkedHashMaps are being created on the worker JVMs (not the
driver JVM) and THEN collected to the driver JVM.
I am NOT creating LinkedHashMaps on the driver and then parallelizing them
(sending them to worker JVMs).

As Renato said, Spark requires us to register classes that aren't yet in
Chill.
As far as I know there are three ways to register them, all through API
calls on SparkConf.

1. sparkConf().registerKryoClasses(Array(classOf[...], classOf[...]))
* This is the method of registering classes as described in the Tuning page:
http://spark.apache.org/docs/latest/tuning.html#data-serialization

2. sparkConf().set("spark.kryo.classesToRegister", "cName1, cName2")

3. sparkConf().set("spark.kryo.registrator", "registrator1, registrator2")

In the first two methods, which set the classes to register in Kryo,
what I get are empty mutable.LinkedHashMaps after calling collect on the
RDD.
To the best of my understanding this should not happen (none of the other
collection classes I have used have this problem).

For the third method I created a registrator for mutable.LinkedHashMap
which can be found here :
https://gist.github.com/rahulpalamuttam/9f3bfa39a160efa80844d3a7a7bd87cd

I set the registrator like so :
sparkConf().set("spark.kryo.registrator",
"org.dia.core.MutableLinkedHashMapRegistrator").
Now, when I do the same test, I get an Array of LinkedHashMaps.
Each LinkedHashMap contains the entries I populated it with in the map task.

Why do the first two methods result in improper serialization of
mutable.LinkedHashMap?
Should I file a JIRA for it?

Much credit should be given to Martin Grotzke from EsotericSoftware/kryo
who helped me tremendously.

Best,

Rahul Palamuttam




On Fri, Aug 26, 2016 at 10:16 AM, Rahul Palamuttam 
wrote:

> Thanks Renato.
>
> I forgot to reply all last time. I apologize for the rather confusing
> example.
> All that the code snippet did was:
> 1. Make an RDD of LinkedHashMaps with size 2
> 2. On the worker side, get the sizes of the HashMaps (via a map(hash =>
> hash.size))
> 3. On the driver, call collect on the RDD[Int], which is the RDD of hashmap
> sizes, giving you an Array[Int]
> 4. On the driver, call collect on the RDD[LinkedHashMap], giving you an
> Array[LinkedHashMap]
> 5. Compare the size of a hashmap in Array[LinkedHashMap] with any size value
> in Array[Int] (they're all going to be the same size).
> 6. The sizes differ because the elements of the LinkedHashMap were never
> copied over
>
> Anyway I think I've tracked down the issue and it doesn't seem to be a
> spark or kryo issue.
>
> For those it concerns, LinkedHashMap has this serialization issue because
> it has transient members for firstEntry and lastEntry.
> Take a look here: https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/mutable/LinkedHashMap.scala#L62
>
> Those attributes are not going to be serialized.
> Furthermore, the iterator on LinkedHashMap depends on the firstEntry
> variable.
> Since that member is not serialized, it is null.
> The iterator requires the firstEntry variable to walk the LinkedHashMap:
> https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/mutable/LinkedHashMap.scala#L94-L100
>
> I wonder why these two variables were made transient.
>
> Best,
> Rahul Palamuttam
>
>
> On Thu, Aug 25, 2016 at 11:13 PM, Renato Marroquín Mogrovejo <
> renatoj.marroq...@gmail.com> wrote:
>
>> Hi Rahul,
>>
>> You have probably already figured this one out, but anyway...
>> You need to register the classes that you'll be using with Kryo because
>> it does not support all Serializable types and requires you to register the
>> classes you’ll use in the program in advance. So when you don't register
>> the class, Kryo doesn't know how to serialize/deserialize it.
>>
>>
>> Best,
>>
>> Renato M.
>>
>> 2016-08-22 17:12 GMT+02:00 Rahul Palamuttam :
>>
>>> Hi,
>>>
>>> Just sending this again to see if others have had this issue.
>>>
>>> I recently switched to using kryo serialization and I've been running
>>> into errors
>>> with the mutable.LinkedHashMap class.
>>>
>>> If I don't register the mutable.LinkedHashMap class then I get an
>>> ArrayStoreException seen below.
>>> If I do register the class, then when the LinkedHashMap is collected on
>>> the driver, it does not contain any elements.
>>>
>>> Here is the snippet of code I used :
>>>
>>> val sc = new SparkContext(new SparkConf()
>>>   .setMaster("local[*]")
>>>   .setAppName("Sample")
>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>   .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, 
>>> String]])))
>>>
>>> val collect = sc.parallelize(0 to 10)
>>>   .map(p => new mutable.LinkedHashMap[String, String]() ++= 

Re: GraphFrames 0.2.0 released

2016-08-26 Thread Joseph Bradley
This should do it:
https://github.com/graphframes/graphframes/releases/tag/release-0.2.0
Thanks for the reminder!
Joseph

On Wed, Aug 24, 2016 at 10:11 AM, Maciej Bryński  wrote:

> Hi,
> Do you plan to add a tag for this release on GitHub?
> https://github.com/graphframes/graphframes/releases
>
> Regards,
> Maciek
>
> 2016-08-17 3:18 GMT+02:00 Jacek Laskowski :
>
>> Hi Tim,
>>
>> AWESOME. Thanks a lot for releasing it. That makes me even more eager
>> to see it in Spark's codebase (and replacing the current RDD-based
>> API)!
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Tue, Aug 16, 2016 at 9:32 AM, Tim Hunter 
>> wrote:
>> > Hello all,
>> > I have released version 0.2.0 of the GraphFrames package. Apart from a few
>> > bug fixes, it is the first release published for Spark 2.0 and both Scala
>> > 2.10 and 2.11. Please let us know if you have any comments or questions.
>> >
>> > It is available as a Spark package:
>> > https://spark-packages.org/package/graphframes/graphframes
>> >
>> > The source code is available as always at
>> > https://github.com/graphframes/graphframes
>> >
>> >
>> > What is GraphFrames?
>> >
>> > GraphFrames is a DataFrame-based graph engine for Spark. In addition to the
>> > algorithms available in GraphX, users can write highly expressive queries by
>> > leveraging the DataFrame API, combined with a new API for motif finding. The
>> > user also benefits from DataFrame performance optimizations within the Spark
>> > SQL engine.
>> >
>> > Cheers
>> >
>> > Tim
>> >
>> >
>> >
>>
>>
>>
>
>
> --
> Maciek Bryński
>


Re: Is there anyway Spark UI is set to poll and refreshes itself

2016-08-26 Thread Mich Talebzadeh
Thanks Jacek,

I will have a look. I think it is long overdue.

I mean, we try to micro-batch and stream everything at sub-second latency, but
when it comes to help with basic monitoring we are still miles behind :(

Cheers,

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 26 August 2016 at 23:21, Jacek Laskowski  wrote:

> Hi Mich,
>
> I don't think so. There is support for a UI page refresh but I haven't
> seen it in use.
>
> See StreamingPage [1] where it schedules refresh every 5 secs, i.e.
> Some(5000). In SparkUIUtils.headerSparkPage [2] there is
> refreshInterval but it's not used in any place in Spark.
>
> Time to file a JIRA issue?
>
> What about REST API and httpie updating regularly [3]? Perhaps Metrics
> with ConsoleSink [4]?
>
> [1] https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala#L158
> [2] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/UIUtils.scala#L202
> [3] http://spark.apache.org/docs/latest/monitoring.html#rest-api
> [4] http://spark.apache.org/docs/latest/monitoring.html#metrics
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Thu, Aug 25, 2016 at 11:55 AM, Mich Talebzadeh
>  wrote:
> > Hi,
> >
> > This may be already there.
> >
> > A spark job opens up a UI on port specified by --conf "spark.ui.port=${SP}"
> > that defaults to 4040.
> >
> > However, on UI one needs to refresh the page to see the progress.
> >
> > Can this be polled so it is refreshed automatically
> >
> > Thanks
> >
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> > Disclaimer: Use it at your own risk. Any and all responsibility for any
> > loss, damage or destruction of data or any other property which may arise
> > from relying on this email's technical content is explicitly disclaimed. The
> > author will in no case be liable for any monetary damages arising from such
> > loss, damage or destruction.
> >
> >
>


Re: Is there anyway Spark UI is set to poll and refreshes itself

2016-08-26 Thread Jacek Laskowski
Hi Mich,

I don't think so. There is support for a UI page refresh but I haven't
seen it in use.

See StreamingPage [1] where it schedules refresh every 5 secs, i.e.
Some(5000). In SparkUIUtils.headerSparkPage [2] there is
refreshInterval but it's not used in any place in Spark.

Time to file a JIRA issue?

What about REST API and httpie updating regularly [3]? Perhaps Metrics
with ConsoleSink [4]?

[1] 
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala#L158
[2] 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/UIUtils.scala#L202
[3] http://spark.apache.org/docs/latest/monitoring.html#rest-api
[4] http://spark.apache.org/docs/latest/monitoring.html#metrics
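
As a rough sketch of the REST API idea [3], something like the following could
poll the driver and print job progress instead of refreshing the page by hand.
It assumes the driver UI is reachable at a base URL such as
http://localhost:4040 and that the job entries carry the jobId, status,
numCompletedTasks and numTasks fields; the function names (fetch_json,
job_progress, poll) are made up for this example and are not part of Spark.

```python
import json
import time
from urllib.request import urlopen

def fetch_json(url):
    """GET a URL and decode the JSON body."""
    with urlopen(url) as resp:
        return json.loads(resp.read().decode("utf-8"))

def job_progress(base_url, fetch=fetch_json):
    """Return one progress line per job of the first application listed."""
    apps = fetch(base_url + "/api/v1/applications")
    if not apps:
        return []
    jobs = fetch("%s/api/v1/applications/%s/jobs" % (base_url, apps[0]["id"]))
    return ["job %s %s %s/%s tasks" % (j["jobId"], j["status"],
                                       j["numCompletedTasks"], j["numTasks"])
            for j in jobs]

def poll(base_url, interval_secs=5, rounds=3, fetch=fetch_json):
    """Print the progress lines every interval_secs seconds, rounds times."""
    for i in range(rounds):
        for line in job_progress(base_url, fetch):
            print(line)
        if i < rounds - 1:
            time.sleep(interval_secs)

# Example (needs a running driver): poll("http://localhost:4040")
```

The fetch parameter is only there so the logic can be tried against canned
data without a running application.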

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Aug 25, 2016 at 11:55 AM, Mich Talebzadeh
 wrote:
> Hi,
>
> This may be already there.
>
> A spark job opens up a UI on port specified by --conf "spark.ui.port=${SP}"
> that defaults to 4040.
>
> However, on UI one needs to refresh the page to see the progress.
>
> Can this be polled so it is refreshed automatically
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed. The
> author will in no case be liable for any monetary damages arising from such
> loss, damage or destruction.
>
>




Re: Please assist: Building Docker image containing spark 2.0

2016-08-26 Thread Michael Gummelt
Run with "-X -e" like the error message says. See what comes out.

On Fri, Aug 26, 2016 at 2:23 PM, Tal Grynbaum 
wrote:

> Did you specify -Dscala-2.10?
> As in:
> ./dev/change-scala-version.sh 2.10
> ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
> if you're building with Scala 2.10.
>
> On Sat, Aug 27, 2016, 00:18 Marco Mistroni  wrote:
>
>> Hello Michael
>> uhm i celebrated too soon
>> Compilation of spark on docker image went near the end and then it
>> errored out with this message
>>
>> [INFO] BUILD FAILURE
>> [INFO] 
>> 
>> [INFO] Total time: 01:01 h
>> [INFO] Finished at: 2016-08-26T21:12:25+00:00
>> [INFO] Final Memory: 69M/324M
>> [INFO] 
>> 
>> [ERROR] Failed to execute goal 
>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile
>> (scala-compile-first) on project spark-mllib_2.11: Execution
>> scala-compile-first of goal 
>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile
>> failed. CompileFailed -> [Help 1]
>> [ERROR]
>> [ERROR] To see the full stack trace of the errors, re-run Maven with the
>> -e switch.
>> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>> [ERROR]
>> [ERROR] For more information about the errors and possible solutions,
>> please read the following articles:
>> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
>> [ERROR]
>> [ERROR] After correcting the problems, you can resume the build with the
>> command
>> [ERROR]   mvn  -rf :spark-mllib_2.11
>> The command '/bin/sh -c ./build/mvn -Pyarn -Phadoop-2.4
>> -Dhadoop.version=2.4.0 -DskipTests clean package' returned a non-zero code:
>> 1
>>
>> what am i forgetting?
>> once again, last command i launched on the docker file is
>>
>>
>> RUN ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests
>> clean package
>>
>> kr
>>
>>
>>
>> On Fri, Aug 26, 2016 at 6:18 PM, Michael Gummelt 
>> wrote:
>>
>>> :)
>>>
>>> On Thu, Aug 25, 2016 at 2:29 PM, Marco Mistroni 
>>> wrote:
>>>
 No i wont accept that :)
 I can't believe i have wasted 3 hrs for a space!

 Many thanks Michael!

 kr

 On Thu, Aug 25, 2016 at 10:01 PM, Michael Gummelt <
 mgumm...@mesosphere.io> wrote:

> You have a space between "build" and "mvn"
>
> On Thu, Aug 25, 2016 at 1:31 PM, Marco Mistroni 
> wrote:
>
>> Hi all,
>> sorry for the partially off-topic post; I hope there's someone on the
>> list who has tried the same and encountered similar issues.
>>
>> OK, so I have created a Dockerfile to build an Ubuntu container which
>> includes Spark 2.0, but when it gets to the point where it has to
>> kick off the ./build/mvn command, it errors out with the following:
>>
>> ---> Running in 8c2aa6d59842
>> /bin/sh: 1: ./build: Permission denied
>> The command '/bin/sh -c ./build mvn -Pyarn -Phadoop-2.4
>> -Dhadoop.version=2.4.0 -DskipTests clean package' returned a non-zero 
>> code:
>> 126
>>
>> I am puzzled as i am root when i build the container, so i should not
>> encounter this issue (btw, if instead of running mvn from the build
>> directory  i use the mvn which i installed on the container, it works 
>> fine
>> but it's  painfully slow)
>>
>> here are the details of my Spark command( scala 2.10, java 1.7 , mvn
>> 3.3.9 and git have already been installed)
>>
>> # Spark
>> RUN echo "Installing Apache spark 2.0"
>> RUN git clone git://github.com/apache/spark.git
>> WORKDIR /spark
>> RUN ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0
>> -DskipTests clean package
>>
>>
>> Could anyone assist pls?
>>
>> kindest regards
>>  Marco
>>
>>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>


>>>
>>>
>>> --
>>> Michael Gummelt
>>> Software Engineer
>>> Mesosphere
>>>
>>
>>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: is there a HTTP2 (v2) endpoint for Spark Streaming?

2016-08-26 Thread kant kodali

What's happening to my English? Too many typos, sorry. Let me rephrase it.

I want HTTP/2 for fully pipelined, out-of-order execution. In other words, I
should be able to send multiple requests through the same TCP connection. By
out-of-order execution I mean: if I send Req1 at t1 and Req2 at t2, where
t1 < t2, and Req2 finishes before Req1, I should be able to get the response
for Req2 first and then the response for Req1, even though Req1 was sent
earlier; it just took longer.


Having an HTTP receiver for Spark would be great because HTTP is very common
these days; a lot of services communicate using HTTP. HTTP/2 would give
better performance.
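
To make the ordering concrete, here is a small simulation of what I mean. It
is only a sketch: it uses asyncio sleeps in place of real requests, so there is
no actual HTTP/2 connection or Spark receiver involved, and the names handle
and completion_order are made up for the illustration.

```python
import asyncio

async def handle(name, seconds):
    """Stand-in for a request whose processing takes `seconds`."""
    await asyncio.sleep(seconds)
    return name

async def completion_order(requests):
    """Start all requests in the given order; return names in finishing order."""
    tasks = [asyncio.ensure_future(handle(name, secs)) for name, secs in requests]
    finished = []
    # as_completed yields results as each one finishes, not in send order
    for fut in asyncio.as_completed(tasks):
        finished.append(await fut)
    return finished

# Req1 is sent first but takes longer, so the response for Req2 arrives first.
order = asyncio.run(completion_order([("Req1", 0.2), ("Req2", 0.05)]))
print(order)  # ['Req2', 'Req1']
```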





On Fri, Aug 26, 2016 2:47 PM, kant kodali kanth...@gmail.com wrote:
HTTP2 for fully pipelined out of order execution. other words I should be able
to send multiple requests through same TCP connection and by out of order
execution I mean if I send Req1 at t1 and Req2 at t2 where t1 < t2 and if Req 2
finishes before Req1 I should be able to get a response from Req2 first and then
a response from Req 1 even though Req1 was sent earlier but say it just took
longer.
Having a HTTP receiver for spark would be great because it is just very common
these days since that of services these communicate using HTTP. HTTP2 for better
performance.





On Fri, Aug 26, 2016 2:20 PM, Jacek Laskowski ja...@japila.pl wrote:
Hi,
Never heard of one myself. I don't think Bahir [1] offers it, either. Perhaps
socketTextStream or textFileStream with http URI could be of some help?
What would you expect from such a HTTP/2 receiver? What are the requirements?
Why http/2? #curious
[1] http://bahir.apache.org/

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski
On Fri, Aug 26, 2016 at 9:42 PM, kant kodali < kanth...@gmail.com > wrote:
is there a HTTP2 (v2) endpoint for Spark Streaming?

Re: Reading parquet files into Spark Streaming

2016-08-26 Thread Renato Marroquín Mogrovejo
Anybody? I think Rory also didn't get an answer from the list ...

https://mail-archives.apache.org/mod_mbox/spark-user/201602.mbox/%3ccac+fre14pv5nvqhtbvqdc+6dkxo73odazfqslbso8f94ozo...@mail.gmail.com%3E



2016-08-26 17:42 GMT+02:00 Renato Marroquín Mogrovejo <
renatoj.marroq...@gmail.com>:

> Hi all,
>
> I am trying to use parquet files as input for DStream operations, but I
> can't find any documentation or example. The only thing I found was [1] but
> I also get the same error as in the post (Class
> parquet.avro.AvroReadSupport not found).
> Ideally I would like to have something like this:
>
> val oDStream = ssc.fileStream[Void, Order, ParquetInputFormat[Order]]("data/")
>
> where Order is a case class and the files inside "data" are all parquet
> files.
> Any hints would be highly appreciated. Thanks!
>
>
> Best,
>
> Renato M.
>
> [1] http://stackoverflow.com/questions/35413552/how-do-i-read-in-parquet-files-using-ssc-filestream-and-what-is-the-nature
>


Re: is there a HTTP2 (v2) endpoint for Spark Streaming?

2016-08-26 Thread kant kodali

HTTP2 for fully pipelined out of order execution. other words I should be able
to send multiple requests through same TCP connection and by out of order
execution I mean if I send Req1 at t1 and Req2 at t2 where t1 < t2 and if Req 2
finishes before Req1 I should be able to get a response from Req2 first and then
a response from Req 1 even though Req1 was sent earlier but say it just took
longer.
Having a HTTP receiver for spark would be great because it is just very common
these days since that of services these communicate using HTTP. HTTP2 for better
performance.





On Fri, Aug 26, 2016 2:20 PM, Jacek Laskowski ja...@japila.pl wrote:
Hi,
Never heard of one myself. I don't think Bahir [1] offers it, either. Perhaps
socketTextStream or textFileStream with http URI could be of some help?
What would you expect from such a HTTP/2 receiver? What are the requirements?
Why http/2? #curious
[1] http://bahir.apache.org/

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski
On Fri, Aug 26, 2016 at 9:42 PM, kant kodali < kanth...@gmail.com > wrote:
is there a HTTP2 (v2) endpoint for Spark Streaming?

RE: Spark 2.0 - Insert/Update to a DataFrame

2016-08-26 Thread Subhajit Purkayastha
So the data in the fcst dataframe is like this

 

Product, fcst_qty

A 100

B 50

 

Sales DF has data like this

 

Order# Item#Sales qty

101 A 10

101 B 5

102 A 5

102 B 10

 

I want to update the FCSt DF data, based on Product=Item#

 

So the resultant FCST DF should have data

Product, fcst_qty

A 85

B 35

 

Hope it helps

 

If I join the data between the 2 DFs (based on Product# and item#), I will get 
a cartesion join and my result will not be what I want

 

Thanks for your help
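
One way to avoid the row multiplication is to bring the sales data to the same
grain as the forecast before combining them. The sketch below shows that logic
in plain Python on the sample data from this message; it is not Spark code, and
the function name reduce_forecast is made up for the example. In Spark, the same
two steps would roughly correspond to a groupBy with a sum on the item column
followed by a left outer join to the forecast.

```python
from collections import defaultdict


def reduce_forecast(forecast, sales):
    """forecast: {product: fcst_qty}; sales: list of (order_no, item, sales_qty)."""
    # Step 1: collapse sales to one total per item so the grains match.
    sold = defaultdict(int)
    for _order_no, item, qty in sales:
        sold[item] += qty
    # Step 2: "left join" the forecast to the per-item totals and subtract.
    # Products with no sales keep their forecast unchanged.
    return {product: qty - sold.get(product, 0) for product, qty in forecast.items()}


forecast = {"A": 100, "B": 50}
sales = [(101, "A", 10), (101, "B", 5), (102, "A", 5), (102, "B", 10)]
updated = reduce_forecast(forecast, sales)
```

With the sample rows, A has 15 units sold and B has 15 units sold, so updated
holds 85 for A and 35 for B, which matches the desired result above.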

 

 

From: Mike Metzger [mailto:m...@flexiblecreations.com] 
Sent: Friday, August 26, 2016 2:12 PM
To: Subhajit Purkayastha 
Cc: user @spark 
Subject: Re: Spark 2.0 - Insert/Update to a DataFrame

 

Without seeing exactly what you were wanting to accomplish, it's hard to say.  
A Join is still probably the method I'd suggest using something like:

 

select (FCST.quantity - SO.quantity) as quantity
from FCST
LEFT OUTER JOIN
SO ON FCST.productid = SO.productid
WHERE



 

with specifics depending on the layout and what language you're using.

 

Thanks

 

Mike

 

On Fri, Aug 26, 2016 at 3:29 PM, Subhajit Purkayastha wrote:

Mike,

 

The grains of the dataFrame are different.

 

I need to reduce the forecast qty (which is in the FCST DF)  based on the sales 
qty (coming from the sales  order DF)

 

Hope it helps

 

Subhajit

 

From: Mike Metzger [mailto:m...@flexiblecreations.com]
Sent: Friday, August 26, 2016 1:13 PM
To: Subhajit Purkayastha
Cc: user @spark
Subject: Re: Spark 2.0 - Insert/Update to a DataFrame

 

Without seeing the makeup of the Dataframes nor what your logic is for updating 
them, I'd suggest doing a join of the Forecast DF with the appropriate columns 
from the SalesOrder DF.  

 

Mike

 

On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha wrote:

I am using spark 2.0, have 2 DataFrames, SalesOrder and Forecast. I need to 
update the Forecast Dataframe record(s), based on the SaleOrder DF record. What 
is the best way to achieve this functionality

 

 



Re: Please assist: Building Docker image containing spark 2.0

2016-08-26 Thread Tal Grynbaum
Did you specify -Dscala-2.10?
As in:
./dev/change-scala-version.sh 2.10
./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
if you're building with Scala 2.10.

On Sat, Aug 27, 2016, 00:18 Marco Mistroni  wrote:

> Hello Michael
> uhm i celebrated too soon
> Compilation of spark on docker image went near the end and then it errored
> out with this message
>
> [INFO] BUILD FAILURE
> [INFO]
> 
> [INFO] Total time: 01:01 h
> [INFO] Finished at: 2016-08-26T21:12:25+00:00
> [INFO] Final Memory: 69M/324M
> [INFO]
> 
> [ERROR] Failed to execute goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first)
> on project spark-mllib_2.11: Execution scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. CompileFailed
> -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the
> -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
> [ERROR]
> [ERROR] After correcting the problems, you can resume the build with the
> command
> [ERROR]   mvn  -rf :spark-mllib_2.11
> The command '/bin/sh -c ./build/mvn -Pyarn -Phadoop-2.4
> -Dhadoop.version=2.4.0 -DskipTests clean package' returned a non-zero code:
> 1
>
> what am i forgetting?
> once again, last command i launched on the docker file is
>
>
> RUN ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests
> clean package
>
> kr
>
>
>
> On Fri, Aug 26, 2016 at 6:18 PM, Michael Gummelt 
> wrote:
>
>> :)
>>
>> On Thu, Aug 25, 2016 at 2:29 PM, Marco Mistroni 
>> wrote:
>>
>>> No i wont accept that :)
>>> I can't believe i have wasted 3 hrs for a space!
>>>
>>> Many thanks MIchael!
>>>
>>> kr
>>>
>>> On Thu, Aug 25, 2016 at 10:01 PM, Michael Gummelt <
>>> mgumm...@mesosphere.io> wrote:
>>>
>>>> You have a space between "build" and "mvn"
>>>>
>>>> On Thu, Aug 25, 2016 at 1:31 PM, Marco Mistroni
>>>> wrote:
>>>>
>>>>> HI all
>>>>>  sorry for the partially off-topic, i hope there's someone on the list
>>>>> who has tried the same and encountered similar issuse
>>>>>
>>>>> Ok so i have created a Docker file to build an ubuntu container which
>>>>> inlcudes spark 2.0, but somehow when it gets to the point where it has to
>>>>> kick off  ./build/mvn command, it errors out with the following
>>>>>
>>>>> ---> Running in 8c2aa6d59842
>>>>> /bin/sh: 1: ./build: Permission denied
>>>>> The command '/bin/sh -c ./build mvn -Pyarn -Phadoop-2.4
>>>>> -Dhadoop.version=2.4.0 -DskipTests clean package' returned a non-zero
>>>>> code:
>>>>> 126
>>>>>
>>>>> I am puzzled as i am root when i build the container, so i should not
>>>>> encounter this issue (btw, if instead of running mvn from the build
>>>>> directory  i use the mvn which i installed on the container, it works fine
>>>>> but it's  painfully slow)
>>>>>
>>>>> here are the details of my Spark command( scala 2.10, java 1.7 , mvn
>>>>> 3.3.9 and git have already been installed)
>>>>>
>>>>> # Spark
>>>>> RUN echo "Installing Apache spark 2.0"
>>>>> RUN git clone git://github.com/apache/spark.git
>>>>> WORKDIR /spark
>>>>> RUN ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests
>>>>> clean package
>>>>>
>>>>>
>>>>> Could anyone assist pls?
>>>>>
>>>>> kindest regarsd
>>>>>  Marco
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Michael Gummelt
>>>> Software Engineer
>>>> Mesosphere
>>>>
>>>
>>>
>>
>>
>> --
>> Michael Gummelt
>> Software Engineer
>> Mesosphere
>>
>
>


Re: is there a HTTP2 (v2) endpoint for Spark Streaming?

2016-08-26 Thread Jacek Laskowski
Hi,

Never heard of one myself. I don't think Bahir [1] offers it, either.
Perhaps socketTextStream or textFileStream with http URI could be of some
help?

What would you expect from such a HTTP/2 receiver? What are the
requirements? Why http/2? #curious

[1] http://bahir.apache.org/


Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Fri, Aug 26, 2016 at 9:42 PM, kant kodali  wrote:

> is there a HTTP2 (v2) endpoint for Spark Streaming?
>


Re: Please assist: Building Docker image containing spark 2.0

2016-08-26 Thread Marco Mistroni
Hello Michael
uhm i celebrated too soon
Compilation of spark on docker image went near the end and then it errored
out with this message

[INFO] BUILD FAILURE
[INFO]

[INFO] Total time: 01:01 h
[INFO] Finished at: 2016-08-26T21:12:25+00:00
[INFO] Final Memory: 69M/324M
[INFO]

[ERROR] Failed to execute goal
net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first)
on project spark-mllib_2.11: Execution scala-compile-first of goal
net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. CompileFailed
-> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions,
please read the following articles:
[ERROR] [Help 1]
http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the
command
[ERROR]   mvn  -rf :spark-mllib_2.11
The command '/bin/sh -c ./build/mvn -Pyarn -Phadoop-2.4
-Dhadoop.version=2.4.0 -DskipTests clean package' returned a non-zero code:
1

what am i forgetting?
once again, last command i launched on the docker file is

RUN ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests
clean package

kr



On Fri, Aug 26, 2016 at 6:18 PM, Michael Gummelt 
wrote:

> :)
>
> On Thu, Aug 25, 2016 at 2:29 PM, Marco Mistroni 
> wrote:
>
>> No i wont accept that :)
>> I can't believe i have wasted 3 hrs for a space!
>>
>> Many thanks MIchael!
>>
>> kr
>>
>> On Thu, Aug 25, 2016 at 10:01 PM, Michael Gummelt wrote:
>>
>>> You have a space between "build" and "mvn"
>>>
>>> On Thu, Aug 25, 2016 at 1:31 PM, Marco Mistroni 
>>> wrote:
>>>
>>>> HI all
>>>>  sorry for the partially off-topic, i hope there's someone on the list
>>>> who has tried the same and encountered similar issuse
>>>>
>>>> Ok so i have created a Docker file to build an ubuntu container which
>>>> inlcudes spark 2.0, but somehow when it gets to the point where it has to
>>>> kick off  ./build/mvn command, it errors out with the following
>>>>
>>>> ---> Running in 8c2aa6d59842
>>>> /bin/sh: 1: ./build: Permission denied
>>>> The command '/bin/sh -c ./build mvn -Pyarn -Phadoop-2.4
>>>> -Dhadoop.version=2.4.0 -DskipTests clean package' returned a non-zero code:
>>>> 126
>>>>
>>>> I am puzzled as i am root when i build the container, so i should not
>>>> encounter this issue (btw, if instead of running mvn from the build
>>>> directory  i use the mvn which i installed on the container, it works fine
>>>> but it's  painfully slow)
>>>>
>>>> here are the details of my Spark command( scala 2.10, java 1.7 , mvn
>>>> 3.3.9 and git have already been installed)
>>>>
>>>> # Spark
>>>> RUN echo "Installing Apache spark 2.0"
>>>> RUN git clone git://github.com/apache/spark.git
>>>> WORKDIR /spark
>>>> RUN ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests
>>>> clean package
>>>>
>>>>
>>>> Could anyone assist pls?
>>>>
>>>> kindest regarsd
>>>>  Marco
>>>>
>>>>
>>>
>>>
>>> --
>>> Michael Gummelt
>>> Software Engineer
>>> Mesosphere
>>>
>>
>>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>


Re: Spark 2.0 - Insert/Update to a DataFrame

2016-08-26 Thread Mike Metzger
Without seeing exactly what you were wanting to accomplish, it's hard to
say.  A Join is still probably the method I'd suggest using something like:

select (FCST.quantity - SO.quantity) as quantity

from FCST
LEFT OUTER JOIN
SO ON FCST.productid = SO.productid
WHERE


with specifics depending on the layout and what language you're using.

Thanks

Mike

On Fri, Aug 26, 2016 at 3:29 PM, Subhajit Purkayastha 
wrote:

> Mike,
>
>
>
> The grains of the dataFrame are different.
>
>
>
> I need to reduce the forecast qty (which is in the FCST DF)  based on the
> sales qty (coming from the sales  order DF)
>
>
>
> Hope it helps
>
>
>
> Subhajit
>
>
>
> *From:* Mike Metzger [mailto:m...@flexiblecreations.com]
> *Sent:* Friday, August 26, 2016 1:13 PM
> *To:* Subhajit Purkayastha 
> *Cc:* user @spark 
> *Subject:* Re: Spark 2.0 - Insert/Update to a DataFrame
>
>
>
> Without seeing the makeup of the Dataframes nor what your logic is for
> updating them, I'd suggest doing a join of the Forecast DF with the
> appropriate columns from the SalesOrder DF.
>
>
>
> Mike
>
>
>
> On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha 
> wrote:
>
> I am using spark 2.0, have 2 DataFrames, SalesOrder and Forecast. I need
> to update the Forecast Dataframe record(s), based on the SaleOrder DF
> record. What is the best way to achieve this functionality
>
>
>


Re: Spark 1.6 Streaming with Checkpointing

2016-08-26 Thread Jacek Laskowski
On Fri, Aug 26, 2016 at 10:54 PM, Benjamin Kim  wrote:

> // Create a text file stream on an S3 bucket
> val csv = ssc.textFileStream("s3a://" + awsS3BucketName + "/")
>
> csv.foreachRDD(rdd => {
> if (!rdd.partitions.isEmpty) {
> // process data
> }
> })

Hi Benjamin,

I hardly remember the case now, but I'd recommend moving the above
snippet to *after* you getOrCreate the context, i.e.

> // Get streaming context from checkpoint data or create a new one
> val context = StreamingContext.getOrCreate(checkpoint,
> () => createContext(interval, checkpoint, bucket, database, 
> table, partitionBy))

Place the first snippet here so that you only create the pipeline after
you get the context.

I might be mistaken, too. Sorry.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark 1.6 Streaming with Checkpointing

2016-08-26 Thread Benjamin Kim
I am trying to implement checkpointing in my streaming application, but I am
getting a "not serializable" error. Has anyone encountered this? I am deploying
this job in YARN cluster mode.

Here is a snippet of the main parts of the code.

object S3EventIngestion {
  // create and setup streaming context
  def createContext(
      batchInterval: Integer,
      checkpointDirectory: String,
      awsS3BucketName: String,
      databaseName: String,
      tableName: String,
      partitionByColumnName: String
  ): StreamingContext = {

    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("S3EventIngestion")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Create the streaming context with batch interval
    val ssc = new StreamingContext(sc, Seconds(batchInterval))

    // Create a text file stream on an S3 bucket
    val csv = ssc.textFileStream("s3a://" + awsS3BucketName + "/")

    csv.foreachRDD(rdd => {
      if (!rdd.partitions.isEmpty) {
        // process data
      }
    })

    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  def main(args: Array[String]) {
    if (args.length != 6) {
      System.err.println("Usage: S3EventIngestion ")
      System.exit(1)
    }

    // Get streaming context from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpoint,
      () => createContext(interval, checkpoint, bucket, database, table, partitionBy))

    // start streaming context
    context.start()
    context.awaitTermination()
  }
}

Can someone help please?

Thanks,
Ben



RE: Spark 2.0 - Insert/Update to a DataFrame

2016-08-26 Thread Subhajit Purkayastha
Mike,

The grains of the DataFrames are different.

I need to reduce the forecast qty (which is in the FCST DF) based on the sales
qty (coming from the sales order DF).

Hope it helps

Subhajit

 

From: Mike Metzger [mailto:m...@flexiblecreations.com] 
Sent: Friday, August 26, 2016 1:13 PM
To: Subhajit Purkayastha 
Cc: user @spark 
Subject: Re: Spark 2.0 - Insert/Update to a DataFrame

 

Without seeing the makeup of the Dataframes nor what your logic is for updating 
them, I'd suggest doing a join of the Forecast DF with the appropriate columns 
from the SalesOrder DF.  

 

Mike

 

On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha wrote:

I am using spark 2.0, have 2 DataFrames, SalesOrder and Forecast. I need to 
update the Forecast Dataframe record(s), based on the SaleOrder DF record. What 
is the best way to achieve this functionality

 



Re: Spark 2.0 - Insert/Update to a DataFrame

2016-08-26 Thread Mike Metzger
Without seeing the makeup of the Dataframes nor what your logic is for
updating them, I'd suggest doing a join of the Forecast DF with the
appropriate columns from the SalesOrder DF.

Mike

On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha 
wrote:

> I am using spark 2.0, have 2 DataFrames, SalesOrder and Forecast. I need
> to update the Forecast Dataframe record(s), based on the SaleOrder DF
> record. What is the best way to achieve this functionality
>


is there a HTTP2 (v2) endpoint for Spark Streaming?

2016-08-26 Thread kant kodali
is there a HTTP2 (v2) endpoint for Spark Streaming?

Re: unable to start slaves from master (SSH problem)

2016-08-26 Thread kant kodali

Fixed. I just had to log out of the master node and log back in, for some reason.





On Fri, Aug 26, 2016 5:32 AM, kant kodali kanth...@gmail.com wrote:
Hi,
I am unable to start Spark slaves from my master node. When I run ./start-all.sh
on my master node, it brings up the master but fails for the slaves, saying
"permission denied public key". However, I did add the master's id_rsa.pub to
my slaves' authorized_keys, and I checked manually from my master node that I was
able to ssh into my slave node without any password. Is there anything else I am missing?
Thanks!

Re: spark 2.0 home brew package missing

2016-08-26 Thread RAJESHWAR MANN
Thank you! That was it. 2.0 installed fine after the update.

Regards


> On Aug 26, 2016, at 1:37 PM, Noorul Islam K M  wrote:
> 
> kalkimann  writes:
> 
>> Hi,
>> spark 1.6.2 is the latest brew package i can find.
>> spark 2.0.x brew package is missing, best i know.
>> 
>> Is there a schedule when spark-2.0 will be available for "brew install"? 
>> 
> 
> Did you do a 'brew update' before searching. I installed spark-2.0 this
> week.
> 
> Regards
> Noorul





Re: EMR for spark job - instance type suggestion

2016-08-26 Thread Gavin Yue
I tried both M4 and R3.  R3 is slightly more expensive, but has larger
memory.

If you are doing a lot of in-memory stuff, like joins, I recommend R3.

Otherwise M4 is fine. Also, as I remember, M4 is an EBS-backed instance, so you
have to pay the additional EBS cost as well.



On Fri, Aug 26, 2016 at 10:29 AM, Saurabh Malviya (samalviy) <
samal...@cisco.com> wrote:

> We are going to use EMR cluster for spark jobs in aws. Any suggestion for
> instance type to be used.
>
>
>
> M3.xlarge or r3.xlarge.
>
>
>
> Details:
>
> 1)  We are going to run couple of streaming jobs so we need on demand
> instance type.
>
> 2)  There is no data on hdfs/s3 all data pull from kafka or elastic
> search
>
>
>
>
>
> -Saurabh
>


Re: spark 2.0 home brew package missing

2016-08-26 Thread Noorul Islam K M
kalkimann  writes:

> Hi,
> spark 1.6.2 is the latest brew package i can find.
> spark 2.0.x brew package is missing, best i know.
>
> Is there a schedule when spark-2.0 will be available for "brew install"? 
>

Did you do a 'brew update' before searching? I installed spark-2.0 this
week.

Regards
Noorul




EMR for spark job - instance type suggestion

2016-08-26 Thread Saurabh Malviya (samalviy)
We are going to use an EMR cluster for Spark jobs in AWS. Any suggestions for
the instance type to use?

M3.xlarge or r3.xlarge.

Details:

1)  We are going to run a couple of streaming jobs, so we need an on-demand
instance type.

2)  There is no data on HDFS/S3; all data is pulled from Kafka or Elasticsearch.


-Saurabh


spark 2.0 home brew package missing

2016-08-26 Thread kalkimann
Hi,
Spark 1.6.2 is the latest brew package I can find.
A Spark 2.0.x brew package is missing, as far as I know.

Is there a schedule when spark-2.0 will be available for "brew install"? 

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-2-0-home-brew-package-missing-tp27608.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Fwd: Populating tables using hive and spark

2016-08-26 Thread Timur Shenkao
Hello!

I just wonder: do you (both of you) use the same user for HIVE & Spark? Or
different ? Do you use Kerberized Hadoop?



On Mon, Aug 22, 2016 at 2:20 PM, Mich Talebzadeh 
wrote:

> Ok This is my test
>
> 1) create table in Hive and populate it with two rows
>
> hive> create table testme (col1 int, col2 string);
> OK
> hive> insert into testme values (1,'London');
> Query ID = hduser_20160821212812_2a8384af-23f1-4f28-9395-a99a5f4c1a4a
> OK
> hive> insert into testme values (2,'NY');
> Query ID = hduser_20160821212812_2a8384af-23f1-4f28-9395-a99a5f4c1a4a
> OK
> hive> select * from testme;
> OK
> 1   London
> 2   NY
>
> So the rows are there
>
> Now use  Spark to create two more rows
>
> scala> case class columns (col1: Int, col2: String)
> defined class columns
> scala> val df =sc.parallelize(Array((3,"California"),(4,"Dehli"))).map(p
> => columns(p._1.toString.toInt, p._2.toString)).toDF()
> df: org.apache.spark.sql.DataFrame = [col1: int, col2: string]
> scala> df.show
> +----+----------+
> |col1|      col2|
> +----+----------+
> |   3|California|
> |   4|     Dehli|
> +----+----------+
>
> // register it as tempTable
> scala> df.registerTempTable("tmp")
> scala> sql("insert into test.testme select * from tmp")
> res9: org.apache.spark.sql.DataFrame = []
> scala> sql("select * from testme").show
> +----+----------+
> |col1|      col2|
> +----+----------+
> |   1|    London|
> |   2|        NY|
> |   3|California|
> |   4|     Dehli|
> +----+----------+
> So the rows are there.
>
> Let me go to Hive again now
>
>
> hive>  select * from testme;
> OK
> 1   London
> 2   NY
> 3   California
> 4   Dehli
>
> hive> analyze table testme compute statistics for columns;
>
> So is there any issue here?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 22 August 2016 at 11:51, Nitin Kumar  wrote:
>
>> Hi Furcy,
>>
>> If I execute the command "ANALYZE TABLE TEST_ORC COMPUTE STATISTICS"
>> before checking the count from hive, Hive returns the correct count albeit
>> it does not spawn a map-reduce job for computing the count.
>>
>> I'm running a HDP 2.4 Cluster with Hive 1.2.1.2.4 and Spark 1.6.1
>>
>> If others can concur we can go ahead and report it as a bug.
>>
>> Regards,
>> Nitin
>>
>>
>>
>> On Mon, Aug 22, 2016 at 4:15 PM, Furcy Pin 
>> wrote:
>>
>>> Hi Nitin,
>>>
>>> I confirm that there is something odd here.
>>>
>>> I did the following test :
>>>
>>> create table test_orc (id int, name string, dept string) stored as ORC;
>>> insert into table test_orc values (1, 'abc', 'xyz');
>>> insert into table test_orc values (2, 'def', 'xyz');
>>> insert into table test_orc values (3, 'pqr', 'xyz');
>>> insert into table test_orc values (4, 'ghi', 'xyz');
>>>
>>>
>>> I ended up with 4 files on hdfs:
>>>
>>> 00_0
>>> 00_0_copy_1
>>> 00_0_copy_2
>>> 00_0_copy_3
>>>
>>>
>>> Then I renamed 00_0_copy_2 to part-0, and I still got COUNT(*) =
>>> 4 with hive.
>>> So this is not a file name issue.
>>>
>>> I then removed one of the files, and I got this :
>>>
>>> > SELECT COUNT(1) FROM test_orc ;
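>>> +------+--+
>>> | _c0  |
>>> +------+--+
>>> | 4    |
>>> +------+--+
>>>
>>> > SELECT * FROM test_orc ;
>>> +--------------+----------------+----------------+--+
>>> | test_orc.id  | test_orc.name  | test_orc.dept  |
>>> +--------------+----------------+----------------+--+
>>> | 1            | abc            | xyz            |
>>> | 2            | def            | xyz            |
>>> | 4            | ghi            | xyz            |
>>> +--------------+----------------+----------------+--+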
>>> 3 rows selected (0.162 seconds)
>>>
>>> So, my guess is that when Hive inserts data, it must keep somewhere in
>>> the metastore the number of rows in the table.
>>> However, if the files are modified by someone else than Hive itself,
>>> (either manually or with Spark), you end up with an inconsistency.
>>>
>>> So I guess we can call it a bug:
>>>
>>> Hive should detect that the files changed and invalidate its
>>> pre-calculated count.
>>> Optionally, Spark should be nice with Hive and update the the count when
>>> inserting.
>>>
>>> I don't know if this bug has already been reported, and I tested on Hive
>>> 1.1.0, so perhaps it is already solved in later releases.
>>>
>>> Regards,
>>>
>>> Furcy
>>>
>>>
>>> On Mon, Aug 22, 2016 at 9:34 AM, Nitin 

Re: Please assist: Building Docker image containing spark 2.0

2016-08-26 Thread Michael Gummelt
:)

On Thu, Aug 25, 2016 at 2:29 PM, Marco Mistroni  wrote:

> No i wont accept that :)
> I can't believe i have wasted 3 hrs for a space!
>
> Many thanks MIchael!
>
> kr
>
> On Thu, Aug 25, 2016 at 10:01 PM, Michael Gummelt 
> wrote:
>
>> You have a space between "build" and "mvn"
>>
>> On Thu, Aug 25, 2016 at 1:31 PM, Marco Mistroni 
>> wrote:
>>
>>> HI all
>>>  sorry for the partially off-topic, i hope there's someone on the list
>>> who has tried the same and encountered similar issuse
>>>
>>> Ok so i have created a Docker file to build an ubuntu container which
>>> inlcudes spark 2.0, but somehow when it gets to the point where it has to
>>> kick off  ./build/mvn command, it errors out with the following
>>>
>>> ---> Running in 8c2aa6d59842
>>> /bin/sh: 1: ./build: Permission denied
>>> The command '/bin/sh -c ./build mvn -Pyarn -Phadoop-2.4
>>> -Dhadoop.version=2.4.0 -DskipTests clean package' returned a non-zero code:
>>> 126
>>>
>>> I am puzzled as i am root when i build the container, so i should not
>>> encounter this issue (btw, if instead of running mvn from the build
>>> directory  i use the mvn which i installed on the container, it works fine
>>> but it's  painfully slow)
>>>
>>> here are the details of my Spark command( scala 2.10, java 1.7 , mvn
>>> 3.3.9 and git have already been installed)
>>>
>>> # Spark
>>> RUN echo "Installing Apache spark 2.0"
>>> RUN git clone git://github.com/apache/spark.git
>>> WORKDIR /spark
>>> RUN ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests
>>> clean package
>>>
>>>
>>> Could anyone assist pls?
>>>
>>> kindest regarsd
>>>  Marco
>>>
>>>
>>
>>
>> --
>> Michael Gummelt
>> Software Engineer
>> Mesosphere
>>
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: mutable.LinkedHashMap kryo serialization issues

2016-08-26 Thread Rahul Palamuttam
Thanks Renato.

I forgot to reply all last time. I apologize for the rather confusing
example.
All that the snippet code did was:
1. Make an RDD of LinkedHashMaps, each of size 2.
2. On the worker side, get the sizes of the HashMaps (via a map(hash =>
hash.size)).
3. On the driver, call collect on the RDD[Int] (the RDD of hashmap
sizes), giving you an Array[Int].
4. On the driver, call collect on the RDD[LinkedHashMap], giving you an
Array[LinkedHashMap].
5. Compare the size of a hashmap in Array[LinkedHashMap] with any size value
in Array[Int] (they should all be the same size).
6. The sizes differ because the elements of the LinkedHashMap were never
copied over.

Anyway I think I've tracked down the issue and it doesn't seem to be a
spark or kryo issue.

For those it concerns: LinkedHashMap has this serialization issue because its
firstEntry and lastEntry members are transient.
Take a look here :
https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/mutable/LinkedHashMap.scala#L62

Those attributes are not going to be serialized.
Furthermore, the iterator on LinkedHashMap depends on the firstEntry
variable.
Since that member is not serialized, it is null after deserialization.
The iterator requires the firstEntry variable to walk the LinkedHashMap:
https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/mutable/LinkedHashMap.scala#L94-L100
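
The mechanism can be reproduced with a toy model. The sketch below is plain
Python, not the Scala class and not Kryo: LinkedMap, TRANSIENT, serialize and
deserialize are names invented for the illustration, and the serializer is a
deliberately naive field-by-field one that skips the fields marked transient.

```python
import json


class LinkedMap:
    """Toy insertion-ordered map; `first` plays the role of firstEntry."""

    TRANSIENT = ("first", "last")  # fields a naive serializer will skip

    def __init__(self):
        self.table = {}    # key -> [value, next_key]
        self.first = None  # head of the linked list
        self.last = None   # tail of the linked list

    def put(self, key, value):
        if key in self.table:
            self.table[key][0] = value
            return
        self.table[key] = [value, None]
        if self.first is None:
            self.first = key
        else:
            self.table[self.last][1] = key
        self.last = key

    def items(self):
        # Iteration walks the linked list starting from `first`.
        key = self.first
        while key is not None:
            value, next_key = self.table[key]
            yield key, value
            key = next_key


def serialize(m):
    # Write every field except the transient ones.
    return json.dumps({k: v for k, v in vars(m).items() if k not in LinkedMap.TRANSIENT})


def deserialize(payload):
    m = LinkedMap()  # transient fields start out as None
    m.__dict__.update(json.loads(payload))
    return m


original = LinkedMap()
original.put("hello", "bonjour")
original.put("good", "bueno")
restored = deserialize(serialize(original))
```

In this toy, restored.table still holds both entries, but list(restored.items())
is empty because the walk starts from a head pointer that was never written out.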

I wonder why these two variables were made transient.

Best,
Rahul Palamuttam


On Thu, Aug 25, 2016 at 11:13 PM, Renato Marroquín Mogrovejo <
renatoj.marroq...@gmail.com> wrote:

> Hi Rahul,
>
> You have probably already figured this one out, but anyway...
> You need to register the classes that you'll be using with Kryo because it
> does not support all Serializable types and requires you to register the
> classes you’ll use in the program in advance. So when you don't register
> the class, Kryo doesn't know how to serialize/deserialize it.
>
>
> Best,
>
> Renato M.
>
> 2016-08-22 17:12 GMT+02:00 Rahul Palamuttam :
>
>> Hi,
>>
>> Just sending this again to see if others have had this issue.
>>
>> I recently switched to using kryo serialization and I've been running
>> into errors
>> with the mutable.LinkedHashMap class.
>>
>> If I don't register the mutable.LinkedHashMap class then I get an
>> ArrayStoreException seen below.
>> If I do register the class, then when the LinkedHashMap is collected on
>> the driver, it does not contain any elements.
>>
>> Here is the snippet of code I used :
>>
>> val sc = new SparkContext(new SparkConf()
>>   .setMaster("local[*]")
>>   .setAppName("Sample")
>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>   .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, 
>> String]])))
>>
>> val collect = sc.parallelize(0 to 10)
>>   .map(p => new mutable.LinkedHashMap[String, String]() ++= Array(("hello", 
>> "bonjour"), ("good", "bueno")))
>>
>> val mapSideSizes = collect.map(p => p.size).collect()(0)
>> val driverSideSizes = collect.collect()(0).size
>>
>> println("The sizes before collect : " + mapSideSizes)
>> println("The sizes after collect : " + driverSideSizes)
>>
>>
>> ** The following only occurs if I did not register the
>> mutable.LinkedHashMap class **
>> 16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task
>> result
>> java.lang.ArrayStoreException: scala.collection.mutable.HashMap
>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializer
>> s$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializer
>> s$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>> at org.apache.spark.serializer.KryoSerializerInstance.deseriali
>> ze(KryoSerializer.scala:311)
>> at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun
>> $run$1.apply$mcV$sp(TaskResultGetter.scala:60)
>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun
>> $run$1.apply(TaskResultGetter.scala:51)
>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun
>> $run$1.apply(TaskResultGetter.scala:51)
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(Task
>> ResultGetter.scala:50)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> I hope this is a known issue and/or I'm missing something important in my
>> setup.
>> Appreciate any help or advice!
>>
>> Best,
>>
>> Rahul Palamuttam
>>
>
>


Spark 2.0 - Insert/Update to a DataFrame

2016-08-26 Thread Subhajit Purkayastha
I am using Spark 2.0 and have 2 DataFrames, SalesOrder and Forecast. I need to
update the Forecast DataFrame record(s) based on the SalesOrder DF record.
What is the best way to achieve this functionality?



Re: What do I loose if I run spark without using HDFS or Zookeeper?

2016-08-26 Thread Steve Loughran

On 26 Aug 2016, at 12:58, kant kodali wrote:

@Steve your arguments make sense. However, a good majority of people who
have extensive experience with ZooKeeper prefer to avoid it, and given
the ease of Consul (which, by the way, uses Raft for the election) and etcd,
a lot of us are more inclined to avoid ZK.

And yes, any technology needs time to mature, but that said, it shouldn't stop
us from transitioning. For example, people started using Spark when it was first
released instead of waiting for Spark 2.0, where there are a lot of optimizations
and bug fixes.



One way to look at the problem is "what is the cost if something doesn't work?"

If it's some HA consensus system, one failure mode is "consensus failure,
everything goes into minority mode and offline": service lost, data fine.
Another is "partition with both groups thinking they are in charge", which is
more dangerous. Then there's "partitioning event not detected", which may be
bad.

So: consider the failure modes, and then consider not so much whether the tech
you are using is vulnerable to them, but "if it goes wrong, does it matter?"


Even before HDFS had HA with ZK/bookkeeper it didn't fail very often. And if 
you looked at the causes of those failures, things like backbone switch failure 
are so traumatic that things like ZK/etcd failures aren't going to make much of 
a difference. The filesystem is down.

Generally, integrity gets priority over availability. That said, S3 and the
like have put availability ahead of consistency; Cassandra can offer that
too. Sometimes it is the right strategy.



Spark driver memory breakdown

2016-08-26 Thread Mich Talebzadeh
Hi,

I always underestimated the significance of setting spark.driver.memory.

According to documents

It is the amount of memory to use for the driver process, i.e. where
SparkContext is initialized. (e.g. 1g, 2g).

I was running my application using Spark Standalone so the argument about
Local mode and one JVM do not come into it.

As I know:

* The driver program is the main program, which coordinates the executors
to run the Spark application.*

It is not clear to me whether the driver program also allocates the memory
to executors that run on workers.

I noticed that if you leave the driver memory low, you end up with heap
space issues and the job crashes. So I had to increase the driver memory
from 1G to 8G to make the job run.

So, in a nutshell, how is this driver memory allocated in Standalone mode,
given that we also have the executor memory (--executor-memory), which I set
separately?



Thanks


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: zookeeper mesos logging in spark

2016-08-26 Thread Michael Gummelt
These are the libmesos logs.  Maybe look here
http://mesos.apache.org/documentation/latest/logging/

On Fri, Aug 26, 2016 at 8:31 AM, aecc  wrote:

> Hi,
>
> Everytime I run my spark application using mesos, I get logs in my console
> in the form:
>
> 2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
> 2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
> 2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
> 2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
> 2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
> I0826 15:25:30.949254 960752 sched.cpp:222] Version: 0.28.2
> 2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
> 2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
> 2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
> 2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@zookeep
> 2016-08-26 15:25:30,951:960521(0x7f6bb4ff9700):ZOO_INFO@check_e
> 2016-08-26 15:25:30,952:960521(0x7f6bb4ff9700):ZOO_INFO@check_e
> I0826 15:25:30.952505 960729 group.cpp:349] Group process (grou
> I0826 15:25:30.952570 960729 group.cpp:831] Syncing group opera
> I0826 15:25:30.952592 960729 group.cpp:427] Trying to create pa
> I0826 15:25:30.954211 960722 detector.cpp:152] Detected a new l
> I0826 15:25:30.954320 960744 group.cpp:700] Trying to get '/mes
> I0826 15:25:30.955345 960724 detector.cpp:479] A new leading ma
> I0826 15:25:30.955451 960724 sched.cpp:326] New master detected
> I0826 15:25:30.955567 960724 sched.cpp:336] No credentials prov
> I0826 15:25:30.956478 960732 sched.cpp:703] Framework registere
>
> Anybody know how to disable them through spark-submit ?
>
> Cheers and many thanks
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/zookeeper-mesos-logging-in-spark-tp27607.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Reading parquet files into Spark Streaming

2016-08-26 Thread Renato Marroquín Mogrovejo
Hi all,

I am trying to use parquet files as input for DStream operations, but I
can't find any documentation or example. The only thing I found was [1] but
I also get the same error as in the post (Class
parquet.avro.AvroReadSupport not found).
Ideally I would like to have something like this:

val oDStream = ssc.fileStream[Void, Order,
ParquetInputFormat[Order]]("data/")

where Order is a case class and the files inside "data" are all parquet
files.
Any hints would be highly appreciated. Thanks!


Best,

Renato M.

[1]
http://stackoverflow.com/questions/35413552/how-do-i-read-in-parquet-files-using-ssc-filestream-and-what-is-the-nature


Re: Insert non-null values from dataframe

2016-08-26 Thread Russell Spitzer
Cassandra does not differentiate between null and empty, so when reading
from C* all empty values are reported as null. To avoid inserting nulls
(avoiding tombstones) see

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md#globally-treating-all-nulls-as-unset

This will not prevent those columns from being read as null though, it will
only skip writing tombstones.
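
The expected output in the quoted question below amounts to keeping, for each row, only the columns that hold a value. A minimal sketch of that projection in plain Python follows. It does not use Spark or the connector, and the `COLUMNS` list, the `rows` sample and the `non_null_columns` helper are names made up here for illustration.

```python
# Illustrative only: plain Python, not the Spark Cassandra Connector API.
COLUMNS = ["colA", "colB", "colC", "colD", "colE"]

# Rows from the quoted question; None stands in for null.
rows = [
    (1, 2, 3, 4, 5),
    (1, 2, 3, None, None),
    (1, None, None, None, 5),
    (None, None, 3, 4, 5),
]


def non_null_columns(row):
    """Return a dict holding only the columns whose value is not None."""
    return {name: value for name, value in zip(COLUMNS, row) if value is not None}


records = [non_null_columns(row) for row in rows]
for record in records:
    print(record)
```

Each dict keeps the column names, so a writer could bind only the columns that are present rather than writing nulls.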

On Thu, Aug 25, 2016, 1:23 PM Selvam Raman  wrote:

> Hi ,
>
> Dataframe:
> colA colB colC colD colE
> 1 2 3 4 5
> 1 2 3 null null
> 1 null null  null 5
> null null  3 4 5
>
> I want to insert dataframe to nosql database, where null occupies
> values(Cassandra). so i have to insert the column which has non-null values
> in the row.
>
> Expected:
>
> Record 1: (1,2,3,4,5)
> Record 2:(1,2,3)
> Record 3:(1,5)
> Record 4:(3,4,5)
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: How to make new composite columns by combining rows in the same group?

2016-08-26 Thread Xinh Huynh
That looks like a pivot table. Have you looked into using the pivot table 
method with DataFrames?
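
To make the pivot idea concrete, here is a rough sketch in plain Python (not the DataFrame API) of the reshaping the quoted question asks for. The `pivot` helper is hypothetical, missing combinations are filled with 0 as in the expected result, and Rating is carried over unchanged, which may differ from the encoding shown in the question's expected table.

```python
import csv
import io
from collections import defaultdict

# Sample data copied from the quoted question.
DATA = """\
ID,City,Zip,Price,Rating
1,A,95123,100,0
1,B,95124,102,1
1,A,95126,100,1
2,B,95123,200,0
2,B,95124,201,1
2,C,95124,203,0
3,A,95126,300,1
3,C,95124,280,0
4,C,95124,400,1
"""


def pivot(csv_text):
    """Group rows by ID and spread Price/Rating into <City>_<Zip>_* columns."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    # Every distinct City-Zip combination becomes a pair of output columns.
    keys = sorted({f"{r['City']}_{r['Zip']}" for r in rows})
    columns = [f"{k}_{field}" for k in keys for field in ("Price", "Rating")]
    # Combinations that never occur for an ID default to 0.
    table = defaultdict(lambda: dict.fromkeys(columns, 0))
    for r in rows:
        key = f"{r['City']}_{r['Zip']}"
        table[r["ID"]][f"{key}_Price"] = int(r["Price"])
        table[r["ID"]][f"{key}_Rating"] = int(r["Rating"])
    return columns, dict(table)


columns, table = pivot(DATA)
print(columns)
for row_id in sorted(table):
    print(row_id, table[row_id])
```

With DataFrames the same shape would come from grouping by ID and pivoting on a combined City-Zip column; the sketch only shows what the output should look like.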

Xinh

> On Aug 26, 2016, at 4:54 AM, Rex X  wrote:
> 
> 1. Given following CSV file
> $cat data.csv
> 
> ID,City,Zip,Price,Rating
> 1,A,95123,100,0
> 1,B,95124,102,1
> 1,A,95126,100,1
> 2,B,95123,200,0
> 2,B,95124,201,1
> 2,C,95124,203,0
> 3,A,95126,300,1
> 3,C,95124,280,0
> 4,C,95124,400,1
> 
> We want to group by ID, and make new composite columns of Price and Rating 
> based on the value of $City-$Zip. 
> 
> 
> 2. The Expected Result:
> 
> ID  A_95123_Price  A_95123_Rating  A_95126_Price  A_95126_Rating  B_95123_Price  B_95123_Rating  B_95124_Price  B_95124_Rating  C_95124_Price  C_95124_Rating
> 1   100            1               100            2               0              0               102            2               0              0
> 2   0              0               0              0               200            1               201            2               203            1
> 3   0              0               300            2               0              0               0              0               280            1
> 4   0              0               0              0               0              0               0              0               400            2
> 
> Any tips would be greatly appreciated!
> 
> Thank you.
> 
> Regards,
> Rex
> 


zookeeper mesos logging in spark

2016-08-26 Thread aecc
Hi,

Everytime I run my spark application using mesos, I get logs in my console
in the form:

2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
I0826 15:25:30.949254 960752 sched.cpp:222] Version: 0.28.2
2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@log_env
2016-08-26 15:25:30,949:960521(0x7f6bccff9700):ZOO_INFO@zookeep
2016-08-26 15:25:30,951:960521(0x7f6bb4ff9700):ZOO_INFO@check_e
2016-08-26 15:25:30,952:960521(0x7f6bb4ff9700):ZOO_INFO@check_e
I0826 15:25:30.952505 960729 group.cpp:349] Group process (grou
I0826 15:25:30.952570 960729 group.cpp:831] Syncing group opera
I0826 15:25:30.952592 960729 group.cpp:427] Trying to create pa
I0826 15:25:30.954211 960722 detector.cpp:152] Detected a new l
I0826 15:25:30.954320 960744 group.cpp:700] Trying to get '/mes
I0826 15:25:30.955345 960724 detector.cpp:479] A new leading ma
I0826 15:25:30.955451 960724 sched.cpp:326] New master detected
I0826 15:25:30.955567 960724 sched.cpp:336] No credentials prov
I0826 15:25:30.956478 960732 sched.cpp:703] Framework registere

Anybody know how to disable them through spark-submit ?

Cheers and many thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/zookeeper-mesos-logging-in-spark-tp27607.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Best way to calculate intermediate column statistics

2016-08-26 Thread Mich Talebzadeh
Hi Bedrytski,

I assume you are referring to my code above.

The alternative SQL would be (the first code with rank)

SELECT *
FROM (
  SELECT transactiondate, transactiondescription, debitamount
  , RANK() OVER (ORDER BY transactiondate desc) AS rank
  FROM  WHERE transactiondescription LIKE '%HASHTAG%'
 ) tmp
WHERE rank = 1;

It all depends on which one you are comfortable with. I can do either, but I
guess coming from a data background it is easier for me to do the SQL part.
In addition, with SQL you can run exactly the same code in Spark Thrift
Server or Hive.

You can run both versions in Zeppelin. You can run only the SQL code in Tableau or
Squirrel :)
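
For readers less familiar with analytic functions, here is a small plain-Python sketch of what RANK() OVER (ORDER BY transactiondate DESC) followed by a rank = 1 filter does. The sample transactions and the `rank_desc` helper are invented for illustration; this is not Spark code.

```python
# Illustrative only: plain Python mimicking RANK() OVER (ORDER BY ... DESC).
# The sample transactions are invented for this sketch.
transactions = [
    {"transactiondate": "2016-08-20", "transactiondescription": "SHOP #HASHTAG", "debitamount": 12.50},
    {"transactiondate": "2016-08-25", "transactiondescription": "CAFE #HASHTAG", "debitamount": 3.20},
    {"transactiondate": "2016-08-25", "transactiondescription": "FUEL #HASHTAG", "debitamount": 40.00},
    {"transactiondate": "2016-08-22", "transactiondescription": "RENT", "debitamount": 900.00},
]


def rank_desc(rows, key):
    """Attach a SQL-style rank (ties share a rank) ordered by key, descending."""
    ordered = sorted(rows, key=lambda r: r[key], reverse=True)
    ranked = []
    for position, row in enumerate(ordered, start=1):
        if ranked and ranked[-1][key] == row[key]:
            rank = ranked[-1]["rank"]  # tie: reuse the previous rank
        else:
            rank = position  # otherwise the rank is the 1-based position
        ranked.append({**row, "rank": rank})
    return ranked


# Counterpart of: WHERE transactiondescription LIKE '%HASHTAG%'
matching = [t for t in transactions if "HASHTAG" in t["transactiondescription"]]
# Counterpart of the outer query: WHERE rank = 1
latest = [r for r in rank_desc(matching, "transactiondate") if r["rank"] == 1]
for row in latest:
    print(row)
```

Because RANK() gives tied rows the same rank, the filter can return more than one row when several transactions share the latest date.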

HTH







Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 26 August 2016 at 15:18, Bedrytski Aliaksandr  wrote:

> Hi Mich,
>
> I was wondering what are the advantages of using helper methods instead of
> one SQL multiline string?
> (I rarely (if ever) use helper methods, but maybe I'm missing something)
>
> Regards
> --
>   Bedrytski Aliaksandr
>   sp...@bedryt.ski
>
>
>
> On Thu, Aug 25, 2016, at 11:39, Mich Talebzadeh wrote:
>
> Hi Richard,
>
> Windowing/Analytics for stats are pretty simple. Example
>
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> val wSpec = Window.partitionBy('transactiontype).orderBy(desc("transactiondate"))
> df.filter('transactiondescription.contains(HASHTAG))
>   .select('transactiondate, 'transactiondescription, rank().over(wSpec).as("rank"))
>   .filter($"rank" === 1).show(1)
>
> val wSpec5 = Window.partitionBy('hashtag).orderBy(substring('transactiondate,1,4))
> val newDF = df.where('transactiontype === "DEB" && ('transactiondescription).isNotNull)
>   .select(substring('transactiondate,1,4).as("Year"), 'hashtag.as("Retailer"),
>     round(sum('debitamount).over(wSpec5),2).as("Spent"))
> newDF.distinct.orderBy('year,'Retailer).collect.foreach(println)
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On 25 August 2016 at 08:24, Richard Siebeling 
> wrote:
>
> Hi Mich,
>
> thanks for the suggestion, I hadn't thought of that. We'll need to gather
> the statistics in two ways, incremental when new data arrives and over the
> complete set when aggregating or filtering (because I think it's difficult
> to gather statistics while aggregating or filtering).
> The analytic functions could help when gathering the statistics over the
> whole set,
>
> kind regards,
> Richard
>
>
>
> On Wed, Aug 24, 2016 at 10:54 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
> Hi Richard,
>
> can you use analytics functions for this purpose on DF
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On 24 August 2016 at 21:37, Richard Siebeling 
> wrote:
>
> Hi Mich,
>
> I'd like to gather several statistics per column in order to make
> analysing data easier. These two statistics are some examples, other
> statistics I'd like to gather are the variance, the median, several
> percentiles, etc.  We are building a data analysis platform based on Spark,
>
> kind regards,
> Richard
>
> On Wed, Aug 24, 2016 at 6:52 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
> Hi Richard,
>
> What is the business use case for such statistics?
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> 

Re: Best way to calculate intermediate column statistics

2016-08-26 Thread Bedrytski Aliaksandr
Hi Mich,

I was wondering what are the advantages of using helper methods instead
of one SQL multiline string?
(I rarely (if ever) use helper methods, but maybe I'm missing something)

Regards
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Thu, Aug 25, 2016, at 11:39, Mich Talebzadeh wrote:
> Hi Richard,
>
> Windowing/Analytics for stats are pretty simple. Example
>
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> val wSpec = Window.partitionBy('transactiontype).orderBy(desc("transactiondate"))
> df.filter('transactiondescription.contains(HASHTAG))
>   .select('transactiondate, 'transactiondescription, rank().over(wSpec).as("rank"))
>   .filter($"rank" === 1).show(1)
>
> val wSpec5 = Window.partitionBy('hashtag).orderBy(substring('transactiondate,1,4))
> val newDF = df.where('transactiontype === "DEB" && ('transactiondescription).isNotNull)
>   .select(substring('transactiondate,1,4).as("Year"), 'hashtag.as("Retailer"),
>     round(sum('debitamount).over(wSpec5),2).as("Spent"))
> newDF.distinct.orderBy('year,'Retailer).collect.foreach(println)
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On 25 August 2016 at 08:24, Richard Siebeling
>  wrote:
>> Hi Mich,
>>
>> thanks for the suggestion, I hadn't thought of that. We'll need to
>> gather the statistics in two ways, incremental when new data arrives
>> and over the complete set when aggregating or filtering (because I
>> think it's difficult to gather statistics while aggregating or
>> filtering).
>> The analytic functions could help when gathering the statistics over
>> the whole set,
>>
>> kind regards,
>> Richard
>>
>>
>>
>> On Wed, Aug 24, 2016 at 10:54 PM, Mich Talebzadeh
>>  wrote:
>>> Hi Richard,
>>>
>>> can you use analytics functions for this purpose on DF
>>>
>>> HTH
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 24 August 2016 at 21:37, Richard Siebeling 
>>> wrote:
>>>> Hi Mich,
>>>>
>>>> I'd like to gather several statistics per column in order to make
>>>> analysing data easier. These two statistics are some examples,
>>>> other statistics I'd like to gather are the variance, the median,
>>>> several percentiles, etc.  We are building a data analysis platform
>>>> based on Spark,
>>>>
>>>> kind regards,
>>>> Richard
>>>>
>>>> On Wed, Aug 24, 2016 at 6:52 PM, Mich Talebzadeh
>>>>  wrote:
> Hi Richard,
>
> What is the business use case for such statistics?
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On 24 August 2016 at 16:01, Bedrytski Aliaksandr
>  wrote:
>> __
>> Hi Richard,
>>
>> these intermediate statistics should be calculated from the
>> result of the calculation or during the aggregation?
>> If they can be derived from the resulting dataframe, why not to
>> cache (persist) that result just after the calculation?
>> Then you may aggregate statistics from the cached dataframe.
>> This way it won't hit performance too much.
>>
>> Regards
>> --
>>   Bedrytski Aliaksandr
>>   sp...@bedryt.ski
>>
>>
>>
>> On Wed, Aug 24, 2016, at 16:42, Richard Siebeling wrote:
>>> Hi,
>>>
>>> what is the best way to calculate intermediate column statistics
>>> like the number of empty values and the number of distinct
>>> values each column in a 

Re: What do I loose if I run spark without using HDFS or Zookeeper?

2016-08-26 Thread kant kodali

@Mich of course, and in my previous message I gave the context as well.
Needless to say, the tools that are used by many banks that I came across such
as Citi, Capital One, Wells Fargo, GSachs are pretty laughable when it comes to
compliance and security. They somehow think they are secure when they aren't.





On Fri, Aug 26, 2016 5:46 AM, Mich Talebzadeh mich.talebza...@gmail.com wrote:
And yes any technology needs time for maturity but that said it shouldn't stop
us from transitioning
Depends on the application and how mission critical the business it is deployed
for. If you are using a tool for a Bank's Credit Risk (Surveillance, Anti-Money
Laundering, Employee Compliance, Anti-Fraud etc) and the tool misses a big chunk
for whatever reason, then the first thing that will happen is that the Bank will be fined
($millions) and I will be looking for a new job in London transport.
On the other hand, if the tool is used for some social media, sentiment analysis and
all that sort of stuff, I don't think anyone is going to lose sleep.
HTH








Dr Mich Talebzadeh



LinkedIn 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw




http://talebzadehmich.wordpress.com




Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any
other property which may arise from relying on this email's technical content is
explicitly disclaimed. The author will in no case be liable for any monetary
damages arising from such loss, damage or destruction.




On 26 August 2016 at 12:58, kant kodali < kanth...@gmail.com > wrote:
@Steve your arguments make sense however there is a good majority of people who
have extensive experience with zookeeper prefer to avoid zookeeper and given the
ease of consul (which btw uses raft for the election) and etcd lot of us are
more inclined to avoid ZK.
And yes any technology needs time for maturity but that said it shouldn't stop
us from transitioning. for example people started using spark when it first
released instead of waiting for spark 2.0 where there are lot of optimizations
and bug fixes.





On Fri, Aug 26, 2016 2:50 AM, Steve Loughran ste...@hortonworks.com wrote:

On 25 Aug 2016, at 22:49, kant kodali < kanth...@gmail.com > wrote:
yeah, so it seems like it's a work in progress. At the very least Mesos took the
initiative to provide alternatives to ZK. I am just really looking forward to
this.
https://issues.apache.org/jira/browse/MESOS-3797






I worry about any attempt to implement distributed consensus systems: they take
time in production to get right.
1. There's the need to prove that what you are building is valid if the
implementation matches the specification. That has apparently been done for ZK,
though given the complexity of maths involved, I cannot vouch for that myself:
https://blog.acolyer.org/2015/03/09/zab-high-performance-broadcast-for-primary-backup-systems/
2. You need to run it in production to find the problems. Google's Chubby paper
hints about the things they found out went wrong there. As far as ZK goes,
Jepsen hints it's robust:
https://aphyr.com/posts/291-jepsen-zookeeper
If it has weaknesses, I'd point at
 - its security model
 - its lack of helpfulness when there are kerberos/SASL auth problems (ZK server
   closes connection; client sees connection failure and retries),
 - the fact that its failure modes aren't always understood by people coding
   against it.
http://blog.cloudera.com/blog/2014/03/zookeeper-resilience-at-pinterest/
the Raft algorithm appears to be easier to implement than Paxos; there are
things built on it and I look forward to seeing what works/doesn't work in
production.
Certainly Aphyr found problems when it pointed jepsen at etcd, though being a
2014 piece of work, I expect those specific problems to have been addressed. The
main thing is: it shows how hard it is to get things right in the presence of
complex failures.
Finally, regarding S3
You can use S3 object store as a source of data in queries/streaming, and, if
done carefully, a destination. Performance is variable...something some of us
are working on there, across S3a, spark and hive.
Conference placement: I shall be talking on that topic at Spark Summit Europe if
you want to find out more: https://spark-summit.org/eu-2016/

On Thu, Aug 25, 2016 2:00 PM, Michael Gummelt mgumm...@mesosphere.io wrote:
Mesos also uses ZK for leader election. There seems to be some effort in
supporting etcd, but it's in progress:
https://issues.apache.org/jira/browse/MESOS-1806

On Thu, Aug 25, 2016 at 1:55 PM, kant kodali < kanth...@gmail.com > wrote:
@Ofir @Sean very good points.
@Mike We dont use Kafka or Hive and I understand that Zookeeper can do many
things but for our use case all we need is for high availability and given the
devops people frustrations here in our company who had extensive experience
managing large clusters in the past we would be very happy to avoid Zookeeper. I
also heard that Mesos 

Re: How to do this pairing in Spark?

2016-08-26 Thread ayan guha
Top of head

select * from
(select ID, flag, lead(ID) over (partition by city, zip order by flag, ID) c
from t)
where flag = 0 and c is not null

Should do it. Basically, you want to keep the records that have Flag 0 and a
corresponding Flag 1 record.
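
The pairing rule from the quoted question can also be sketched outside SQL. The following plain-Python version is only an illustration, not Spark code: the `pair_rows` helper is hypothetical, and it assumes each Flag=0 row should be paired with the lowest-ID Flag=1 row in the same City/Zip group, which is what the expected result suggests.

```python
from collections import defaultdict

# Sample rows (ID, City, Zip, Flag) copied from the quoted question.
rows = [
    (1, "A", 95126, 0), (2, "A", 95126, 1), (3, "A", 95126, 1),
    (4, "B", 95124, 0), (5, "B", 95124, 1),
    (6, "C", 95124, 0),
    (7, "C", 95127, 1), (8, "C", 95127, 0), (9, "C", 95127, 1),
]


def pair_rows(rows):
    """Pair each Flag=0 row with the lowest-ID Flag=1 row in the same City/Zip."""
    groups = defaultdict(list)
    for row in rows:
        groups[(row[1], row[2])].append(row)
    kept = []
    for group in groups.values():
        zeros = [r for r in group if r[3] == 0]
        ones = sorted(r for r in group if r[3] == 1)
        # Keep a pair only when both sides exist; unmatched rows are dropped.
        if zeros and ones:
            kept.extend([zeros[0], ones[0]])
    return sorted(kept)


for row in pair_rows(rows):
    print(row)
```

On the sample data this keeps IDs 1, 2, 4, 5, 7 and 8, matching the expected result in the question.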

Please let me know if it doesn't work, so I can provide a proper solution.
On 26 Aug 2016 11:00, "Rex X"  wrote:

> 1. Given following CSV file
>
> > $cat data.csv
> >
> > ID,City,Zip,Flag
> > 1,A,95126,0
> > 2,A,95126,1
> > 3,A,95126,1
> > 4,B,95124,0
> > 5,B,95124,1
> > 6,C,95124,0
> > 7,C,95127,1
> > 8,C,95127,0
> > 9,C,95127,1
>
>
> (a) where "ID" above is a primary key (unique),
>
> (b) for each "City" and "Zip" combination, there is one ID in max with
> Flag=0; while it can contain multiple IDs with Flag=1 for each "City" and
> "Zip" combination.
>
> (c) Flag can be 0 or 1
>
>
> 2. For each ID with Flag=0, we want to pair it with another ID with
> Flag=1 but with the same City - Zip. If one cannot find another paired ID
> with Flag=1 and matched City - Zip, we just delete that record.
>
> Here is the expected result:
>
> > ID,City,Zip,Flag
> > 1,A,95126,0
> > 2,A,95126,1
> > 4,B,95124,0
> > 5,B,95124,1
> > 7,C,95127,1
> > 8,C,95127,0
>
>
> Any valuable tips how to do this pairing in Python or Scala?
>
> Great thanks!
>
> Rex
>


Re: How to install spark with s3 on AWS?

2016-08-26 Thread kant kodali

Hmm, do I always need to have that in my driver program? Why can't I set it
somewhere such that the Spark cluster realizes that it needs to use S3?





On Fri, Aug 26, 2016 5:13 AM, Devi P.V devip2...@gmail.com wrote:
The following piece of code works for me to read data from S3 using Spark.

import org.apache.spark.{SparkConf, SparkContext}

// AccessKey and SecretKey are String values defined elsewhere.
val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
val sc = new SparkContext(conf)
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", AccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", SecretKey)
var jobInput = sc.textFile("s3://path to bucket")

Thanks


On Fri, Aug 26, 2016 at 5:16 PM, kant kodali < kanth...@gmail.com > wrote:
Hi guys,
Are there any instructions on how to setup spark with S3 on AWS?
Thanks!

Re: What do I loose if I run spark without using HDFS or Zookeeper?

2016-08-26 Thread Carlile, Ken



We use Spark with NFS as the data store, mainly using Dr. Jeremy Freeman’s Thunder framework. Works very well (and I see HUGE throughput on the storage system during loads). I haven’t seen (or heard from the devs/users) a need for HDFS or S3.


—Ken



On Aug 25, 2016, at 8:02 PM, kant kodali  wrote:






ZFS linux port has got very stable these days given LLNL maintains the linux port and they also use it as a FileSystem for their super computer (The supercomputer is one of the top in the nation is what I heard)











On Thu, Aug 25, 2016 4:58 PM, kant kodali kanth...@gmail.com wrote:






How about using ZFS?











On Thu, Aug 25, 2016 3:48 PM, Mark Hamstra m...@clearstorydata.com wrote:


That's often not as important as you might think.  It really only affects the loading of data by the first Stage.  Subsequent Stages (in the same Job or even in other Jobs if you do it right) will use the map outputs, and will do so
 with good data locality.

On Thu, Aug 25, 2016 at 3:36 PM, ayan guha  wrote:

At the core of it, map reduce relies heavily on data locality. You would lose the ability to process data closest to where it resides if you do not use HDFS. 
S3 or NFS will not be able to provide that.


On 26 Aug 2016 07:49, "kant kodali"  wrote:







yeah so its seems like its work in progress. At very least Mesos took the initiative to provide alternatives to ZK. I am just really looking forward for this. 


https://issues.apache.org/jira/browse/MESOS-3797











On Thu, Aug 25, 2016 2:00 PM, Michael Gummelt mgumm...@mesosphere.io wrote:


Mesos also uses ZK for leader election.  There seems to be some effort in supporting etcd, but it's in progress: https://issues.apache.org/jira/browse/MESOS-1806


On Thu, Aug 25, 2016 at 1:55 PM, kant kodali  wrote:







@Ofir @Sean very good points.


@Mike We dont use Kafka or Hive and I understand that Zookeeper can do many things but for our use case all we need is for high availability and given the devops people frustrations here in our company who had extensive experience managing large
 clusters in the past we would be very happy to avoid Zookeeper. I also heard that Mesos can provide High Availability through etcd and consul and if that is true I will be left with the following stack


Spark + Mesos scheduler + Distributed File System or to be precise I should say Distributed Storage since S3 is an object store so I guess this will be HDFS for us + etcd & consul. Now the big question for me is how do I set all this up 













On Thu, Aug 25, 2016 1:35 PM, Ofir Manor ofir.ma...@equalum.io wrote:


Just to add one concrete example regarding HDFS dependency.
Have a look at checkpointing https://spark.apache.org/docs/1.6.2/streaming-programming-guide.html#checkpointing
For example, for Spark Streaming, you can not do any window operation in a cluster without checkpointing to HDFS (or S3).






Ofir Manor

Co-Founder & CTO | Equalum


Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io





On Thu, Aug 25, 2016 at 11:13 PM, Mich Talebzadeh  wrote:


Hi Kant,


I trust the following would be of use.


Big Data depends on Hadoop Ecosystem from whichever angle one looks at it.


In the heart of it and with reference to points you raised about HDFS, one needs to have a working knowledge of Hadoop Core System including HDFS, Map-reduce algorithm and Yarn whether one uses them or not. After all Big Data is all about horizontal
 scaling with master and nodes (as opposed to vertical scaling like SQL Server running on a Host). and distributed data (by default data is replicated three times on different nodes for scalability and availability). 


Other members including Sean provided the limits on how far one operate Spark in its own space. If you are going to deal with data (data in motion and data at rest), then you will need to interact with some form of storage and HDFS and compatible
 file systems like S3 are the natural choices.


Zookeeper is not just about high availability. It is used in Spark Streaming with Kafka, it is also used with Hive for concurrency. It is also a distributed locking system.


HTH











Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com



Disclaimer: Use
 it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content
 is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. 

 













On 25 August 2016 at 

Re: What do I loose if I run spark without using HDFS or Zookeeper?

2016-08-26 Thread Mich Talebzadeh
And yes any technology needs time for maturity but that said it shouldn't
stop us from transitioning

Depends on the application and how mission critical the business it is
deployed for. If you are using a tool for a Bank's Credit Risk
(Surveillance, Anti-Money Laundering, Employee Compliance, Anti-Fraud etc)
and the tool misses a big chunk for whatever reason, then the first thing
that will happen is that the Bank will be fined ($millions) and I will be
looking for a new job in London transport.

On the other hand, if the tool is used for some social media, sentiment analysis
and all that sort of stuff, I don't think anyone is going to lose sleep.

HTH









Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 26 August 2016 at 12:58, kant kodali  wrote:

> @Steve your arguments make sense however there is a good majority of
> people who have extensive experience with zookeeper prefer to avoid
> zookeeper and given the ease of consul (which btw uses raft for the
> election) and etcd lot of us are more inclined to avoid ZK.
>
> And yes any technology needs time for maturity but that said it shouldn't
> stop us from transitioning. for example people started using spark when it
> first released instead of waiting for spark 2.0 where there are lot of
> optimizations and bug fixes.
>
>
>
> On Fri, Aug 26, 2016 2:50 AM, Steve Loughran ste...@hortonworks.com wrote:
>
>>
>> On 25 Aug 2016, at 22:49, kant kodali  wrote:
>>
>> yeah so its seems like its work in progress. At very least Mesos took the
>> initiative to provide alternatives to ZK. I am just really looking forward
>> for this.
>>
>> https://issues.apache.org/jira/browse/MESOS-3797
>>
>>
>>
>>
>> I worry about any attempt to implement distributed consensus systems:
>> they take time in production to get right.
>>
>> 1. There's the need to prove that what you are building is valid if the
>> implementation matches the specification. That has apparently been done for
>> ZK, though given the complexity of maths involved, I cannot vouch for that
>> myself:
>> https://blog.acolyer.org/2015/03/09/zab-high-performance-
>> broadcast-for-primary-backup-systems/
>>
>> 2. you need to run it in production to find the problems. Google's Chubby
>> paper hints about the things they found out went wrong there. As far as ZK
>> goes, jepsen hints its robust
>>
>> https://aphyr.com/posts/291-jepsen-zookeeper
>>
>> If it has weaknesses, I'd point at
>>  - its security model
>>  - its lack of helpfulness when there are kerberos/SASL auth problems (ZK
>>    server closes connection; client sees connection failure and retries),
>>  - the fact that its failure modes aren't always understood by people
>>    coding against it.
>>
>> http://blog.cloudera.com/blog/2014/03/zookeeper-resilience-at-pinterest/
>>
>> the Raft algorithm appears to be easier to implement than Paxos; there
>> are things built on it and I look forward to seeing what works/doesn't work
>> in production.
>>
>> Certainly Aphyr found problems when it pointed jepsen at etcd, though
>> being a 2014 piece of work, I expect those specific problems to have been
>> addressed. The main thing is: it shows how hard it is to get things right
>> in the presence of complex failures.
>>
>> Finally, regarding S3
>>
>> You can use S3 object store as a source of data in queries/streaming,
>> and, if done carefully, a destination. Performance is variable...something
>> some of us are working on there, across S3a, spark and hive.
>>
>> Conference placement: I shall be talking on that topic at Spark Summit
>> Europe if you want to find out more: https://spark-summit.org/eu-2016/
>>
>>
>> On Thu, Aug 25, 2016 2:00 PM, Michael Gummelt mgumm...@mesosphere.io
>> wrote:
>>
>> Mesos also uses ZK for leader election.  There seems to be some effort in
>> supporting etcd, but it's in progress: https://issues.
>> apache.org/jira/browse/MESOS-1806
>>
>> On Thu, Aug 25, 2016 at 1:55 PM, kant kodali  wrote:
>>
>> @Ofir @Sean very good points.
>>
>> @Mike We don't use Kafka or Hive, and I understand that ZooKeeper can do
>> many things, but for our use case all we need is high availability. Given
>> the frustrations of the devops people here in our company, who have
>> managed large clusters extensively in the past, we would be very happy to
>> avoid ZooKeeper. I also heard that Mesos can provide high availability
>> through etcd and Consul, and if that is true I will be left

unable to start slaves from master (SSH problem)

2016-08-26 Thread kant kodali
Hi,

I am unable to start Spark slaves from my master node. When I run
./start-all.sh on the master node, it brings up the master but fails for
the slaves, saying "permission denied public key". However, I did add the
master's id_rsa.pub to the slaves' authorized_keys, and I checked manually
from the master node that I can ssh into the slave node without a
password. Is there anything else I am missing?

Thanks!


Re: How to install spark with s3 on AWS?

2016-08-26 Thread Devi P.V
The following piece of code works for me to read data from S3 using Spark.

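import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")

val sc = new SparkContext(conf)
val hadoopConf = sc.hadoopConfiguration
// Map the s3:// scheme to the native S3 filesystem and supply credentials.
// AccessKey and SecretKey are Strings holding your own AWS credentials.
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", AccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", SecretKey)
val jobInput = sc.textFile("s3://path to bucket")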

Thanks


On Fri, Aug 26, 2016 at 5:16 PM, kant kodali  wrote:

> Hi guys,
>
> Are there any instructions on how to set up Spark with S3 on AWS?
>
> Thanks!
>
>


Re: What do I loose if I run spark without using HDFS or Zookeeper?

2016-08-26 Thread kant kodali

@Steve, your arguments make sense. However, a good majority of people who have
extensive experience with ZooKeeper prefer to avoid it, and given the ease of
Consul (which, by the way, uses Raft for leader election) and etcd, a lot of us
are more inclined to avoid ZK.
And yes, any technology needs time to mature, but that shouldn't stop us from
transitioning. For example, people started using Spark when it was first
released instead of waiting for Spark 2.0, which has a lot of optimizations and
bug fixes.





On Fri, Aug 26, 2016 2:50 AM, Steve Loughran ste...@hortonworks.com wrote:

On 25 Aug 2016, at 22:49, kant kodali < kanth...@gmail.com > wrote:
Yeah, so it seems like it's a work in progress. At the very least, Mesos took
the initiative to provide alternatives to ZK. I am really looking forward to
this.
https://issues.apache.org/jira/browse/MESOS-3797






I worry about any attempt to implement distributed consensus systems: they take
time in production to get right.
1. There's the need to prove that what you are building is valid if the
implementation matches the specification. That has apparently been done for ZK,
though given the complexity of maths involved, I cannot vouch for that myself: 
https://blog.acolyer.org/2015/03/09/zab-high-performance-broadcast-for-primary-backup-systems/

2. You need to run it in production to find the problems. Google's Chubby paper
hints at the things they found going wrong there. As far as ZK goes,
Jepsen hints it's robust:
https://aphyr.com/posts/291-jepsen-zookeeper
If it has weaknesses, I'd point at
 - its security model
 - its lack of helpfulness when there are Kerberos/SASL auth problems (ZK server
closes connection; client sees connection failure and retries),
 - the fact that its failure modes aren't always understood by people coding
against it.
http://blog.cloudera.com/blog/2014/03/zookeeper-resilience-at-pinterest/
the Raft algorithm appears to be easier to implement than Paxos; there are
things built on it and I look forward to seeing what works/doesn't work in
production.
Certainly Aphyr found problems when it pointed jepsen at etcd, though being a
2014 piece of work, I expect those specific problems to have been addressed. The
main thing is: it shows how hard it is to get things right in the presence of
complex failures.
Finally, regarding S3
You can use S3 object store as a source of data in queries/streaming, and, if
done carefully, a destination. Performance is variable...something some of us
are working on there, across S3a, spark and hive.
Conference placement: I shall be talking on that topic at Spark Summit Europe if
you want to find out more: https://spark-summit.org/eu-2016/

On Thu, Aug 25, 2016 2:00 PM, Michael Gummelt mgumm...@mesosphere.io wrote:
Mesos also uses ZK for leader election. There seems to be some effort in
supporting etcd, but it's in progress: 
https://issues.apache.org/jira/browse/MESOS-1806

On Thu, Aug 25, 2016 at 1:55 PM, kant kodali < kanth...@gmail.com > wrote:
@Ofir @Sean very good points.
@Mike We don't use Kafka or Hive, and I understand that ZooKeeper can do many
things, but for our use case all we need is high availability. Given the
frustrations of the devops people here in our company, who have managed large
clusters extensively in the past, we would be very happy to avoid ZooKeeper. I
also heard that Mesos can provide high availability through etcd and Consul, and
if that is true I will be left with the following stack:




Spark + Mesos scheduler + distributed file system (or, to be precise, distributed
storage, since S3 is an object store, so I guess this will be HDFS for us) + etcd
and Consul. Now the big question for me is how do I set all this up

Re: How to make new composite columns by combining rows in the same group?

2016-08-26 Thread Rex X
The data.csv needs to be corrected:


1. Given following CSV file
$cat data.csv

ID,City,Zip,Price,Rating
1,A,95123,100,1
1,B,95124,102,2
1,A,95126,100,2
2,B,95123,200,1
2,B,95124,201,2
2,C,95124,203,1
3,A,95126,300,2
3,C,95124,280,1
4,C,95124,400,2
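
A minimal plain-Python sketch of the reshaping asked for in the quoted message
below, run on the corrected data above. This is not Spark code: the function
name pivot_by_city_zip is made up for illustration, and it assumes that each
(ID, City, Zip) combination occurs at most once and that missing combinations
are filled with 0, as in the expected result. In Spark the same shape is
typically reached with a groupBy on ID and a pivot on a combined City_Zip
column.

```python
import csv
import io

# Corrected data.csv from this message.
DATA = """ID,City,Zip,Price,Rating
1,A,95123,100,1
1,B,95124,102,2
1,A,95126,100,2
2,B,95123,200,1
2,B,95124,201,2
2,C,95124,203,1
3,A,95126,300,2
3,C,95124,280,1
4,C,95124,400,2
"""


def pivot_by_city_zip(text):
    """Group rows by ID and spread Price/Rating into City_Zip_* columns."""
    rows = list(csv.DictReader(io.StringIO(text)))
    # Every distinct City_Zip value becomes a pair of output columns.
    keys = sorted({f"{r['City']}_{r['Zip']}" for r in rows})
    columns = ["ID"] + [f"{k}_{field}" for k in keys for field in ("Price", "Rating")]
    result = {}
    for r in rows:
        # Start each ID with all-zero columns (assumption: 0 marks "missing").
        out = result.setdefault(r["ID"], dict.fromkeys(columns, 0))
        out["ID"] = int(r["ID"])
        key = f"{r['City']}_{r['Zip']}"
        out[f"{key}_Price"] = int(r["Price"])
        out[f"{key}_Rating"] = int(r["Rating"])
    return columns, [result[i] for i in sorted(result, key=int)]


columns, table = pivot_by_city_zip(DATA)
print(",".join(columns))
for row in table:
    print(",".join(str(row[c]) for c in columns))
```

The column order falls out of sorting the City_Zip keys, which happens to match
the order in the expected result; a different ordering would need an explicit
key list.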


On Fri, Aug 26, 2016 at 4:54 AM, Rex X  wrote:

> 1. Given following CSV file
>
> $cat data.csv
>
> ID,City,Zip,Price,Rating
> 1,A,95123,100,0
> 1,B,95124,102,1
> 1,A,95126,100,1
> 2,B,95123,200,0
> 2,B,95124,201,1
> 2,C,95124,203,0
> 3,A,95126,300,1
> 3,C,95124,280,0
> 4,C,95124,400,1
>
>
> We want to group by ID, and make new composite columns of Price and Rating
> based on the value of $City-$Zip.
>
>
> 2. The Expected Result:
>
> ID,A_95123_Price,A_95123_Rating,A_95126_Price,A_95126_Rating,B_95123_Price,B_95123_Rating,B_95124_Price,B_95124_Rating,C_95124_Price,C_95124_Rating
> 1,100,1,100,2,0,0,102,2,0,0
> 2,0,0,0,0,200,1,201,2,203,1
> 3,0,0,300,2,0,0,0,0,280,1
> 4,0,0,0,0,0,0,0,0,400,2
>
> Any tips would be greatly appreciated!
>
> Thank you.
>
> Regards,
> Rex
>
>


How to make new composite columns by combining rows in the same group?

2016-08-26 Thread Rex X
1. Given following CSV file

$cat data.csv

ID,City,Zip,Price,Rating
1,A,95123,100,0
1,B,95124,102,1
1,A,95126,100,1
2,B,95123,200,0
2,B,95124,201,1
2,C,95124,203,0
3,A,95126,300,1
3,C,95124,280,0
4,C,95124,400,1


We want to group by ID, and make new composite columns of Price and Rating
based on the value of $City-$Zip.


2. The Expected Result:

ID,A_95123_Price,A_95123_Rating,A_95126_Price,A_95126_Rating,B_95123_Price,B_95123_Rating,B_95124_Price,B_95124_Rating,C_95124_Price,C_95124_Rating
1,100,1,100,2,0,0,102,2,0,0
2,0,0,0,0,200,1,201,2,203,1
3,0,0,300,2,0,0,0,0,280,1
4,0,0,0,0,0,0,0,0,400,2

Any tips would be greatly appreciated!

Thank you.

Regards,
Rex


How to install spark with s3 on AWS?

2016-08-26 Thread kant kodali
Hi guys,

Are there any instructions on how to set up Spark with S3 on AWS?

Thanks!


Re: How to do this pairing in Spark?

2016-08-26 Thread Rex X
Hi Ayan,

Yes, ID=3 can be paired with ID=1, and the same for ID=9 with ID=8. BUT we
want to keep only ONE pair for the ID with Flag=0.

Since ID=1 with Flag=0 is already paired with ID=2, and ID=8 is paired with ID=7,
we simply delete ID=3 and ID=9.

Thanks!

Regards,
Rex
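
The rule above can be sketched in plain Python (not Spark) as follows. For each
City/Zip group it keeps the Flag=0 ID plus exactly one Flag=1 ID, and drops
groups that lack either side. Picking the smallest Flag=1 ID is an assumption
inferred from the expected result (ID=2 kept over ID=3, ID=7 over ID=9), and
pair_rows is a made-up name for illustration.

```python
import csv
import io
from collections import defaultdict

# data.csv as given in the quoted message.
DATA = """ID,City,Zip,Flag
1,A,95126,0
2,A,95126,1
3,A,95126,1
4,B,95124,0
5,B,95124,1
6,C,95124,0
7,C,95127,1
8,C,95127,0
9,C,95127,1
"""


def pair_rows(text):
    """Return the IDs that survive pairing, in ascending order."""
    # Collect IDs per (City, Zip), split by Flag value.
    groups = defaultdict(lambda: {"0": [], "1": []})
    for r in csv.DictReader(io.StringIO(text)):
        groups[(r["City"], r["Zip"])][r["Flag"]].append(int(r["ID"]))

    kept = []
    for ids in groups.values():
        # A pair needs one Flag=0 ID and at least one Flag=1 ID.
        if ids["0"] and ids["1"]:
            kept.append(ids["0"][0])    # at most one Flag=0 per group, per (b)
            kept.append(min(ids["1"]))  # assumption: keep the smallest Flag=1 ID
    return sorted(kept)


print(pair_rows(DATA))
```

Any other tie-break among the Flag=1 candidates (largest ID, first seen) would
satisfy the "only ONE pair" requirement equally well; only the min() call would
change.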


On Fri, Aug 26, 2016 at 12:46 AM, ayan guha  wrote:

> Why should 3 and 9 be deleted? 3 can be paired with 1 and 9 can be paired
> with 8.
> On 26 Aug 2016 11:00, "Rex X"  wrote:
>
>> 1. Given following CSV file
>>
>> > $cat data.csv
>> >
>> > ID,City,Zip,Flag
>> > 1,A,95126,0
>> > 2,A,95126,1
>> > 3,A,95126,1
>> > 4,B,95124,0
>> > 5,B,95124,1
>> > 6,C,95124,0
>> > 7,C,95127,1
>> > 8,C,95127,0
>> > 9,C,95127,1
>>
>>
>> (a) where "ID" above is a primary key (unique),
>>
>> (b) for each "City" and "Zip" combination, there is at most one ID with
>> Flag=0, while there can be multiple IDs with Flag=1.
>>
>> (c) Flag can be 0 or 1
>>
>>
>> 2. For each ID with Flag=0, we want to pair it with another ID with
>> Flag=1 but with the same City - Zip. If one cannot find another paired ID
>> with Flag=1 and matched City - Zip, we just delete that record.
>>
>> Here is the expected result:
>>
>> > ID,City,Zip,Flag
>> > 1,A,95126,0
>> > 2,A,95126,1
>> > 4,B,95124,0
>> > 5,B,95124,1
>> > 7,C,95127,1
>> > 8,C,95127,0
>>
>>
>> Any valuable tips how to do this pairing in Python or Scala?
>>
>> Great thanks!
>>
>> Rex
>>
>


Re: What do I loose if I run spark without using HDFS or Zookeeper?

2016-08-26 Thread Steve Loughran

On 25 Aug 2016, at 22:49, kant kodali wrote:

Yeah, so it seems like it's a work in progress. At the very least, Mesos took
the initiative to provide alternatives to ZK. I am really looking forward to
this.

https://issues.apache.org/jira/browse/MESOS-3797




I worry about any attempt to implement distributed consensus systems: they take 
time in production to get right.

1. There's the need to prove that what you are building is valid if the 
implementation matches the specification. That has apparently been done for ZK, 
though given the complexity of maths involved, I cannot vouch for that myself:
https://blog.acolyer.org/2015/03/09/zab-high-performance-broadcast-for-primary-backup-systems/

2. You need to run it in production to find the problems. Google's Chubby paper
hints at the things they found going wrong there. As far as ZK goes,
Jepsen hints it's robust:

https://aphyr.com/posts/291-jepsen-zookeeper

If it has weaknesses, I'd point at
 - its security model
 - its lack of helpfulness when there are Kerberos/SASL auth problems (ZK
server closes connection; client sees connection failure and retries),
 - the fact that its failure modes aren't always understood by people coding
against it.

http://blog.cloudera.com/blog/2014/03/zookeeper-resilience-at-pinterest/

the Raft algorithm appears to be easier to implement than Paxos; there are 
things built on it and I look forward to seeing what works/doesn't work in 
production.

Certainly Aphyr found problems when it pointed jepsen at etcd, though being a 
2014 piece of work, I expect those specific problems to have been addressed. 
The main thing is: it shows how hard it is to get things right in the presence 
of complex failures.

Finally, regarding S3

You can use S3 object store as a source of data in queries/streaming, and, if 
done carefully, a destination. Performance is variable...something some of us 
are working on there, across S3a, spark and hive.

Conference placement: I shall be talking on that topic at Spark Summit Europe 
if you want to find out more: https://spark-summit.org/eu-2016/


On Thu, Aug 25, 2016 2:00 PM, Michael Gummelt mgumm...@mesosphere.io wrote:
Mesos also uses ZK for leader election.  There seems to be some effort in 
supporting etcd, but it's in progress: 
https://issues.apache.org/jira/browse/MESOS-1806

On Thu, Aug 25, 2016 at 1:55 PM, kant kodali wrote:
@Ofir @Sean very good points.

@Mike We don't use Kafka or Hive, and I understand that ZooKeeper can do many
things, but for our use case all we need is high availability. Given the
frustrations of the devops people here in our company, who have managed large
clusters extensively in the past, we would be very happy to avoid ZooKeeper.
I also heard that Mesos can provide high availability through etcd and Consul,
and if that is true I will be left with the following stack:





Spark + Mesos scheduler + distributed file system (or, to be precise, distributed
storage, since S3 is an object store, so I guess this will be HDFS for us) + etcd
and Consul. Now the big question for me is how do I set all this up








Re: How to do this pairing in Spark?

2016-08-26 Thread ayan guha
Why should 3 and 9 be deleted? 3 can be paired with 1 and 9 can be paired
with 8.
On 26 Aug 2016 11:00, "Rex X"  wrote:

> 1. Given following CSV file
>
> > $cat data.csv
> >
> > ID,City,Zip,Flag
> > 1,A,95126,0
> > 2,A,95126,1
> > 3,A,95126,1
> > 4,B,95124,0
> > 5,B,95124,1
> > 6,C,95124,0
> > 7,C,95127,1
> > 8,C,95127,0
> > 9,C,95127,1
>
>
> (a) where "ID" above is a primary key (unique),
>
> (b) for each "City" and "Zip" combination, there is at most one ID with
> Flag=0, while there can be multiple IDs with Flag=1.
>
> (c) Flag can be 0 or 1
>
>
> 2. For each ID with Flag=0, we want to pair it with another ID with
> Flag=1 but with the same City - Zip. If one cannot find another paired ID
> with Flag=1 and matched City - Zip, we just delete that record.
>
> Here is the expected result:
>
> > ID,City,Zip,Flag
> > 1,A,95126,0
> > 2,A,95126,1
> > 4,B,95124,0
> > 5,B,95124,1
> > 7,C,95127,1
> > 8,C,95127,0
>
>
> Any valuable tips how to do this pairing in Python or Scala?
>
> Great thanks!
>
> Rex
>


Re: mutable.LinkedHashMap kryo serialization issues

2016-08-26 Thread Renato Marroquín Mogrovejo
Hi Rahul,

You have probably already figured this one out, but anyway...
You need to register the classes you'll be using with Kryo in advance,
because it does not support all Serializable types. If you don't register
a class, Kryo doesn't know how to serialize/deserialize it.


Best,

Renato M.

2016-08-22 17:12 GMT+02:00 Rahul Palamuttam :

> Hi,
>
> Just sending this again to see if others have had this issue.
>
> I recently switched to using Kryo serialization and I've been running into
> errors with the mutable.LinkedHashMap class.
>
> If I don't register the mutable.LinkedHashMap class then I get an
> ArrayStoreException seen below.
> If I do register the class, then when the LinkedHashMap is collected on
> the driver, it does not contain any elements.
>
> Here is the snippet of code I used :
>
> val sc = new SparkContext(new SparkConf()
>   .setMaster("local[*]")
>   .setAppName("Sample")
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, String]])))
>
> val collect = sc.parallelize(0 to 10)
>   .map(p => new mutable.LinkedHashMap[String, String]() ++= Array(("hello", "bonjour"), ("good", "bueno")))
>
> val mapSideSizes = collect.map(p => p.size).collect()(0)
> val driverSideSizes = collect.collect()(0).size
>
> println("The sizes before collect : " + mapSideSizes)
> println("The sizes after collect : " + driverSideSizes)
>
>
> ** The following only occurs if I did not register the
> mutable.LinkedHashMap class **
> 16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task result
> java.lang.ArrayStoreException: scala.collection.mutable.HashMap
> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
> at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
> at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> I hope this is a known issue and/or I'm missing something important in my
> setup.
> Appreciate any help or advice!
>
> Best,
>
> Rahul Palamuttam
>