Re: PySpark Pandas UDF

2019-11-17 Thread Bryan Cutler
Your problem looks to be a change in the Arrow binary format as of 0.15.0.
There is an environment variable you can set to make pyarrow 0.15.1
compatible with current Spark; please see the doc below for instructions,
added in SPARK-29367. Note that this will not be required for the
upcoming release of Spark 3.0.0.
https://github.com/apache/spark/blob/master/docs/sql-pyspark-pandas-with-arrow.md#compatibiliy-setting-for-pyarrow--0150-and-spark-23x-24x
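
As a rough sketch of what the linked instructions amount to: the variable is ARROW_PRE_0_15_IPC_FORMAT, and the docs describe setting it to 1 in conf/spark-env.sh. The Python below is one assumed way to do the same thing from a driver script. The helper name arrow_compat_conf is made up for illustration, and forwarding the variable through spark.executorEnv.* is an assumption about how you would reach the executors, not something the doc section spells out.

```python
import os

# Variable name taken from the Spark docs section linked above.
ARROW_COMPAT_VAR = "ARROW_PRE_0_15_IPC_FORMAT"


def arrow_compat_conf():
    """Spark conf entries that forward the variable to executor processes.

    Hypothetical helper: spark.executorEnv.<NAME> is Spark's generic setting
    for adding an environment variable to executors.
    """
    return {"spark.executorEnv." + ARROW_COMPAT_VAR: "1"}


# Set it for the local (driver-side) Python process before Spark starts.
os.environ[ARROW_COMPAT_VAR] = "1"

for key, value in arrow_compat_conf().items():
    print(key, "=", value)
```

The entries returned by the helper would be passed as ordinary conf settings when the session is built; if in doubt, the spark-env.sh route from the docs is the documented one.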

On Tue, Nov 12, 2019 at 7:53 AM Holden Karau  wrote:

> Thanks for sharing that. I think we should maybe add some checks around
> this so it’s easier to debug. I’m CCing Bryan who might have some thoughts.
>
> On Tue, Nov 12, 2019 at 7:42 AM gal.benshlomo 
> wrote:
>
>> SOLVED!
>> thanks for the help - I found the issue. it was the version of pyarrow
>> (0.15.1) which apparently isn't currently stable. Downgrading it solved
>> the
>> issue for me
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9  
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>


Re: [DISCUSS] Remove sorting of fields in PySpark SQL Row construction

2019-11-12 Thread Bryan Cutler
Thanks all. I created a WIP PR at https://github.com/apache/spark/pull/26496;
we can discuss the details further there.

On Thu, Nov 7, 2019 at 7:01 PM Takuya UESHIN  wrote:

> +1
>
> On Thu, Nov 7, 2019 at 6:54 PM Shane Knapp  wrote:
>
>> +1
>>
>> On Thu, Nov 7, 2019 at 6:08 PM Hyukjin Kwon  wrote:
>> >
>> > +1
>> >
>> > On Wed, Nov 6, 2019 at 11:38 PM, Wenchen Fan wrote:
>> >>
>> >> Sounds reasonable to me. We should make the behavior consistent within
>> Spark.
>> >>
>> >> On Tue, Nov 5, 2019 at 6:29 AM Bryan Cutler  wrote:
>> >>>
>> >>> Currently, when a PySpark Row is created with keyword arguments, the
>> fields are sorted alphabetically. This has created a lot of confusion with
>> users because it is not obvious (although it is stated in the pydocs) that
>> they will be sorted alphabetically. Then later when applying a schema and
>> the field order does not match, an error will occur. Here is a list of some
>> of the JIRAs that I have been tracking all related to this issue:
>> SPARK-24915, SPARK-22232, SPARK-27939, SPARK-27712, and relevant discussion
>> of the issue [1].
>> >>>
>> >>> The original reason for sorting fields is because kwargs in python <
>> 3.6 are not guaranteed to be in the same order that they were entered [2].
>> Sorting alphabetically ensures a consistent order. Matters are further
>> complicated with the flag __from_dict__ that allows the Row fields to be
>> referenced by name when made by kwargs, but this flag is not serialized
>> with the Row and leads to inconsistent behavior. For instance:
>> >>>
>> >>> >>> spark.createDataFrame([Row(A="1", B="2")], "B string, A
>> string").first()
>> >>> Row(B='2', A='1')
>> >>> >>> spark.createDataFrame(spark.sparkContext.parallelize([Row(A="1",
>> B="2")]), "B string, A string").first()
>> >>> Row(B='1', A='2')
>> >>>
>> >>> I think the best way to fix this is to remove the sorting of fields
>> when constructing a Row. For users with Python 3.6+, nothing would change
>> because these versions of Python ensure that the kwargs stay in the
>> order entered. For users with Python < 3.6, using kwargs would check a
>> conf to either raise an error or fall back to a LegacyRow that sorts the
>> fields as before. With Python < 3.6 being deprecated now, this LegacyRow
>> can also be removed at the same time. There are also other ways to create
>> Rows that will not be affected. I have opened a JIRA [3] to capture this,
>> but I am wondering what others think about fixing this for Spark 3.0?
>> >>>
>> >>> [1] https://github.com/apache/spark/pull/20280
>> >>> [2] https://www.python.org/dev/peps/pep-0468/
>> >>> [3] https://issues.apache.org/jira/browse/SPARK-29748
>>
>>
>>
>> --
>> Shane Knapp
>> UC Berkeley EECS Research / RISELab Staff Technical Lead
>> https://rise.cs.berkeley.edu
>>
>>
>>
>
> --
> Takuya UESHIN
> Tokyo, Japan
>
> http://twitter.com/ueshin
>


[DISCUSS] Remove sorting of fields in PySpark SQL Row construction

2019-11-04 Thread Bryan Cutler
Currently, when a PySpark Row is created with keyword arguments, the fields
are sorted alphabetically. This has created a lot of confusion with users
because it is not obvious (although it is stated in the pydocs) that they
will be sorted alphabetically. Then later when applying a schema and the
field order does not match, an error will occur. Here is a list of some of
the JIRAs that I have been tracking all related to this issue: SPARK-24915,
SPARK-22232, SPARK-27939, SPARK-27712, and relevant discussion of the issue
[1].

The original reason for sorting fields is that kwargs in Python < 3.6
are not guaranteed to be in the same order that they were entered [2].
Sorting alphabetically ensures a consistent order. Matters are further
complicated by the flag __from_dict__, which allows the Row fields to be
referenced by name when made by kwargs, but this flag is not serialized
with the Row and leads to inconsistent behavior. For instance:

>>> spark.createDataFrame([Row(A="1", B="2")], "B string, A string").first()
Row(B='2', A='1')
>>> spark.createDataFrame(spark.sparkContext.parallelize([Row(A="1",
... B="2")]), "B string, A string").first()
Row(B='1', A='2')

I think the best way to fix this is to remove the sorting of fields when
constructing a Row. For users with Python 3.6+, nothing would change
because these versions of Python ensure that the kwargs stay in the
order entered. For users with Python < 3.6, using kwargs would check a
conf to either raise an error or fall back to a LegacyRow that sorts the
fields as before. With Python < 3.6 being deprecated now, this LegacyRow
can also be removed at the same time. There are also other ways to create
Rows that will not be affected. I have opened a JIRA [3] to capture this,
but I am wondering what others think about fixing this for Spark 3.0?

[1] https://github.com/apache/spark/pull/20280
[2] https://www.python.org/dev/peps/pep-0468/
[3] https://issues.apache.org/jira/browse/SPARK-29748
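
To make the mismatch concrete without a Spark session, here is a small plain-Python model of the two behaviors. It is only a sketch: row_sorted, row_ordered and apply_schema_by_position are hypothetical stand-ins, not the actual Row implementation, and the schema is reduced to a list of field names applied by position.

```python
def row_sorted(**kwargs):
    """Model of the current behavior: field names are sorted alphabetically."""
    names = sorted(kwargs)
    return names, [kwargs[name] for name in names]


def row_ordered(**kwargs):
    """Model of the proposed behavior: keep the order the kwargs were written
    in (guaranteed by the language from Python 3.6 on, see PEP 468)."""
    return list(kwargs), list(kwargs.values())


def apply_schema_by_position(values, schema_names):
    """Pair values with schema field names purely by position."""
    return dict(zip(schema_names, values))


schema = ["B", "A"]

# Sorted construction reorders the values, so they land under the wrong names.
_, sorted_values = row_sorted(B="2", A="1")
print(apply_schema_by_position(sorted_values, schema))

# Order-preserving construction lines up with the schema as written.
_, ordered_values = row_ordered(B="2", A="1")
print(apply_schema_by_position(ordered_values, schema))
```

The first print mirrors the swapped values in the second createDataFrame call above; the second shows what a user would get if the order they typed were kept.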


Re: question about pyarrow.Table to pyspark.DataFrame conversion

2019-09-10 Thread Bryan Cutler
Hi Artem,

I don't believe this is currently possible, but it could be a great
addition to PySpark since this would offer a convenient and efficient way
to parallelize nested column data. I created the JIRA
https://issues.apache.org/jira/browse/SPARK-29040 for this.

On Tue, Aug 27, 2019 at 7:55 PM Artem Kozhevnikov <
kozhevnikov.ar...@gmail.com> wrote:

> I wonder if there's some recommended method to convert in memory
> pyarrow.Table (or pyarrow.BatchRecord) to pyspark.Dataframe without using
> pandas ?
> My motivation is about converting nested data (like List[int]) that have
> an efficient representation in pyarrow which is not possible with Pandas (I
> don't want to pass by python list of int ...).
>
> Thanks in advance !
> Artem
>
>
>


Re: Usage of PyArrow in Spark

2019-07-18 Thread Bryan Cutler
It would be possible to use Arrow for regular Python UDFs and avoid pandas,
and there would probably be some performance improvement. The difficult
part would be ensuring that the data remains consistent in the conversions
between Arrow and Python; timestamps, for example, are a bit tricky. Given
that we already have pandas_udfs, I'm not sure it would be worth the effort,
but it might be a good experiment to see how much improvement it would bring.
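
To illustrate the consistency concern with the null/NaN case raised in the thread, here is a toy model in plain Python. It is a sketch under assumptions: the function names are invented, the first function only imitates how a float64 pandas column represents missing values, and the second imitates the general idea of Arrow keeping a separate validity mask next to the values. Neither calls pandas or pyarrow.

```python
import math


def to_nan_only(values):
    """Imitate a float64 pandas column: missing values become NaN."""
    return [float("nan") if v is None else float(v) for v in values]


def to_values_and_validity(values):
    """Imitate an Arrow-style layout: values plus a separate validity mask."""
    validity = [v is not None for v in values]
    data = [0.0 if v is None else float(v) for v in values]
    return data, validity


def from_values_and_validity(data, validity):
    """Rebuild Python values, restoring None where the mask says 'null'."""
    return [d if ok else None for d, ok in zip(data, validity)]


column = [None, float("nan"), 1.1]

nan_only = to_nan_only(column)
# Both the null and the NaN are now NaN, so the difference is lost.
print([math.isnan(v) for v in nan_only])

data, validity = to_values_and_validity(column)
restored = from_values_and_validity(data, validity)
# The null comes back as None and the NaN stays a NaN.
print(restored[0] is None, math.isnan(restored[1]), restored[2])
```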

Bryan

On Thu, Jul 18, 2019 at 12:02 AM Abdeali Kothari 
wrote:

> I was thinking of implementing that. But quickly realized that doing a
> conversion of Spark -> Pandas -> Python causes errors.
>
> A quick example being "None" in Numeric data types.
> Pandas supports only NaN. Spark supports NULL and NaN.
>
> This is just one of the issues I came to.
> I'm not sure about some of the more complex types like Array, Map, struct
> which are internally converted to pd.Series with type being object.
>
> I think that avoiding pandas in between and doing something from Arrow to
> Python would be more efficient as, if I understand right, Arrow has a wider
> range of types and can handle this better.
>
> >>> from pyspark.sql import functions as F
> >>> sdf = spark.createDataFrame([ [None], [float('nan')], [1.1] ], ['val'])
>
> # Return the column with no change
> >>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col)
> >>> sdf.select(sdf['val'], udf(sdf['val'])).show()
> +----+-----+
> | val|(val)|
> +----+-----+
> |null| null|
> | NaN| null|
> | 1.1|  1.1|
> +----+-----+
>
> # isnull()
> >>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col.isnull())
> >>> sdf.select(sdf['val'], udf(sdf['val'])).show()
> +----+-----+
> | val|(val)|
> +----+-----+
> |null|  1.0|
> | NaN|  1.0|
> | 1.1|  0.0|
> +----+-----+
>
> # Check for "is None"
> >>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col.apply(lambda x: x is None))
> >>> sdf.select(sdf['val'], udf(sdf['val'])).show()
> +----+-----+
> | val|(val)|
> +----+-----+
> |null|  0.0|
> | NaN|  0.0|
> | 1.1|  0.0|
> +----+-----+
>
> On Wed, Jul 17, 2019 at 4:47 PM Hyukjin Kwon  wrote:
>
>> Regular Python UDFs don't use PyArrow under the hood.
>> Yes, they can potentially benefit but they can be easily worked around
>> via Pandas UDFs.
>>
>> For instance, both below are virtually identical.
>>
>> @udf(...)
>> def func(col):
>>     return col
>>
>> @pandas_udf(...)
>> def pandas_func(col):
>>     return col.apply(lambda x: x)
>>
>> If we only need some minimised change, I would be positive about adding
>> Arrow support into regular Python UDFs. Otherwise, I am not sure yet.
>>
>>
>> On Wed, Jul 17, 2019 at 1:19 PM, Abdeali Kothari wrote:
>>
>>> Hi,
>>> In Spark 2.3+ I saw that pyarrow was being used in a bunch of places in
>>> Spark. And I was trying to understand the benefit it provides in terms of
>>> serialization / deserialization.
>>>
>>> I understand that the new pandas-udf works only if pyarrow is installed.
>>> But what about the plain old PythonUDF which can be used in map() kind
>>> of operations?
>>> Are they also using pyarrow under the hood to reduce the cost of serde?
>>> Or do they remain as earlier and no performance gain should be expected in
>>> those?
>>>
>>> If I'm not mistaken, plain old PythonUDFs could also benefit from Arrow
>>> as the data transfer cost to serialize/deserialize from Java to Python and
>>> back still exists and could potentially be reduced by using Arrow?
>>> Is my understanding correct? Are there any plans to implement this?
>>>
>>> Pointers to any notes or Jira about this would be appreciated.
>>>
>>


Re: Should python-2 be supported in Spark 3.0?

2019-05-31 Thread Bryan Cutler
+1 and the draft sounds good

On Thu, May 30, 2019, 11:32 AM Xiangrui Meng  wrote:

> Here is the draft announcement:
>
> ===
> Plan for dropping Python 2 support
>
> As many of you already knew, Python core development team and many
> utilized Python packages like Pandas and NumPy will drop Python 2 support
> in or before 2020/01/01. Apache Spark has supported both Python 2 and 3
> since Spark 1.4 release in 2015. However, maintaining Python 2/3
> compatibility is an increasing burden and it essentially limits the use of
> Python 3 features in Spark. Given the end of life (EOL) of Python 2 is
> coming, we plan to eventually drop Python 2 support as well. The current
> plan is as follows:
>
> * In the next major release in 2019, we will deprecate Python 2 support.
> PySpark users will see a deprecation warning if Python 2 is used. We will
> publish a migration guide for PySpark users to migrate to Python 3.
> * We will drop Python 2 support in a future release in 2020, after Python
> 2 EOL on 2020/01/01. PySpark users will see an error if Python 2 is used.
> * For releases that support Python 2, e.g., Spark 2.4, their patch
> releases will continue supporting Python 2. However, after Python 2 EOL, we
> might not take patches that are specific to Python 2.
> ===
>
> Sean helped make a pass. If it looks good, I'm going to upload it to Spark
> website and announce it here. Let me know if you think we should do a VOTE
> instead.
>
> On Thu, May 30, 2019 at 9:21 AM Xiangrui Meng  wrote:
>
>> I created https://issues.apache.org/jira/browse/SPARK-27884 to track the
>> work.
>>
>> On Thu, May 30, 2019 at 2:18 AM Felix Cheung 
>> wrote:
>>
>>> We don’t usually reference a future release on website
>>>
>>> > Spark website and state that Python 2 is deprecated in Spark 3.0
>>>
>>> I suspect people will then ask when is Spark 3.0 coming out then. Might
>>> need to provide some clarity on that.
>>>
>>
>> We can say the "next major release in 2019" instead of Spark 3.0. Spark
>> 3.0 timeline certainly requires a new thread to discuss.
>>
>>
>>>
>>>
>>> --
>>> *From:* Reynold Xin 
>>> *Sent:* Thursday, May 30, 2019 12:59:14 AM
>>> *To:* shane knapp
>>> *Cc:* Erik Erlandson; Mark Hamstra; Matei Zaharia; Sean Owen; Wenchen
>>> Fen; Xiangrui Meng; dev; user
>>> *Subject:* Re: Should python-2 be supported in Spark 3.0?
>>>
>>> +1 on Xiangrui’s plan.
>>>
>>> On Thu, May 30, 2019 at 7:55 AM shane knapp  wrote:
>>>
 I don't have a good sense of the overhead of continuing to support
> Python 2; is it large enough to consider dropping it in Spark 3.0?
>
> from the build/test side, it will actually be pretty easy to continue
 support for python2.7 for spark 2.x as the feature sets won't be expanding.

>>>
 that being said, i will be cracking a bottle of champagne when i can
 delete all of the ansible and anaconda configs for python2.x.  :)

>>>
>> On the development side, in a future release that drops Python 2 support
>> we can remove code that maintains python 2/3 compatibility and start using
>> python 3 only features, which is also quite exciting.
>>
>>
>>>
 shane
 --
 Shane Knapp
 UC Berkeley EECS Research / RISELab Staff Technical Lead
 https://rise.cs.berkeley.edu

>>>


Re: pySpark - pandas UDF and binaryType

2019-05-02 Thread Bryan Cutler
Hi,

BinaryType support was not added until Spark 2.4.0, see
https://issues.apache.org/jira/browse/SPARK-23555. Also, pyarrow 0.10.0 or
greater is required, as you saw in the docs.
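
If it helps, the two requirements can be written down as a quick check. This is only a sketch: binary_type_supported and parse_version are hypothetical helpers, and the version strings are passed in by hand rather than read from an installed Spark or pyarrow.

```python
from itertools import takewhile

# Minimum versions stated above for BinaryType with pandas UDFs.
MIN_SPARK = (2, 4, 0)
MIN_PYARROW = (0, 10, 0)


def parse_version(text):
    """Turn '2.4.0' into (2, 4, 0); trailing tags such as '-SNAPSHOT' are ignored."""
    parts = []
    for piece in text.split("."):
        digits = "".join(takewhile(str.isdigit, piece))
        if not digits:
            break
        parts.append(int(digits))
    # Pad short versions such as '2.4' so tuple comparison behaves as expected.
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def binary_type_supported(spark_version, pyarrow_version):
    """True only if both minimum versions are met."""
    return (parse_version(spark_version) >= MIN_SPARK
            and parse_version(pyarrow_version) >= MIN_PYARROW)


# The combination from the question: pyarrow is new enough, Spark is not.
print(binary_type_supported("2.3.0", "0.10.0"))
print(binary_type_supported("2.4.0", "0.10.0"))
```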

Bryan

On Thu, May 2, 2019 at 4:26 AM Nicolas Paris 
wrote:

> Hi all
>
> I am using pySpark 2.3.0 and pyArrow 0.10.0
>
> I want to apply a pandas-udf on a dataframe with 
> I get the error below:
>
> > Invalid returnType with grouped map Pandas UDFs:
> >
> StructType(List(StructField(filename,StringType,true),StructField(contents,BinaryType,true)))
> > is not supported
>
>
> Am I missing something?
> the doc
> https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#supported-sql-types
> says pyArrow 0.10 is minimum to handle binaryType
>
> here is the code:
>
> > from pyspark.sql.functions import pandas_udf, PandasUDFType
> >
> > df = sql("select filename, contents from test_binary")
> >
> > @pandas_udf("filename String, contents binary",
> >             PandasUDFType.GROUPED_MAP)
> > def transform_binary(pdf):
> >     contents = pdf.contents
> >     return pdf.assign(contents=contents)
> >
> > df.groupby("filename").apply(transform_binary).count()
>
> Thanks
> --
> nicolas
>
>
>


Re: spark2.4 arrow enabled true,error log not returned

2019-01-10 Thread Bryan Cutler
Hi, could you please clarify if you are running a YARN cluster when you see
this problem?  I tried on Spark standalone and could not reproduce.  If
it's on a YARN cluster, please file a JIRA and I can try to investigate
further.

Thanks,
Bryan

On Sat, Dec 15, 2018 at 3:42 AM 李斌松  wrote:

> With Spark 2.4 and Arrow enabled, the error log is not returned; in Spark 2.3
> there is no such problem.
>
> 1. spark.sql.execution.arrow.enabled=true
> [image: image.png]
> *yarn log:*
>
> 18/12/15 14:35:52 INFO CodeGenerator: Code generated in 1030.698785 ms
> 18/12/15 14:35:54 INFO PythonRunner: Times: total = 1985, boot = 1892, init = 92, finish = 1
> 18/12/15 14:35:54 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1799 bytes result sent to driver
> 18/12/15 14:35:55 INFO CoarseGrainedExecutorBackend: Got assigned task 1
> 18/12/15 14:35:55 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
> 18/12/15 14:35:55 INFO TorrentBroadcast: Started reading broadcast variable 1
> 18/12/15 14:35:55 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.3 KB, free 1048.8 MB)
> 18/12/15 14:35:55 INFO TorrentBroadcast: Reading broadcast variable 1 took 18 ms
> 18/12/15 14:35:55 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 14.0 KB, free 1048.8 MB)
> 18/12/15 14:35:55 INFO CodeGenerator: Code generated in 30.269745 ms
> 18/12/15 14:35:55 INFO PythonRunner: Times: total = 13, boot = 5, init = 7, finish = 1
> 18/12/15 14:35:55 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1893 bytes result sent to driver
> 18/12/15 14:35:55 INFO CoarseGrainedExecutorBackend: Got assigned task 2
> 18/12/15 14:35:55 INFO Executor: Running task 1.0 in stage 1.0 (TID 2)
> 18/12/15 14:35:55 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 2)
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/usr/install/pyspark/2.4.0/pyspark.zip/pyspark/worker.py", line 377, in main
>     process()
>   File "/usr/install/pyspark/2.4.0/pyspark.zip/pyspark/worker.py", line 372, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/usr/install/pyspark/2.4.0/pyspark.zip/pyspark/serializers.py", line 390, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "/usr/install/pyspark/2.4.0/pyspark.zip/pyspark/util.py", line 99, in wrapper
>     return f(*args, **kwargs)
>   File "/yarn/nm/usercache/admin/appcache/application_1544579748138_0215/container_e43_1544579748138_0215_01_01/python1.py", line 435, in mapfunc
> ValueError: could not convert string to float: 'a'
>
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
>     at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
>     at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:99)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.foreach(ArrowConverters.scala:97)
>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>     at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.to(ArrowConverters.scala:97)
>     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>     at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toBuffer(ArrowConverters.scala:97)
>     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
>     at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toArray(ArrowConverters.scala:97)
>     at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17$$anonfun$apply$18.apply(Dataset.scala:3314)
>     at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17$$anonfun$apply$18.apply(Dataset.scala:3314)
>     at

Re: Use Arrow instead of Pickle without pandas_udf

2018-07-30 Thread Bryan Cutler
Here is a link to the JIRA for adding StructType support for scalar
pandas_udf https://issues.apache.org/jira/browse/SPARK-24579


On Wed, Jul 25, 2018 at 3:36 PM, Hichame El Khalfi 
wrote:

> Hey Holden,
> Thanks for your reply,
>
> We are currently using a Python function that produces a Row(TS=LongType(),
> bin=BinaryType()).
> We use this function like this: dataframe.rdd.map(my_function)
> .toDF().write.parquet()
>
> To reuse it in pandas_udf, we changed the return type to
> StructType(StructField(Long), StructField(BinaryType)).
>
> 1) But we face an issue: StructType is not supported by pandas_udf.
>
> So I was wondering whether we could continue to reuse dataframe.rdd.map but
> get an improvement in serialization by using the Arrow format instead of Pickle.
>
> *From:* hol...@pigscanfly.ca
> *Sent:* July 25, 2018 4:41 PM
> *To:* hich...@elkhalfi.com
> *Cc:* user@spark.apache.org
> *Subject:* Re: Use Arrow instead of Pickle without pandas_udf
>
> Not currently. What's the problem with pandas_udf for your use case?
>
> On Wed, Jul 25, 2018 at 1:27 PM, Hichame El Khalfi 
> wrote:
>
>> Hi There,
>>
>>
>> Is there a way to use Arrow format instead of Pickle but without using
>> pandas_udf ?
>>
>>
>> Thank for your help,
>>
>>
>> Hichame
>>
>
>
>
> --
> Twitter: https://twitter.com/holdenkarau
>


Re: Arrow type issue with Pandas UDF

2018-07-19 Thread Bryan Cutler
Hi Patrick,

It looks like it's failing in Scala before it even gets to Python to
execute your udf, which is why it doesn't seem to matter what's in your
udf. Since you are doing a grouped map udf maybe your group sizes are too
big or skewed? Could you try to reduce the size of your groups by adding
more keys or sampling a fraction of the data? If the problem persists could
you make a jira? At the very least a better exception would be nice.
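
As a sketch of what "reduce the size of your groups by adding more keys" could look like, here is the idea in plain Python. Everything in it is an assumption for illustration: split_groups is a hypothetical helper, rows are plain dicts, and the extra key is a simple running bucket number. In Spark the equivalent would be an additional grouping column, which should be fine here since the operation is row-wise and the grouping is arbitrary.

```python
from collections import defaultdict


def split_groups(rows, key, max_rows_per_group):
    """Group rows by (key, bucket) so that no group exceeds max_rows_per_group."""
    seen = defaultdict(int)      # how many rows each original key has had so far
    groups = defaultdict(list)
    for row in rows:
        value = row[key]
        bucket = seen[value] // max_rows_per_group
        seen[value] += 1
        groups[(value, bucket)].append(row)
    return dict(groups)


# A skewed input: one key with 10 rows, another with 3.
rows = [{"k": "a", "x": i} for i in range(10)] + [{"k": "b", "x": i} for i in range(3)]

for group_key, members in sorted(split_groups(rows, "k", 4).items()):
    print(group_key, len(members))
```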

Bryan

On Thu, Jul 19, 2018, 7:07 AM Patrick McCarthy
 wrote:

> PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.
>
> I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions
> in the last stage of the job regardless of my output type.
>
>
> The problem I'm trying to solve:
> I have a column of scalar values, and each value on the same row has a
> sorted vector. I'm trying to replace each scalar value with its closest
> index from its vector. I'm applying the grouping arbitrarily and performing
> a python operation row-wise because even when the same vector appears on
> many rows it's not clear how I would get the lookup to scale.
>
> My input data, the product of a join of hive tables, has the following
> schema:
>
> root
>  |-- scalar_value: float (nullable = true)
>  |-- quantilelist: array (nullable = true)
>  |    |-- element: double (containsNull = true)
>
>
> My UDF is at bottom. I'm using a GROUPED_MAP UDF because I want to perform
> an operation on two columns, and because I want to take advantage of Arrow
> to avoid serialization.
>
> The schema my UDF returns is this:
>
> pos_schema = T.StructType([
>     T.StructField('feature_value',T.FloatType(),True),
>     T.StructField('error',T.StringType())
> ])
>
> ...however when I try to apply my UDF, either with saveAsTable or show(),
> I get the following exception:
>
> org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
>     at org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
>     at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
>     at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
>     at org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
>     at org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
>     at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
>     at org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
>     at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
>     at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
>     at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
>     at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
>     at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
>     at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
>     at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
>     at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
>
> I assumed it was the result of some bad typing on my part, until I did a
> test with a degenerate UDF that only returns a column of 1:
>
> @F.pandas_udf(T.StructType([T.StructField('feature_value',T.IntegerType(),True)]),
>               F.PandasUDFType.GROUPED_MAP)
> def groupedPercentileInt(df):
>     return pd.DataFrame({'feature_value':[int(1)]*df.shape[0]}).reset_index(drop=True)
>
>
> This clearly only has one return value of type int, yet I get the same
> exception:
>
> org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
>     at org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
>     at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
>     at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
>     at org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
>     at org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
>     at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
>     at org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
> 

Re: Pandas UDF for PySpark error. Big Dataset

2018-05-29 Thread Bryan Cutler
Can you share some of the code used, or at least the pandas_udf plus the
stack trace? Also, does decreasing your dataset size fix the OOM?

On Mon, May 28, 2018, 4:22 PM Traku traku  wrote:

> Hi.
>
> I'm trying to use the new feature but I can't use it with a big dataset
> (about 5 million rows).
>
> I tried increasing executor memory, driver memory, and the number of
> partitions, but none of these solved the problem.
>
> One of the executor tasks keeps increasing its shuffle memory until it fails.
>
> The error is generated by Arrow: unable to expand the buffer.
>
> Any idea?
>


Re: OneHotEncoderEstimator - java.lang.NoSuchMethodError: org.apache.spark.sql.Dataset.withColumns

2018-05-18 Thread Bryan Cutler
The example works for me, please check your environment and ensure you are
using Spark 2.3.0 where OneHotEncoderEstimator was introduced.

On Fri, May 18, 2018 at 12:57 AM, Matteo Cossu  wrote:

> Hi,
>
> are you sure Dataset has a method withColumns?
>
> On 15 May 2018 at 16:58, Mina Aslani  wrote:
>
>> Hi,
>>
>> I get below error when I try to run oneHotEncoderEstimator example.
>> https://github.com/apache/spark/blob/b74366481cc87490adf4e69d26389ec737548c15/examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderEstimatorExample.java#L67
>>
>> Which is this line of the code:
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala#L348
>>
>> Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: 
>> org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
>>  at 
>> org.apache.spark.ml.feature.OneHotEncoderModel.transform(OneHotEncoderEstimator.scala:348)
>>
>>
>> Can you please let me know, what is the cause? Any workaround?
>>
>> Seeing the example in the repo, it looks like at some point it used to
>> run fine. And now it's not working. Also, OneHotEncoder is deprecated.
>>
>> I really appreciate your quick response.
>>
>> Regards,
>> Mina
>>
>>
>


Re: How to use StringIndexer for multiple input /output columns in Spark Java

2018-05-16 Thread Bryan Cutler
Yes, the workaround is to create multiple StringIndexers as you described.
OneHotEncoderEstimator is only available starting with Spark 2.3.0, so on
2.2.1 you will have to use the plain OneHotEncoder.
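
A minimal sketch of that workaround, written in PySpark for brevity (the question is about the Java API, which has the same single-column inputCol/outputCol shape). The helper names and the "_idx" suffix are made up for illustration, and the Spark imports are kept inside build_pipeline so the naming helper can be tried without Spark installed.

```python
def indexer_specs(columns, suffix="_idx"):
    """One (inputCol, outputCol) pair per column to index."""
    return [(name, name + suffix) for name in columns]


def build_pipeline(columns):
    """Create one StringIndexer per column and chain them in a Pipeline.

    Needs pyspark and an active Spark session when called.
    """
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer

    stages = [StringIndexer(inputCol=src, outputCol=dst)
              for src, dst in indexer_specs(columns)]
    return Pipeline(stages=stages)


# Column names here are placeholders.
print(indexer_specs(["color", "size"]))
```

With a DataFrame df in hand, build_pipeline(["color", "size"]).fit(df).transform(df) would add one indexed column per input column.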

On Tue, May 15, 2018, 8:40 AM Mina Aslani  wrote:

> Hi,
>
> So, what is the workaround? Should I create multiple indexer(one for each
> column), and then create pipeline and set stages to have all the
> StringIndexers?
> I am using 2.2.1 as I cannot move to 2.3.0. Looks like
> oneHotEncoderEstimator is broken, please see my email sent today with
> subject:
> OneHotEncoderEstimator - java.lang.NoSuchMethodError: org.apache.spark.sql.Dataset.withColumns
>
> Regards,
> Mina
>
> On Tue, May 15, 2018 at 2:37 AM, Nick Pentreath 
> wrote:
>
>> Multi column support for StringIndexer didn’t make it into Spark 2.3.0
>>
>> The PR is still in progress I think - should be available in 2.4.0
>>
>> On Mon, 14 May 2018 at 22:32, Mina Aslani  wrote:
>>
>>> Please take a look at the api doc:
>>> https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/feature/StringIndexer.html
>>>
>>> On Mon, May 14, 2018 at 4:30 PM, Mina Aslani 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> There is no SetInputCols/SetOutputCols for StringIndexer in Spark Java.
>>>> How can multiple input/output columns be specified then?
>>>>
>>>> Regards,
>>>> Mina

>>>
>>>
>


Re: [Arrow][Dremio]

2018-05-15 Thread Bryan Cutler
Hi Xavier,

Regarding Arrow usage in Spark, using Arrow format to transfer data between
Python and Java has been the focus so far because this area stood to
benefit the most.  It's possible that the scope of Arrow could broaden in
the future, but there still needs to be discussions about this.

Bryan

On Mon, May 14, 2018 at 9:55 AM, Pierce Lamb 
wrote:

> Hi Xavier,
>
> Along the lines of connecting to multiple sources of data and replacing
> ETL tools you may want to check out Confluent's blog on building a
> real-time streaming ETL pipeline on Kafka
> as well as SnappyData's blog on Real-Time Streaming ETL with SnappyData, where
> Spark is central to connecting to multiple data sources, executing SQL on
> streams etc. These should provide nice comparisons to your ideas about
> Dremio + Spark as ETL tools.
>
> Disclaimer: I am a SnappyData employee
>
> Hope this helps,
>
> Pierce
>
> On Mon, May 14, 2018 at 2:24 AM, xmehaut  wrote:
>
>> Hi Michaël,
>>
>> I'm not an expert in Dremio; I'm just trying to evaluate the potential of
>> this technology, what impact it could have on Spark, how they can work
>> together, and how Spark could use Arrow even further internally alongside
>> the existing algorithms.
>>
>> Dremio already has quite a rich API set giving access to, for instance,
>> metadata and SQL queries, or even the ability to create virtual datasets
>> programmatically.
>> They also have a lot of predefined functions, and I imagine there will be
>> more and more functions in the future, e.g. machine learning functions like
>> the ones we may find in Azure SQL Server, which lets you mix SQL and ML
>> functions. Access to Dremio is through JDBC, and we may imagine
>> accessing virtual datasets through Spark and dynamically creating new
>> datasets from the API, connected to Parquet files stored dynamically by
>> Spark on HDFS, Azure Data Lake or S3... Of course a tighter integration
>> between the two would be better, with a Spark read/write connector to Dremio :)
>>
>> regards
>> xavier
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Spark dataset to byte array over grpc

2018-04-23 Thread Bryan Cutler
Hi Ashwin,

This sounds like it might be a good use for Apache Arrow, if you are open
to the type of format to exchange.  As of Spark 2.3, Dataset has a method
"toArrowPayload" that will convert a Dataset of Rows to a byte array in
Arrow format, although the API is currently not public.  Your client could
consume Arrow data directly or perhaps use spark.sql ColumnarBatch to read
back as Rows.

Bryan

On Mon, Apr 23, 2018 at 11:49 AM, Ashwin Sai Shankar <
ashan...@netflix.com.invalid> wrote:

> Hi!
> I'm building a spark app which runs a spark-sql query and send results to
> client over grpc(my proto file is configured to send the sql output as
> "bytes"). The client then displays the output rows. When I run spark.sql, I
> get a DataSet. How do I convert this to byte array?
> Also is there a better way to send this output to client?
>
> Thanks,
> Ashwin
>
>


Re: PySpark ML: Get best set of parameters from TrainValidationSplit

2018-04-16 Thread Bryan Cutler
Hi Aakash,

First you will want to get the random forest model stage from the best
pipeline model result, for example if RF is the first stage:

rfModel = model.bestModel.stages[0]

Then you can check the values of the params you tuned like this:

rfModel.getNumTrees
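
If you want all of the tuned values at once rather than one getter at a time, another option is to pair each candidate param map with its validation metric and pick the best pair. Here is a minimal sketch of that idea in plain Python. The two lists are made-up stand-ins; in PySpark I would expect them to come from the fitted model's getEstimatorParamMaps() and validationMetrics, assuming your version exposes both. The helper name best_params is hypothetical.

```python
# Hypothetical stand-ins: in PySpark these two lists would come from the
# fitted TrainValidationSplitModel rather than being typed in by hand.
param_maps = [
    {"numTrees": 50, "maxDepth": 5},
    {"numTrees": 100, "maxDepth": 5},
    {"numTrees": 50, "maxDepth": 10},
]
metrics = [0.81, 0.86, 0.84]  # one made-up validation metric per param map


def best_params(param_maps, metrics, larger_is_better=True):
    """Return the (param_map, metric) pair with the best metric."""
    pick = max if larger_is_better else min
    index = pick(range(len(metrics)), key=metrics.__getitem__)
    return param_maps[index], metrics[index]


best_map, best_metric = best_params(param_maps, metrics)
print(best_map, best_metric)
```

The larger_is_better switch is there because some evaluator metrics (error measures, for instance) are better when smaller.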

On Mon, Apr 16, 2018 at 7:52 AM, Aakash Basu 
wrote:

> Hi,
>
> I am running a Random Forest model on a dataset using hyper parameter
> tuning with Spark's paramGrid and Train Validation Split.
>
> Can anyone tell me how to get the best set for all the four parameters?
>
> I used:
>
> model.bestModel()
> model.metrics()
>
>
> But none of them seem to work.
>
>
> Below is the code chunk:
>
> paramGrid = ParamGridBuilder() \
> .addGrid(rf.numTrees, [50, 100, 150, 200]) \
> .addGrid(rf.maxDepth, [5, 10, 15, 20]) \
> .addGrid(rf.minInfoGain, [0.001, 0.01, 0.1, 0.6]) \
> .addGrid(rf.minInstancesPerNode, [5, 15, 30, 50, 100]) \
> .build()
>
> tvs = TrainValidationSplit(estimator=pipeline,
>estimatorParamMaps=paramGrid,
>evaluator=MulticlassClassificationEvaluator(),
># 80% of the data will be used for training, 20% 
> for validation.
>trainRatio=0.8)
>
> model = tvs.fit(trainingData)
>
> predictions = model.transform(testData)
>
> evaluator = MulticlassClassificationEvaluator(
> labelCol="label", predictionCol="prediction", metricName="accuracy")
> accuracy = evaluator.evaluate(predictions)
> print("Accuracy = %g" % accuracy)
> print("Test Error = %g" % (1.0 - accuracy))
>
>
> Any help?
>
>
> Thanks,
> Aakash.
>


Re: is there a way of register python UDF using java API?

2018-04-02 Thread Bryan Cutler
Hi Kant,

The udfDeterministic flag should be set to false if the results of your UDF
are non-deterministic, for example if they depend on random numbers, so that
the Catalyst optimizer will not cache and reuse results.
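
To show why the flag matters, here is a small plain-Python sketch. It is not how Catalyst is implemented; functools.lru_cache just stands in for any engine that reuses a result because it assumes the function is deterministic. The function names are made up for illustration.

```python
import functools
import random

rng = random.Random(0)  # seeded only so the sketch is repeatable


def noisy(x):
    """Non-deterministic: the same input gives a different result each call."""
    return x + rng.random()


@functools.lru_cache(maxsize=None)
def noisy_cached(x):
    """What happens if the caller assumes the function is deterministic."""
    return noisy(x)


fresh_a, fresh_b = noisy(1), noisy(1)
cached_a, cached_b = noisy_cached(1), noisy_cached(1)
print(fresh_a != fresh_b)    # two independent draws
print(cached_a == cached_b)  # the second call reuses the first result
```

With reuse in place every row would see the same "random" value, which is usually not what a UDF author intended.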

On Mon, Apr 2, 2018 at 12:11 PM, kant kodali  wrote:

> Looks like there is spark.udf().registerPython() like below.
>
> public void registerPython(java.lang.String name, org.apache.spark.sql.
> execution.python.UserDefinedPythonFunction udf)
>
>
> can anyone describe what *udfDeterministic *parameter does in the method
> signature below?
>
> public UserDefinedPythonFunction(java.lang.String name, 
> org.apache.spark.api.python.PythonFunction func, 
> org.apache.spark.sql.types.DataType dataType, int pythonEvalType, boolean 
> udfDeterministic) { /* compiled code */ }
>
>
> On Sun, Apr 1, 2018 at 3:46 PM, kant kodali  wrote:
>
>> Hi All,
>>
>> All of our spark code is in Java wondering if there a way to register
>> python UDF's using java API such that the registered UDF's can be used
>> using raw spark SQL.
>> If there is any other way to achieve this goal please suggest!
>>
>> Thanks
>>
>>
>


Re: PySpark Tweedie GLM

2018-02-09 Thread Bryan Cutler
Can you provide some code/data to reproduce the problem?

On Fri, Feb 9, 2018 at 9:42 AM, nhamwey 
wrote:

> I am using Spark 2.2.0 through Python.
>
> I am repeatedly getting a zero weight of sums error when trying to run a
> model. This happens even when I do not specify a defined weightCol =
> "variable"
>
> Py4JJavaError: An error occurred while calling o1295.fit.
> : java.lang.AssertionError: assertion failed: Sum of weights cannot be
> zero.
> at scala.Predef$.assert(Predef.scala:170)
> at
> org.apache.spark.ml.optim.WeightedLeastSquares$Aggregator.validate(
> WeightedLeastSquares.scala:418)
> at
> org.apache.spark.ml.optim.WeightedLeastSquares.fit(
> WeightedLeastSquares.scala:101)
> at
> org.apache.spark.ml.optim.IterativelyReweightedLeastSquares.fit(
> IterativelyReweightedLeastSquares.scala:86)
> at
> org.apache.spark.ml.regression.GeneralizedLinearRegression.train(
> GeneralizedLinearRegression.scala:369)
> at
> org.apache.spark.ml.regression.GeneralizedLinearRegression.train(
> GeneralizedLinearRegression.scala:203)
> at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
> 62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(
> ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:280)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.
> java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:214)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: ML:One vs Rest with crossValidator for multinomial in logistic regression

2018-02-08 Thread Bryan Cutler
Nicolas, are you referring to printing the model params in that example
with "print(model1.extractParamMap())"?  There was a problem with pyspark
models not having params after being fit, which causes this example to show
nothing for model paramMaps.  This was fixed in
https://issues.apache.org/jira/browse/SPARK-10931 and the example now shows
all model params.  The fix will be in the Spark 2.3 release.

Bryan

On Wed, Jan 31, 2018 at 10:20 PM, Nicolas Paris <nipari...@gmail.com> wrote:

> Hey
>
> I am also interested in how to get those parameters.
> For example, the demo code spark-2.2.1-bin-hadoop2.7/
> examples/src/main/python/ml/estimator_transformer_param_example.py
> return empty parameters when  printing "lr.extractParamMap()"
>
> That's weird
>
> Thanks
>
> Le 30 janv. 2018 à 23:10, Bryan Cutler écrivait :
> > Hi Michelle,
> >
> > Your original usage of ParamGridBuilder was not quite right, `addGrid`
> expects
> > (some parameter, array of values for that parameter).  If you want to do
> a grid
> > search with different regularization values, you would do the following:
> >
> > val paramMaps = new ParamGridBuilder().addGrid(logist.regParam,
> Array(0.1,
> > 0.3)).build()
> >
> > * don't forget to build the grid after adding values
> >
> > On Tue, Jan 30, 2018 at 6:55 AM, michelleyang <
> michelle1026sh...@gmail.com>
> > wrote:
> >
> > I tried to use One vs Rest in spark ml with pipeline and
> crossValidator for
> > multinomial in logistic regression.
> >
> > It came out with empty coefficients. I figured out it was the
> setting of
> > ParamGridBuilder. Can anyone help me understand how does the
> parameter
> > setting affect the crossValidator process?
> >
> > the original code: //output empty coefficients.
> >
> > val logist=new LogisticRegression
> >
> > val ova = new OneVsRest().setClassifier(logist)
> >
> > val paramMaps = new ParamGridBuilder().addGrid(ova.classifier,
> > Array(logist.getRegParam))
> >
> > New code://output multi classes coefficients
> >
> > val logist=new LogisticRegression
> >
> > val ova = new OneVsRest().setClassifier(logist)
> >
> > val classifier1 = new LogisticRegression().setRegParam(2.0)
> >
> > val classifier2 = new LogisticRegression().setRegParam(3.0)
> >
> > val paramMaps = new ParamGridBuilder() .addGrid(ova.classifier,
> > Array(classifier1, classifier2))
> >
> > Please help Thanks.
> >
> >
> >
> > --
> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> >
> > 
> -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
> >
>


Re: ML:One vs Rest with crossValidator for multinomial in logistic regression

2018-01-30 Thread Bryan Cutler
Hi Michelle,

Your original usage of ParamGridBuilder was not quite right, `addGrid`
expects (some parameter, array of values for that parameter).  If you want
to do a grid search with different regularization values, you would do the
following:

val paramMaps = new ParamGridBuilder().addGrid(logist.regParam, Array(0.1,
0.3)).build()

* don't forget to build the grid after adding values
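
For intuition about what building the grid produces, here is a rough plain-Python sketch: each added parameter contributes a list of values and the result is one map per combination. This is only an illustration, not Spark's ParamGridBuilder; build_grid is a hypothetical helper and the maxIter values are made up.

```python
import itertools


def build_grid(grid):
    """Expand {param_name: [values]} into one dict per combination."""
    names = list(grid)
    combos = itertools.product(*(grid[name] for name in names))
    return [dict(zip(names, combo)) for combo in combos]


reg_only = build_grid({"regParam": [0.1, 0.3]})
two_params = build_grid({"regParam": [0.1, 0.3], "maxIter": [10, 50, 100]})
print(len(reg_only), len(two_params))
```

The number of maps is the product of the list lengths, so the cost of cross-validation grows quickly as parameters are added.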

On Tue, Jan 30, 2018 at 6:55 AM, michelleyang 
wrote:

> I tried to use One vs Rest in spark ml with pipeline and crossValidator for
> multinomial in logistic regression.
>
> It came out with empty coefficients. I figured out it was the setting of
> ParamGridBuilder. Can anyone help me understand how the parameter
> setting affects the crossValidator process?
>
> the original code: //output empty coefficients.
>
> val logist=new LogisticRegression
>
> val ova = new OneVsRest().setClassifier(logist)
>
> val paramMaps = new ParamGridBuilder().addGrid(ova.classifier,
> Array(logist.getRegParam))
>
> New code://output multi classes coefficients
>
> val logist=new LogisticRegression
>
> val ova = new OneVsRest().setClassifier(logist)
>
> val classifier1 = new LogisticRegression().setRegParam(2.0)
>
> val classifier2 = new LogisticRegression().setRegParam(3.0)
>
> val paramMaps = new ParamGridBuilder() .addGrid(ova.classifier,
> Array(classifier1, classifier2))
>
> Please help Thanks.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Timestamp changing while writing

2018-01-15 Thread Bryan Cutler
Spark internally stores timestamps as UTC values, so createDataFrame will
convert from the local time zone to UTC. I think there was a Jira to correct
parquet output. Are the values you are seeing offset from your local time
zone?
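
A quick way to check is to compare the value you wrote with the value you read back and see whether the difference equals your UTC offset. The sketch below shows that arithmetic in plain Python; the fixed UTC-8 zone and the sample time are assumptions for illustration only.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical local zone at a fixed UTC-8 offset, for illustration only.
local_zone = timezone(timedelta(hours=-8))

local_time = datetime(2018, 1, 11, 16, 49, tzinfo=local_zone)
as_utc = local_time.astimezone(timezone.utc)

# Same instant, but the wall-clock fields differ by the zone offset.
offset = as_utc.replace(tzinfo=None) - local_time.replace(tzinfo=None)
print(local_time.isoformat())
print(as_utc.isoformat())
print(offset)
```

If the shift you see matches this offset, the value itself is intact and only the rendering differs.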

On Jan 11, 2018 4:49 PM, "sk skk"  wrote:

> Hello,
>
> I am using createDataFrame and passing a Java Row RDD and a schema, but it
> changes the time value when I write that DataFrame to a Parquet file.
>
> Can anyone help?
>
> Thank you,
> Sudhir
>


Re: Apache Spark: Parallelization of Multiple Machine Learning ALgorithm

2017-09-05 Thread Bryan Cutler
Hi Prem,

Spark actually does somewhat support different algorithms in
CrossValidator, but it's not really obvious.  You basically need to make a
Pipeline and build a ParamGrid with different algorithms as stages.  Here
is a simple example:

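import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.{DecisionTreeClassifier, LogisticRegression}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val dt = new DecisionTreeClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

val lr = new LogisticRegression()
  .setLabelCol("label")
  .setFeaturesCol("features")

val pipeline = new Pipeline()

// Each grid value is a complete stages array, one per algorithm.
val paramGrid = new ParamGridBuilder()
  .addGrid(pipeline.stages, Array(Array[PipelineStage](dt),
    Array[PipelineStage](lr)))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)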

Adding more params to the grid can get a little complicated, though; I
discuss this in detail at https://bryancutler.github.io/cv-pipelines/
As Patrick McCarthy mentioned, you might want to follow SPARK-19071,
specifically https://issues.apache.org/jira/browse/SPARK-19357, which
parallelizes model evaluation.

Bryan

On Tue, Sep 5, 2017 at 8:02 AM, Yanbo Liang  wrote:

> You are right, native Spark MLlib CrossValidation can't run *different 
> *algorithms
> in parallel.
>
> Thanks
> Yanbo
>
> On Tue, Sep 5, 2017 at 10:56 PM, Timsina, Prem 
> wrote:
>
>> Hi Yanboo,
>>
>> Thank You, I very much appreciate your help.
>>
>> For the current use case, the data can fit into a single node. So,
>> spark-sklearn seems to be good choice.
>>
>>
>>
>> *I have one question regarding this*
>>
>> *“If no, Spark MLlib provide CrossValidation which can run multiple
>> machine learning algorithms parallel on distributed dataset and do
>> parameter search.
>> FYI: https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation
>> ”*
>>
>> If I understand correctly, it can run parameter search for
>> cross-validation in parallel.
>>
>> However,  currently  Spark does not support  running multiple algorithms
>> (like Naïve Bayes,  Random Forest, etc.) in parallel. Am I correct?
>>
>> If not, could you please point me to some resources where they have run
>> multiple algorithms in parallel.
>>
>>
>>
>> Thank You very much. It is great help, I will try spark-sklearn.
>>
>> Prem
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *From: *Yanbo Liang 
>> *Date: *Tuesday, September 5, 2017 at 10:40 AM
>> *To: *Patrick McCarthy 
>> *Cc: *"Timsina, Prem" , "user@spark.apache.org" <
>> user@spark.apache.org>
>> *Subject: *Re: Apache Spark: Parallelization of Multiple Machine
>> Learning ALgorithm
>>
>>
>>
>> Hi Prem,
>>
>>
>>
>> How large is your dataset? Can it be fitted in a single node?
>>
>> If no, Spark MLlib provide CrossValidation which can run multiple machine
>> learning algorithms parallel on distributed dataset and do parameter
>> search. FYI:
>> https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation
>> 
>>
>> If yes, you can also try spark-sklearn, which can distribute multiple
>> model training(single node training with sklearn) across a distributed
>> cluster and do parameter search. FYI:
>> https://github.com/databricks/spark-sklearn
>> 
>>
>>
>>
>> Thanks
>>
>> Yanbo
>>
>>
>>
>> On Tue, Sep 5, 2017 at 9:56 PM, Patrick McCarthy 
>> wrote:
>>
>> You might benefit from watching this JIRA issue -
>> https://issues.apache.org/jira/browse/SPARK-19071
>> 
>>
>>
>>
>> On Sun, Sep 3, 2017 at 5:50 PM, Timsina, Prem 
>> wrote:
>>
>> Is there a way to parallelize multiple ML algorithms in Spark. My use
>> case is something like this:
>>
>> A) Run multiple machine learning algorithm (Naive Bayes, ANN, Random
>> Forest, etc.) in parallel.
>>
>> 1) Validate each algorithm using 10-fold cross-validation
>>
>> B) Feed the output of step A) in second layer machine learning algorithm.
>>
>> My question is:
>>
>> Can we run multiple machine 

Re: Crossvalidator after fit

2017-05-05 Thread Bryan Cutler
Looks like there might be a problem with the way you specified your
parameter values; you probably have an integer value where a floating-point
value is expected.  Double-check that, and if there is still a problem, please
share the rest of your code so we can see how you defined "gridS".
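
As an illustration of the kind of fix I mean, the sketch below converts a list of grid values to Python floats before they go into the grid. I don't know which of your params is affected, so the values are hypothetical, and as_doubles is a made-up helper rather than anything from PySpark.

```python
def as_doubles(values):
    """Return the values as Python floats, which map to Java doubles."""
    return [float(v) for v in values]


# Hypothetical grid values; plain ints like these are what trigger the
# Integer-to-Double cast error for a double-valued param.
risky = [0, 1, 5]
safe = as_doubles(risky)
print(safe)
```

Writing the literals as 0.0, 1.0 and so on in the grid definition has the same effect.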

On Fri, May 5, 2017 at 7:40 AM, issues solution 
wrote:

> Hi, I get the following error after trying to perform
> grid search and cross-validation on a random forest estimator for classification
>
> rf = RandomForestClassifier(labelCol="Labeld",featuresCol="features")
>
> evaluator =  BinaryClassificationEvaluator(metricName="F1 Score")
>
> rf_cv = CrossValidator(estimator=rf, 
> estimatorParamMaps=gridS,evaluator=evaluator,numFolds=5)
> (trainingData, testData) = transformed13.randomSplit([0.7, 0.3])
> rfmodel  =  rf_cv.fit(trainingData)
> ---Py4JJavaError
>  Traceback (most recent call 
> last) in ()> 1 rfmodel  =  
> rf_cv.fit(trainingData)
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/pipeline.py in 
> fit(self, dataset, params) 67 return 
> self.copy(params)._fit(dataset) 68 else:---> 69   
>   return self._fit(dataset) 70 else: 71 raise 
> ValueError("Params must be either a param map or a list/tuple of param maps, "
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/tuning.py in _fit(self, 
> dataset)237 train = df.filter(~condition)238 
> for j in range(numModels):--> 239 model = est.fit(train, 
> epm[j])240 # TODO: duplicate evaluator to take extra 
> params from input241 metric = 
> eva.evaluate(model.transform(validation, epm[j]))
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/pipeline.py in 
> fit(self, dataset, params) 65 elif isinstance(params, dict): 
> 66 if params:---> 67 return 
> self.copy(params)._fit(dataset) 68 else: 69   
>   return self._fit(dataset)
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/wrapper.py in 
> _fit(self, dataset)131 132 def _fit(self, dataset):--> 133
>  java_model = self._fit_java(dataset)134 return 
> self._create_model(java_model)135
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/wrapper.py in 
> _fit_java(self, dataset)127 :return: fitted Java model128 
> """--> 129 self._transfer_params_to_java()130 return 
> self._java_obj.fit(dataset._jdf)131
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/wrapper.py in 
> _transfer_params_to_java(self) 80 for param in self.params: 
> 81 if param in paramMap:---> 82 pair = 
> self._make_java_param_pair(param, paramMap[param]) 83 
> self._java_obj.set(pair) 84
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/wrapper.py in 
> _make_java_param_pair(self, param, value) 71 java_param = 
> self._java_obj.getParam(param.name) 72 java_value = _py2java(sc, 
> value)---> 73 return java_param.w(java_value) 74  75 def 
> _transfer_params_to_java(self):
> /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py
>  in __call__(self, *args)811 answer = 
> self.gateway_client.send_command(command)812 return_value = 
> get_return_value(--> 813 answer, self.gateway_client, 
> self.target_id, self.name)814 815 for temp_arg in temp_args:
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/sql/utils.py in deco(*a, 
> **kw) 43 def deco(*a, **kw): 44 try:---> 45 
> return f(*a, **kw) 46 except py4j.protocol.Py4JJavaError as e:
>  47 s = e.java_exception.toString()
> /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py
>  in get_return_value(answer, gateway_client, target_id, name)306  
>raise Py4JJavaError(307 "An error occurred 
> while calling {0}{1}{2}.\n".--> 308 format(target_id, 
> ".", name), value)309 else:310 raise 
> Py4JError(
> Py4JJavaError: An error occurred while calling o91602.w.
> : java.lang.ClassCastException: java.lang.Integer cannot be cast to 
> java.lang.Double
>   at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
>   at org.apache.spark.ml.param.DoubleParam.w(params.scala:225)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at 

Re: pandas DF Dstream to Spark DF

2017-04-10 Thread Bryan Cutler
Hi Yogesh,

It would be easier to help if you included your code and the exact error
messages that occur.  If you are creating a Spark DataFrame from a Pandas
DataFrame, Spark does not read the schema from Pandas; it infers one from
the data.  This might be the cause of your issue if the schema is not
inferred correctly.  You can try to specify the schema manually, like this
for example:

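import pandas
from pyspark.sql.types import (DoubleType, IntegerType, StringType,
                               StructField, StructType)

schema = StructType([
    StructField("str_t", StringType(), True),
    StructField("int_t", IntegerType(), True),
    StructField("double_t", DoubleType(), True)])

# `spark` is assumed to be an existing SparkSession
pandas_df = pandas.DataFrame(data={...})
spark_df = spark.createDataFrame(pandas_df, schema=schema)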

This step might be eliminated by using Apache Arrow, see SPARK-13534 for
related work.

On Sun, Apr 9, 2017 at 10:19 PM, Yogesh Vyas  wrote:

> Hi,
>
> I am writing a pyspark streaming job in which i am returning a pandas data
> frame as DStream. Now I wanted to save this DStream dataframe to parquet
> file. How to do that?
>
> I am trying to convert it to spark data frame but I am getting multiple
> errors. Please suggest me how to do that.
>
> Regards,
> Yogesh
>


Re: Belief propagation algorithm is open sourced

2016-12-14 Thread Bryan Cutler
I'll check it out, thanks for sharing Alexander!

On Dec 13, 2016 4:58 PM, "Ulanov, Alexander" 
wrote:

> Dear Spark developers and users,
>
>
> HPE has open sourced the implementation of the belief propagation (BP)
> algorithm for Apache Spark, a popular message passing algorithm for
> performing inference in probabilistic graphical models. It provides exact
> inference for graphical models without loops. While inference for graphical
> models with loops is approximate, in practice it is shown to work well. The
> implementation is generic and operates on factor graph representation of
> graphical models. It handles factors of any order, and variable domains of
> any size. It is implemented with Apache Spark GraphX, and thus can scale to
> large scale models. Further, it supports computations in log scale for
> numerical stability. Large scale applications of BP include fraud detection
> in banking transactions and malicious site detection in computer networks.
>
>
> Source code: https://github.com/HewlettPackard/sandpiper
>
>
> Best regards, Alexander
>


Re: New to spark.

2016-09-28 Thread Bryan Cutler
Hi Anirudh,

All types of contributions are welcome, from code to documentation.  Please
check out the page at
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark for
some info.  In particular, keep an eye out for starter JIRAs here:
https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20labels%20%3D%20Starter%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)

On Wed, Sep 28, 2016 at 9:11 AM, Anirudh Muhnot  wrote:

> Hello everyone, I'm Anirudh. I'm fairly new to Spark, as I've only done an
> online specialisation from UC Berkeley. I know how to code in Python but
> have little to no idea about Scala. I want to contribute to Spark. Where do
> I start, and how? I'm reading the pull requests on GitHub but I'm barely
> able to understand them. Can anyone help? Thank you.
> Sent from my iPhone
>


Re: Master OOM in "master-rebuild-ui-thread" while running stream app

2016-09-13 Thread Bryan Cutler
It looks like you have event logging enabled and your application event log
is too large for the master to build a web UI from it.  In Spark 1.6.2 and
earlier, when an application completes, the master rebuilds a web UI to
view events after the fact.  This functionality was removed in Spark 2.0
and the history server should be used instead.  If you are unable to
upgrade, could you try disabling event logging (spark.eventLog.enabled)?

On Sep 13, 2016 7:18 AM, "Mariano Semelman" 
wrote:

> Hello everybody,
>
> I am running a spark streaming app and I am planning to use it as a long
> running service. However while trying the app in a rc environment I got
> this exception in the master daemon after 1 hour of running:
>
> Exception in thread "master-rebuild-ui-thread"
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at java.util.regex.Pattern.compile(Pattern.java:1667)
> at java.util.regex.Pattern.(Pattern.java:1351)
> at java.util.regex.Pattern.compile(Pattern.java:1054)
> at java.lang.String.replace(String.java:2239)
> at org.apache.spark.util.Utils$.getFormattedClassName(Utils.
> scala:1632)
> at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(
> JsonProtocol.scala:486)
> at org.apache.spark.scheduler.ReplayListenerBus.replay(
> ReplayListenerBus.scala:58)
> at org.apache.spark.deploy.master.Master$$anonfun$17.
> apply(Master.scala:972)
> at org.apache.spark.deploy.master.Master$$anonfun$17.
> apply(Master.scala:952)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
> Future.scala:24)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> As a palliative measure I've increased the master memory to 1.5gb.
> My job is running with a batch interval of 5 seconds.
> I'm using spark version 1.6.2.
>
> I think it might be related to this issues:
>
> https://issues.apache.org/jira/browse/SPARK-6270
> https://issues.apache.org/jira/browse/SPARK-12062
> https://issues.apache.org/jira/browse/SPARK-12299
>
> But I don't see a clear road to solve this apart from upgrading spark.
> What would you recommend?
>
>
> Thanks in advance
> Mariano
>
>


Re: Random Forest Classification

2016-08-31 Thread Bryan Cutler
I see.  You might try this: create a pipeline of just your feature
transformers, then call fit() on the complete dataset to get a model.
Finally, make a second pipeline and add this model and the decision tree as
stages.

On Aug 30, 2016 8:19 PM, "Bahubali Jain" <bahub...@gmail.com> wrote:

> Hi Bryan,
> Thanks for the reply.
> I am indexing 5 columns ,then using these indexed columns to generate the
> "feature" column thru vector assembler.
> Which essentially means that I cannot use *fit()* directly on
> "completeDataset" dataframe since it will neither have the "feature" column
> and nor the 5 indexed columns.
> Of-course there is a dirty way of doing this, but I am wondering if there
> some optimized/intelligent approach for this.
>
> Thanks,
> Baahu
>
> On Wed, Aug 31, 2016 at 3:30 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> You need to first fit just the VectorIndexer which returns the model,
>> then add the model to the pipeline where it will only transform.
>>
>> val featureVectorIndexer = new VectorIndexer()
>>   .setInputCol("feature")
>>   .setOutputCol("indexedfeature")
>>   .setMaxCategories(180)
>>   .fit(completeDataset)
>>
>> On Tue, Aug 30, 2016 at 9:57 AM, Bahubali Jain <bahub...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I had run into similar exception " java.util.NoSuchElementException:
>>> key not found: " .
>>> After further investigation I realized it is happening due to
>>> vectorindexer being executed on training dataset and not on entire dataset.
>>>
>>> In the dataframe I have 5 categories , each of these have to go thru
>>> stringindexer and then these are put thru a vector indexer to generate
>>> feature vector.
>>> What is the right way to do this, so that vector indexer can be run on
>>> the entire data and not just on training data?
>>>
>>> Below is the current approach, as evident  VectorIndexer is being
>>> generated based on the training set.
>>>
>>> Please Note: fit() on Vectorindexer cannot be called on entireset
>>> dataframe since it doesn't have the required column(*feature *column is
>>> being generated dynamically in pipeline execution)
>>> How can the vectorindexer be *fit()* on the entireset?
>>>
>>> val col1_indexer = new StringIndexer().setInputCol("col1")
>>>   .setOutputCol("indexed_col1")
>>> val col2_indexer = new StringIndexer().setInputCol("col2")
>>>   .setOutputCol("indexed_col2")
>>> val col3_indexer = new StringIndexer().setInputCol("col3")
>>>   .setOutputCol("indexed_col3")
>>> val col4_indexer = new StringIndexer().setInputCol("col4")
>>>   .setOutputCol("indexed_col4")
>>> val col5_indexer = new StringIndexer().setInputCol("col5")
>>>   .setOutputCol("indexed_col5")
>>>
>>> val featureArray = Array("indexed_col1", "indexed_col2", "indexed_col3",
>>>   "indexed_col4", "indexed_col5")
>>> val vectorAssembler = new VectorAssembler().setInputCols(featureArray)
>>>   .setOutputCol("feature")
>>> val featureVectorIndexer = new VectorIndexer()
>>>   .setInputCol("feature")
>>>   .setOutputCol("indexedfeature")
>>>   .setMaxCategories(180)
>>>
>>> val decisionTree = new DecisionTreeClassifier()
>>>   .setMaxBins(300).setMaxDepth(1).setImpurity("entropy")
>>>   .setLabelCol("indexed_user_action").setFeaturesCol("indexedfeature")
>>>   .setPredictionCol("prediction")
>>>
>>> val pipeline = new Pipeline().setStages(Array(col1_indexer, col2_indexer,
>>>   col3_indexer, col4_indexer, col5_indexer, vectorAssembler,
>>>   featureVectorIndexer, decisionTree))
>>> val model = pipeline.fit(trainingSet)
>>> val output = model.transform(cvSet)
>>>
>>>
>>> Thanks,
>>> Baahu
>>>
>>> On Fri, Jul 8, 2016 at 11:24 PM, Bryan Cutler <cutl...@gmail.com> wrote:
>>>
>>>> Hi Rich,
>>>>
>>>> I looked at the notebook and it seems like you are fitting the
>>>> StringIndexer and VectorIndexer to only the training data, and it should
>>>> be the entire data set.  So if the training data does not include all of
>>>> the labels and an unknown label appears in the test data during evaluation,

Re: Random Forest Classification

2016-08-30 Thread Bryan Cutler
You need to first fit just the VectorIndexer which returns the model, then
add the model to the pipeline where it will only transform.

val featureVectorIndexer = new VectorIndexer()
  .setInputCol("feature")
  .setOutputCol("indexedfeature")
  .setMaxCategories(180)
  .fit(completeDataset)
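
To see why fitting on only the training data leads to "key not found", here is a toy version of the idea in plain Python. It is a stand-in for the indexers, not Spark code: fit_indexer is a made-up helper, the data is invented, and breaking frequency ties alphabetically is my own assumption.

```python
def fit_indexer(values):
    """Map each distinct value to an index, most frequent first."""
    counts = {}
    for value in values:
        counts[value] = counts.get(value, 0) + 1
    # Ties are broken alphabetically here; that choice is an assumption.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: index for index, value in enumerate(ordered)}


training = ["a", "b", "a"]
test = ["a", "c"]  # "c" never appears in the training rows

train_only = fit_indexer(training)
full = fit_indexer(training + test)

try:
    [train_only[v] for v in test]
    unseen_failed = False
except KeyError:
    unseen_failed = True

print(unseen_failed)
print([full[v] for v in test])
```

The mapping built from the complete data knows every category, so the later transform step never meets a value it cannot look up.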

On Tue, Aug 30, 2016 at 9:57 AM, Bahubali Jain <bahub...@gmail.com> wrote:

> Hi,
> I had run into similar exception " java.util.NoSuchElementException: key
> not found: " .
> After further investigation I realized it is happening due to
> vectorindexer being executed on training dataset and not on entire dataset.
>
> In the dataframe I have 5 categories , each of these have to go thru
> stringindexer and then these are put thru a vector indexer to generate
> feature vector.
> What is the right way to do this, so that vector indexer can be run on the
> entire data and not just on training data?
>
> Below is the current approach, as evident  VectorIndexer is being
> generated based on the training set.
>
> Please Note: fit() on Vectorindexer cannot be called on entireset
> dataframe since it doesn't have the required column(*feature *column is
> being generated dynamically in pipeline execution)
> How can the vectorindexer be *fit()* on the entireset?
>
> val col1_indexer = new StringIndexer().setInputCol("col1")
>   .setOutputCol("indexed_col1")
> val col2_indexer = new StringIndexer().setInputCol("col2")
>   .setOutputCol("indexed_col2")
> val col3_indexer = new StringIndexer().setInputCol("col3")
>   .setOutputCol("indexed_col3")
> val col4_indexer = new StringIndexer().setInputCol("col4")
>   .setOutputCol("indexed_col4")
> val col5_indexer = new StringIndexer().setInputCol("col5")
>   .setOutputCol("indexed_col5")
>
> val featureArray = Array("indexed_col1", "indexed_col2", "indexed_col3",
>   "indexed_col4", "indexed_col5")
> val vectorAssembler = new VectorAssembler().setInputCols(featureArray)
>   .setOutputCol("feature")
> val featureVectorIndexer = new VectorIndexer()
>   .setInputCol("feature")
>   .setOutputCol("indexedfeature")
>   .setMaxCategories(180)
>
> val decisionTree = new DecisionTreeClassifier()
>   .setMaxBins(300).setMaxDepth(1).setImpurity("entropy")
>   .setLabelCol("indexed_user_action").setFeaturesCol("indexedfeature")
>   .setPredictionCol("prediction")
>
> val pipeline = new Pipeline().setStages(Array(col1_indexer, col2_indexer,
>   col3_indexer, col4_indexer, col5_indexer, vectorAssembler,
>   featureVectorIndexer, decisionTree))
> val model = pipeline.fit(trainingSet)
> val output = model.transform(cvSet)
>
>
> Thanks,
> Baahu
>
> On Fri, Jul 8, 2016 at 11:24 PM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> Hi Rich,
>>
>> I looked at the notebook and it seems like you are fitting the
>> StringIndexer and VectorIndexer to only the training data, and it should
>> be the entire data set.  So if the training data does not include all of
>> the labels and an unknown label appears in the test data during evaluation,
>> then it will not know how to index it.  So your code should be like this,
>> fit with 'digits' instead of 'training'
>>
>> val labelIndexer = new StringIndexer().setInputCol("label")
>>   .setOutputCol("indexedLabel").fit(digits)
>> // Automatically identify categorical features, and index them.
>> // Set maxCategories so features with > 4 distinct values are treated as
>> // continuous.
>> val featureIndexer = new VectorIndexer().setInputCol("features")
>>   .setOutputCol("indexedFeatures").setMaxCategories(4).fit(digits)
>>
>> Hope that helps!
>>
>> On Fri, Jul 1, 2016 at 9:24 AM, Rich Tarro <richta...@gmail.com> wrote:
>>
>>> Hi Bryan.
>>>
>>> Thanks for your continued help.
>>>
>>> Here is the code shown in a Jupyter notebook. I figured this was easier
>>> than cutting and pasting the code into an email. If you would like me to
>>> send you the code in a different format, let me know. The necessary data is
>>> all downloaded within the notebook itself.
>>>
>>> https://console.ng.bluemix.net/data/notebooks/fe7e578a-401f-4744-a318-b1b6bcf6f5f8/view?access_token=2f6df7b1dfcb3c1c2d94a794506bb282729dab8f05118fafe5f11886326e02fc
>>>
>>> A few additional pieces of information.
>>>
>>> 1. The training dataset is cached before training the model. 

Re: Grid Search using Spark MLLib Pipelines

2016-08-12 Thread Bryan Cutler
You will need to cast bestModel to include the MLWritable trait.  The class
Model does not mix it in by default.  For instance:

cvModel.bestModel.asInstanceOf[MLWritable].save("/my/path")

Alternatively, you could save the CV model directly, which takes care of
this

cvModel.save("/my/path")
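
On the second question (a value per parameter combination), a minimal
sketch, assuming the fitted cvModel from the quoted code below. As far as I
know, CrossValidatorModel exposes avgMetrics with one entry per ParamMap in
the grid, in the same order. The helper name metricsPerParamMap is made up
for illustration. Note that the values are the evaluator's metric averaged
over the folds (areaUnderROC by default for BinaryClassificationEvaluator),
not an error.

```scala
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.CrossValidatorModel

// Hypothetical helper: pair every parameter combination in the grid
// with its cross-validated average metric.
def metricsPerParamMap(cvModel: CrossValidatorModel): Array[(ParamMap, Double)] =
  cvModel.getEstimatorParamMaps.zip(cvModel.avgMetrics)

// Usage, given the fitted cvModel from the thread:
// metricsPerParamMap(cvModel).foreach { case (params, metric) =>
//   println(s"$metric <- $params")
// }
```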

On Fri, Aug 12, 2016 at 9:17 AM, Adamantios Corais <
adamantios.cor...@gmail.com> wrote:

> Hi,
>
> Assuming that I have run the following pipeline and have got the best
> logistic regression model. How can I then save that model for later use?
> The following command throws an error:
>
> cvModel.bestModel.save("/my/path")
>
> Also, is it possible to get the error (a collection of) for each
> combination of parameters?
>
> I am using spark 1.6.2
>
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.classification.LogisticRegression
> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
> import org.apache.spark.ml.tuning.{ParamGridBuilder , CrossValidator}
>
> val lr = new LogisticRegression()
>
> val pipeline = new Pipeline().
> setStages(Array(lr))
>
> val paramGrid = new ParamGridBuilder().
> addGrid(lr.elasticNetParam , Array(0.1)).
> addGrid(lr.maxIter , Array(10)).
> addGrid(lr.regParam , Array(0.1)).
> build()
>
> val cv = new CrossValidator().
> setEstimator(pipeline).
> setEvaluator(new BinaryClassificationEvaluator).
> setEstimatorParamMaps(paramGrid).
> setNumFolds(2)
>
> val cvModel = cv.
> fit(training)
>
>
>
>


Re: Why training data in Kmeans Spark streaming clustering

2016-08-11 Thread Bryan Cutler
The algorithm update is just broken into 2 steps: trainOn - to learn/update
the cluster centers, and predictOn - predicts cluster assignment on data

The StreamingKMeansExample you reference breaks up data into training and
test because you might want to score the predictions.  If you don't care
about that, you could just use a single stream for both steps.
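
A minimal sketch of the single-stream variant, assuming `points` is a
DStream of MLlib vectors built elsewhere. The function name and the values
k = 3, two dimensions, and decay factor 1.0 are placeholders for
illustration, not taken from the example.

```scala
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: train and predict on the same stream.
def clusterSingleStream(points: DStream[Vector]): DStream[Int] = {
  val model = new StreamingKMeans()
    .setK(3)                    // number of clusters (placeholder)
    .setDecayFactor(1.0)        // 1.0 = weight all past batches equally
    .setRandomCenters(2, 0.0)   // 2-dimensional random initial centers, zero weight

  model.trainOn(points)         // step 1: update the cluster centers per batch
  model.predictOn(points)       // step 2: assign each point to a cluster
}
```

No labels are needed in this setup; the labeled test stream in the bundled
example only exists so that predictions can be compared against something.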

On Thu, Aug 11, 2016 at 9:14 AM, Ahmed Sadek  wrote:

> Dear All,
>
> I was wondering why there is training data and testing data in kmeans ?
> Shouldn't it be unsupervised learning with just access to stream data ?
>
> I found similar question but couldn't understand the answer.
> http://stackoverflow.com/questions/30972057/is-the-
> streaming-k-means-clustering-predefined-in-mllib-library-of-spark-supervi
>
> Thanks!
> Ahmed
>


Re: Spark 2.0 - JavaAFTSurvivalRegressionExample doesn't work

2016-07-28 Thread Bryan Cutler
That's the correct fix.  I have this done along with a few other Java
examples that still use the old MLlib Vectors in this PR that's waiting for
review https://github.com/apache/spark/pull/14308
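
For anyone who hits the same error with existing data rather than in the
bundled example, a short sketch of the two conversions that, to my
knowledge, Spark 2.0 provides. The column name "features", the sample
values, and the helper name toMLVectors are assumptions for illustration.

```scala
import org.apache.spark.ml.linalg.{Vector => NewVector}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.DataFrame

// Convert a single old-style (spark.mllib) vector to the new spark.ml type.
val oldVec = OldVectors.dense(1.560, -0.605)
val newVec: NewVector = oldVec.asML

// Hypothetical helper: convert a whole vector column of an existing DataFrame.
def toMLVectors(df: DataFrame): DataFrame =
  MLUtils.convertVectorColumnsToML(df, "features")
```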

On Jul 28, 2016 5:14 AM, "Robert Goodman"  wrote:

> I changed import in the sample from
>
> import org.apache.spark.mllib.linalg.*;
>
> to
>
>import org.apache.spark.ml.linalg.*;
>
> and the sample now runs.
>
>Thanks
>  Bob
>
>
> On Wed, Jul 27, 2016 at 1:33 PM, Robert Goodman  wrote:
> > I tried to run the JavaAFTSurvivalRegressionExample on Spark 2.0 and the
> > example doesn't work. It looks like the problem is that the example is
> > using the MLlib Vector/VectorUDT to create the Dataset, which needs to be
> > converted using MLUtils before being used in the model. I haven't actually
> > tried this yet.
> >
> > When I run the example (/bin/run-example
> > ml.JavaAFTSurvivalRegressionExample), I get the following stack trace
> >
> > Exception in thread "main" java.lang.IllegalArgumentException:
> requirement
> > failed: Column features must be of type
> > org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually
> > org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.
> > at scala.Predef$.require(Predef.scala:224)
> > at
> >
> org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:42)
> > at
> >
> org.apache.spark.ml.regression.AFTSurvivalRegressionParams$class.validateAndTransformSchema(AFTSurvivalRegression.scala:106)
> > at
> >
> org.apache.spark.ml.regression.AFTSurvivalRegression.validateAndTransformSchema(AFTSurvivalRegression.scala:126)
> > at
> >
> org.apache.spark.ml.regression.AFTSurvivalRegression.fit(AFTSurvivalRegression.scala:199)
> > at
> >
> org.apache.spark.examples.ml.JavaAFTSurvivalRegressionExample.main(JavaAFTSurvivalRegressionExample.java:67)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
> > at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
> > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
> > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
> > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> >
> >
> > Are you supposed to be able to use the ML version of VectorUDT? The
> > Spark 2.0 API docs for Java don't show the class, but I was able to
> > import the class into a Java program.
> >
> >Thanks
> >  Bob
>
>
>


Re: Programmatic use of UDFs from Java

2016-07-22 Thread Bryan Cutler
Everett, I had the same question today and came across this old thread.
Not sure if there has been any more recent work to support this.
http://apache-spark-developers-list.1001551.n3.nabble.com/Using-UDFs-in-Java-without-registration-td12497.html


On Thu, Jul 21, 2016 at 10:10 AM, Everett Anderson  wrote:

> Hi,
>
> In the Java Spark DataFrames API, you can create a UDF, register it, and
> then access it by string name by using the convenience UDF classes in
> org.apache.spark.sql.api.java.
>
> Example
>
> UDF1 testUdf1 = new UDF1<>() { ... }
>
> sqlContext.udf().register("testfn", testUdf1, DataTypes.LongType);
>
> DataFrame df2 = df.withColumn("new_col", functions.callUDF("testfn",
> df.col("old_col")));
>
> However, I'd like to avoid registering these by name, if possible, since I
> have many of them and would need to deal with name conflicts.
>
> There are udf() methods like this that seem to be from the Scala API,
> where you don't have to register everything by name first.
>
> However, using those methods from Java would require interacting with
> Scala's scala.reflect.api.TypeTags.TypeTag. I'm having a hard time
> figuring out how to create a TypeTag from Java.
>
> Does anyone have an example of using the udf() methods from Java?
>
> Thanks!
>
> - Everett
>
>


Re: MLlib, Java, and DataFrame

2016-07-21 Thread Bryan Cutler
ML (spark.ml) has a DataFrame-based API, while MLlib (spark.mllib) is
RDD-based and goes into maintenance mode as of Spark 2.0.

On Thu, Jul 21, 2016 at 10:41 PM, VG <vlin...@gmail.com> wrote:

> Why do we have these 2 packages ... ml and mllib?
> What is the difference between them?
>
>
>
> On Fri, Jul 22, 2016 at 11:09 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> Hi JG,
>>
>> If you didn't know this, Spark MLlib has 2 APIs, one of which uses
>> DataFrames.  Take a look at this example
>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/ml/JavaLinearRegressionWithElasticNetExample.java
>>
>> This example uses a Dataset<Row>, which is type equivalent to a DataFrame.
>>
>>
>> On Thu, Jul 21, 2016 at 8:41 PM, Jean Georges Perrin <j...@jgp.net> wrote:
>>
>>> Hi,
>>>
>>> I am looking for some really super basic examples of MLlib (like a
>>> linear regression over a list of values) in Java. I have found a few, but I
>>> only saw them using JavaRDD... and not DataFrame.
>>>
>>> I was kind of hoping to take my current DataFrame and send them in
>>> MLlib. Am I too optimistic? Do you know/have any example like that?
>>>
>>> Thanks!
>>>
>>> jg
>>>
>>>
>>> Jean Georges Perrin
>>> j...@jgp.net / @jgperrin
>>>
>>>
>>>
>>>
>>>
>>
>


Re: MLlib, Java, and DataFrame

2016-07-21 Thread Bryan Cutler
Hi JG,

If you didn't know this, Spark MLlib has 2 APIs, one of which uses
DataFrames.  Take a look at this example
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/ml/JavaLinearRegressionWithElasticNetExample.java

This example uses a Dataset<Row>, which is type equivalent to a DataFrame.


On Thu, Jul 21, 2016 at 8:41 PM, Jean Georges Perrin  wrote:

> Hi,
>
> I am looking for some really super basic examples of MLlib (like a linear
> regression over a list of values) in Java. I have found a few, but I only
> saw them using JavaRDD... and not DataFrame.
>
> I was kind of hoping to take my current DataFrame and send them in MLlib.
> Am I too optimistic? Do you know/have any example like that?
>
> Thanks!
>
> jg
>
>
> Jean Georges Perrin
> j...@jgp.net / @jgperrin
>
>
>
>
>


Re: spark-submit local and Akka startup timeouts

2016-07-19 Thread Bryan Cutler
The patch I was referring to doesn't help on the ActorSystem startup
unfortunately.  As best I can tell the property
"akka.remote.startup-timeout" is what controls this timeout.  You can try
setting this to something greater in your Spark conf and hopefully that
would work.  Otherwise you might have luck trying a more recent version of
Spark, such as 1.6.2 or even 2.0.0 (soon to be released) which no longer
uses Akka and the ActorSystem.  Hope that helps!
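
A sketch of what setting the property could look like on Spark 1.x. This
assumes that keys starting with "akka." set on the SparkConf are passed
through to the ActorSystem configuration; the app name, master, and the
60 second value are placeholders. It is set in code here because, as far as
I know, spark-submit drops --conf keys that do not start with "spark.".

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: "akka.*" keys on the SparkConf reach Akka's own config (Spark 1.x only).
val conf = new SparkConf()
  .setAppName("regression-test")               // placeholder name
  .setMaster("local[*]")
  .set("akka.remote.startup-timeout", "60 s")  // raise the startup timeout

val sc = new SparkContext(conf)
```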

On Tue, Jul 19, 2016 at 2:29 AM, Rory Waite <rwa...@sdl.com> wrote:

> Sorry Bryan, I should have mentioned that I'm running 1.6.0 for hadoop2.6.
> The binaries were downloaded from the Spark website.
>
>
> We're free to upgrade Spark, create custom builds, etc. Please let me
> know how to display the config property.
>
>   <http://www.sdl.com/>
> www.sdl.com
>
>
> SDL PLC confidential, all rights reserved. If you are not the intended
> recipient of this mail SDL requests and requires that you delete it without
> acting upon or copying any of its contents, and we further request that you
> advise us.
>
> SDL PLC is a public limited company registered in England and Wales.
> Registered number: 02675207.
> Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6
> 7DY, UK.
> --
> *From:* Bryan Cutler <cutl...@gmail.com>
> *Sent:* 19 July 2016 02:20:38
> *To:* Rory Waite
> *Cc:* user
> *Subject:* Re: spark-submit local and Akka startup timeouts
>
>
> Hi Rory, for starters what version of Spark are you using?  I believe that
> in a 1.5.? release (I don't know which one off the top of my head) there
> was an addition that would also display the config property when a timeout
> happened.  That might help some if you are able to upgrade.
>
> On Jul 18, 2016 9:34 AM, "Rory Waite" <rwa...@sdl.com> wrote:
>
>> Hi All,
>>
>> We have created a regression test for a spark job that is executed during
>> our automated build. It executes a spark-submit with a local master,
>> processes some data, and then exits. We have an issue in that we get a
>> non-deterministic timeout error. It seems to be when the spark context
>> tries to initialise Akka (stack trace below). It doesn't happen often, but
>> when it does it causes the whole build to fail.
>>
>> The machines that run these tests get very heavily loaded, with many
>> regression tests running simultaneously. My theory is that the spark-submit
>> is sometimes unable to initialise Akka in time because the machines are so
>> heavily loaded with the other tests. My first thought was to try to tune
>> some parameter to extend the timeout, but I couldn't find anything in the
>> documentation. The timeout is short at 10s, whereas the default akka
>> timeout is set at 100s.
>>
>> Is there a way to adjust this timeout?
>>
>> 16/07/17 00:04:22 ERROR SparkContext: Error initializing SparkContext.
>> java.util.concurrent.TimeoutException: Futures timed out after [1
>> milliseconds]
>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>> at
>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>> at scala.concurrent.Await$.result(package.scala:107)
>> at akka.remote.Remoting.start(Remoting.scala:179)
>> at
>> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
>> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
>> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
>> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
>> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
>> at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
>> at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
>> at
>> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
>> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
>> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
>> at
>> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1964)
>> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>> at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1955)
>> at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
>> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
>> at org.apache.spark.SparkEnv$.cre

Re: spark-submit local and Akka startup timeouts

2016-07-18 Thread Bryan Cutler
Hi Rory, for starters what version of Spark are you using?  I believe that
in a 1.5.? release (I don't know which one off the top of my head) there
was an addition that would also display the config property when a timeout
happened.  That might help some if you are able to upgrade.

On Jul 18, 2016 9:34 AM, "Rory Waite"  wrote:

> Hi All,
>
> We have created a regression test for a spark job that is executed during
> our automated build. It executes a spark-submit with a local master,
> processes some data, and then exits. We have an issue in that we get a
> non-deterministic timeout error. It seems to be when the spark context
> tries to initialise Akka (stack trace below). It doesn't happen often, but
> when it does it causes the whole build to fail.
>
> The machines that run these tests get very heavily loaded, with many
> regression tests running simultaneously. My theory is that the spark-submit
> is sometimes unable to initialise Akka in time because the machines are so
> heavily loaded with the other tests. My first thought was to try to tune
> some parameter to extend the timeout, but I couldn't find anything in the
> documentation. The timeout is short at 10s, whereas the default akka
> timeout is set at 100s.
>
> Is there a way to adjust this timeout?
>
> 16/07/17 00:04:22 ERROR SparkContext: Error initializing SparkContext.
> java.util.concurrent.TimeoutException: Futures timed out after [1
> milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at akka.remote.Remoting.start(Remoting.scala:179)
> at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
> at
> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
> at
> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1964)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1955)
> at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
> at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
> at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
> at org.apache.spark.SparkContext.(SparkContext.scala:457)
> at com.sdl.nntrainer.NNTrainer$.main(NNTrainer.scala:418)
> at com.sdl.nntrainer.NNTrainer.main(NNTrainer.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 16/07/17 00:04:22 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
> down remote daemon.
> 16/07/17 00:04:22 INFO SparkContext: Successfully stopped SparkContext
> Exception in thread "main" java.util.concurrent.TimeoutException: Futures
> timed out after [1 milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at akka.remote.Remoting.start(Remoting.scala:179)
> at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
> at 

Re: Random Forest Classification

2016-07-08 Thread Bryan Cutler
Hi Rich,

I looked at the notebook and it seems like you are fitting the
StringIndexer and VectorIndexer to only the training data, and it should
be the entire data set.  So if the training data does not include all of
the labels and an unknown label appears in the test data during evaluation,
then it will not know how to index it.  So your code should be like this,
fit with 'digits' instead of 'training'

val labelIndexer = new StringIndexer().setInputCol("label")
  .setOutputCol("indexedLabel").fit(digits)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as
// continuous.
val featureIndexer = new VectorIndexer().setInputCol("features")
  .setOutputCol("indexedFeatures").setMaxCategories(4).fit(digits)
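
To make the failure mode concrete, here is a toy, Spark-free sketch.
fitIndexer is a made-up stand-in for StringIndexer.fit (it is not the Spark
implementation), and the digits/training/test values are invented for
illustration.

```scala
import scala.util.Try

// Toy stand-in for StringIndexer.fit: most frequent label gets index 0,
// ties are broken alphabetically.
def fitIndexer(labels: Seq[String]): Map[String, Int] =
  labels.groupBy(identity).toSeq
    .sortBy { case (label, rows) => (-rows.size, label) }
    .map(_._1)
    .zipWithIndex
    .toMap

val digits   = Seq("0", "1", "1", "2", "2", "2", "9")
val training = digits.take(6)   // happens to contain no "9"
val test     = digits.drop(6)   // contains only the unseen "9"

val fromTraining = fitIndexer(training)
val fromAll      = fitIndexer(digits)

// Indexing the test labels fails when the indexer never saw "9" ...
val trainingOnlyResult = Try(test.map(fromTraining))
// ... and succeeds when the indexer was fit on the entire data set.
val fullDataResult = Try(test.map(fromAll))
```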

Hope that helps!

On Fri, Jul 1, 2016 at 9:24 AM, Rich Tarro <richta...@gmail.com> wrote:

> Hi Bryan.
>
> Thanks for your continued help.
>
> Here is the code shown in a Jupyter notebook. I figured this was easier
> than cutting and pasting the code into an email. If you would like me to
> send you the code in a different format, let me know. The necessary data is
> all downloaded within the notebook itself.
>
>
> https://console.ng.bluemix.net/data/notebooks/fe7e578a-401f-4744-a318-b1b6bcf6f5f8/view?access_token=2f6df7b1dfcb3c1c2d94a794506bb282729dab8f05118fafe5f11886326e02fc
>
> A few additional pieces of information.
>
> 1. The training dataset is cached before training the model. If you do not
> cache the training dataset, the model will not train. The code
> model.transform(test) fails with a similar error. No other changes besides
> caching or not caching. Again, with the training dataset cached, the model
> can be successfully trained as seen in the notebook.
>
> 2. I have another version of the notebook where I download the same data
> in libsvm format rather than csv. That notebook works fine. All the code is
> essentially the same accounting for the difference in file formats.
>
> 3. I tested this same code on another Spark cloud platform and it displays
> the same symptoms when run there.
>
> Thanks.
> Rich
>
>
> On Wed, Jun 29, 2016 at 12:59 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> Are you fitting the VectorIndexer to the entire data set and not just
>> training or test data?  If you are able to post your code and some data to
>> reproduce, that would help in troubleshooting.
>>
>> On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro <richta...@gmail.com> wrote:
>>
>>> Thanks for the response, but in my case I reversed the meaning of
>>> "prediction" and "predictedLabel". It seemed to make more sense to me that
>>> way, but in retrospect, it probably only causes confusion to anyone else
>>> looking at this. I reran the code with all the pipeline stage inputs and
>>> outputs named exactly as in the Random Forest Classifier example to make
>>> sure I hadn't messed anything up when I renamed things. Same error.
>>>
>>> I'm still at the point where I can train the model and make predictions,
>>> but not able to get the MulticlassClassificationEvaluator to work on
>>> the DataFrame of predictions.
>>>
>>> Any other suggestions? Thanks.
>>>
>>>
>>>
>>> On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro <richta...@gmail.com> wrote:
>>>
>>>> I created a ML pipeline using the Random Forest Classifier - similar to
>>>> what is described here except in my case the source data is in csv format
>>>> rather than libsvm.
>>>>
>>>>
>>>> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>>>>
>>>> I am able to successfully train the model and make predictions (on test
>>>> data not used to train the model) as shown here.
>>>>
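>>>> +------------+--------------+-----+----------+--------------------+
>>>> |indexedLabel|predictedLabel|label|prediction|            features|
>>>> +------------+--------------+-----+----------+--------------------+
>>>> |         4.0|           4.0|    0|         0|(784,[124,125,126...|
>>>> |         2.0|           2.0|    3|         3|(784,[119,120,121...|
>>>> |         8.0|           8.0|    8|         8|(784,[180,181,182...|
>>>> |         0.0|           0.0|    1|         1|(784,[154,155,156...|
>>>> |         3.0|           8.0|    2|         8|(784,[148,149,150...|
>>>> +------------+--------------+-----+----------+--------------------+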
>>>> only showin

Re: ClassNotFoundException: org.apache.parquet.hadoop.ParquetOutputCommitter

2016-07-07 Thread Bryan Cutler
Can you try running the example like this

./bin/run-example sql.RDDRelation 

I know there are some jars in the example folders, and running them this
way adds them to the classpath.

On Jul 7, 2016 3:47 AM, "kevin"  wrote:

> Hi all,
> I built Spark using:
>
> ./make-distribution.sh --name "hadoop2.7.1" --tgz
> "-Pyarn,hadoop-2.6,parquet-provided,hive,hive-thriftserver" -DskipTests
> -Dhadoop.version=2.7.1
>
> I can run example :
> ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
> --master spark://master1:7077 \
> --driver-memory 1g \
> --executor-memory 512m \
> --executor-cores 1 \
> lib/spark-examples*.jar \
> 10
>
> but can't run example :
> org.apache.spark.examples.sql.RDDRelation
>
> *I got error:*
> 16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
> app-20160707182845-0003/2 is now RUNNING
> 16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
> app-20160707182845-0003/4 is now RUNNING
> 16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
> app-20160707182845-0003/3 is now RUNNING
> 16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
> app-20160707182845-0003/0 is now RUNNING
> 16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
> app-20160707182845-0003/1 is now RUNNING
> 16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
> app-20160707182845-0003/5 is now RUNNING
> 16/07/07 18:28:46 INFO cluster.SparkDeploySchedulerBackend:
> SchedulerBackend is ready for scheduling beginning after reached
> minRegisteredResourcesRatio: 0.0
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/parquet/hadoop/ParquetOutputCommitter
> at org.apache.spark.sql.SQLConf$.(SQLConf.scala:319)
> at org.apache.spark.sql.SQLConf$.(SQLConf.scala)
> at org.apache.spark.sql.SQLContext.(SQLContext.scala:85)
> at org.apache.spark.sql.SQLContext.(SQLContext.scala:77)
> at main.RDDRelation$.main(RDDRelation.scala:13)
> at main.RDDRelation.main(RDDRelation.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.parquet.hadoop.ParquetOutputCommitter
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 15 more
>
>


Re: Set the node the spark driver will be started

2016-06-29 Thread Bryan Cutler
Hi Felix,

I think the problem you are describing has been fixed in later versions,
check out this JIRA https://issues.apache.org/jira/browse/SPARK-13803


On Wed, Jun 29, 2016 at 9:27 AM, Mich Talebzadeh 
wrote:

> Fine. in standalone mode spark uses its own scheduling as opposed to Yarn
> or anything else.
>
> As a matter of interest, can you start spark-submit from any node in the
> cluster? Do these all have the same or similar CPU and RAM?
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 29 June 2016 at 10:54, Felix Massem 
> wrote:
>
>> In addition, we are not using Yarn; we are using standalone mode, and
>> the driver is started with deploy-mode cluster.
>>
>> Thx Felix
>> Felix Massem | IT-Consultant | Karlsruhe
>> mobil: +49 (0) 172.2919848
>>
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>> www.more4fi.de
>>
>> Registered office: Düsseldorf | HRB 63043 | Düsseldorf District Court
>> Management board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> Supervisory board: Patric Fedlmeier (chairman) . Klaus Jäger . Jürgen
>> Schütz
>>
>> This e-mail, including any attached files, contains confidential and/or
>> legally protected information. If you are not the intended recipient or
>> have received this e-mail in error, please inform the sender immediately
>> and delete this e-mail and any attached files without delay. Unauthorized
>> copying, use, or opening of any attached files, as well as unauthorized
>> forwarding of this e-mail, is not permitted.
>>
>> On 29.06.2016 at 11:13, Felix Massem wrote:
>>
>> Hey Mich,
>>
>> there is effectively no distribution. Right now I have 15 applications
>> and all 15 drivers are running on one node. This is just after giving all
>> machines a little more memory.
>> Before, I had about 15 applications and about 13 drivers were running on
>> one machine. While trying to submit a new job I got OOM exceptions, which
>> took down my Cassandra service, only for the driver to start on the same
>> node where all the other 13 drivers were running.
>>
>> Thx and best regards
>> Felix
>>
>>
>> On 28.06.2016 at 17:55, Mich Talebzadeh wrote:
>>
>> Hi Felix,
>>
>> In Yarn-cluster mode the resource manager Yarn is expected to take care
>> of that.
>>
>> Are you getting some skewed distribution with drivers created through
>> spark-submit on different nodes?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>> On 28 June 2016 at 16:06, Felix Massem 
>> wrote:
>>
>>> Hey Mich,
>>>
>>> thx for the fast reply.
>>>
>>> We are using it in cluster mode and spark version 1.5.2
>>>
>>> Greets Felix
>>>
>>>

Re: Random Forest Classification

2016-06-28 Thread Bryan Cutler
Are you fitting the VectorIndexer to the entire data set and not just
training or test data?  If you are able to post your code and some data to
reproduce, that would help in troubleshooting.
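
To illustrate why this matters, here is a toy sketch in plain Python. It is
not Spark's VectorIndexer code, and the function names and sample values are
made up; it only assumes that the fitted model keeps a strict map from each
value seen during fitting to a category index, so a value that shows up only
at transform time has no entry.

```python
def fit_category_map(values):
    """Map each distinct value seen during fitting to a category index."""
    return {v: i for i, v in enumerate(sorted(set(values)))}


def transform(values, category_map):
    # Strict lookup: a value that was never seen while fitting raises KeyError.
    return [category_map[v] for v in values]


train = [0.0, 1.0, 2.0]
test = [1.0, 132.0]          # 132.0 never appears in the training split

# Fitting on the training split only: the lookup fails on the unseen value.
subset_map = fit_category_map(train)
try:
    transform(test, subset_map)
    failed = False
except KeyError as err:
    failed = True
    missing = err.args[0]

# Fitting on the whole data set: every value has an index.
full_map = fit_category_map(train + test)
indexed = transform(test, full_map)
```

If the real indexer was fit on a subset, the "key not found: 132.0" in the
trace would be consistent with this kind of lookup failure.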

On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro  wrote:

> Thanks for the response, but in my case I reversed the meaning of
> "prediction" and "predictedLabel". It seemed to make more sense to me that
> way, but in retrospect, it probably only causes confusion to anyone else
> looking at this. I reran the code with all the pipeline stage inputs and
> outputs named exactly as in the Random Forest Classifier example to make
> sure I hadn't messed anything up when I renamed things. Same error.
>
> I'm still at the point where I can train the model and make predictions,
> but not able to get the MulticlassClassificationEvaluator to work on the
> DataFrame of predictions.
>
> Any other suggestions? Thanks.
>
>
>
> On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro  wrote:
>
>> I created a ML pipeline using the Random Forest Classifier - similar to
>> what is described here except in my case the source data is in csv format
>> rather than libsvm.
>>
>>
>> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>>
>> I am able to successfully train the model and make predictions (on test
>> data not used to train the model) as shown here.
>>
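>> +------------+--------------+-----+----------+--------------------+
>> |indexedLabel|predictedLabel|label|prediction|            features|
>> +------------+--------------+-----+----------+--------------------+
>> |         4.0|           4.0|    0|         0|(784,[124,125,126...|
>> |         2.0|           2.0|    3|         3|(784,[119,120,121...|
>> |         8.0|           8.0|    8|         8|(784,[180,181,182...|
>> |         0.0|           0.0|    1|         1|(784,[154,155,156...|
>> |         3.0|           8.0|    2|         8|(784,[148,149,150...|
>> +------------+--------------+-----+----------+--------------------+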
>> only showing top 5 rows
>>
>> However, when I attempt to calculate the error between the indexedLabel and
>> the predictedLabel using the MulticlassClassificationEvaluator, I get the
>> NoSuchElementException error attached below.
>>
>> val evaluator = new MulticlassClassificationEvaluator()
>>   .setLabelCol("indexedLabel")
>>   .setPredictionCol("predictedLabel")
>>   .setMetricName("precision")
>> val accuracy = evaluator.evaluate(predictions)
>> println("Test Error = " + (1.0 - accuracy))
>>
>> What could be the issue?
>>
>>
>>
>> Name: org.apache.spark.SparkException
>> Message: Job aborted due to stage failure: Task 2 in stage 49.0 failed 10 times, most recent failure: Lost task 2.9 in stage 49.0 (TID 162, yp-spark-dal09-env5-0024): java.util.NoSuchElementException: key not found: 132.0
>>  at scala.collection.MapLike$class.default(MapLike.scala:228)
>>  at scala.collection.AbstractMap.default(Map.scala:58)
>>  at scala.collection.MapLike$class.apply(MapLike.scala:141)
>>  at scala.collection.AbstractMap.apply(Map.scala:58)
>>  at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:331)
>>  at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:309)
>>  at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>  at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
>>  at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>  at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>  at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
>>  at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
>>  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>>  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>  at 

Re: Random Forest Classification

2016-06-28 Thread Bryan Cutler
The problem might be that you are evaluating with "predictedLabel" instead
of "prediction", where predictedLabel is the prediction index mapped back to
the original label strings - at least according to the
RandomForestClassifierExample, not sure if your code is exactly the same.
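
A small sketch in plain Python of the distinction I mean. The label strings
and values below are invented, and the column roles follow the naming in the
RandomForestClassifierExample as I remember it, so treat it as an assumption
rather than a description of your pipeline.

```python
# Hypothetical mapping from label index back to the original label string.
labels = ["cat", "dog", "bird"]

indexed_label = [0.0, 1.0, 2.0, 1.0]   # label column after indexing
prediction = [0.0, 1.0, 1.0, 1.0]      # classifier output, also an index

# What an index-to-string step would produce from the prediction column.
predicted_label = [labels[int(p)] for p in prediction]


def accuracy(expected, actual):
    """Fraction of positions where the two columns agree."""
    hits = sum(1 for e, a in zip(expected, actual) if e == a)
    return hits / len(expected)


# Comparing index with index is meaningful.
acc_index_space = accuracy(indexed_label, prediction)

# Comparing an index column with a string column never matches.
acc_mixed = accuracy(indexed_label, predicted_label)
```

The evaluator should be given two columns that live in the same space, i.e.
the indexed label and the raw prediction index.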

On Tue, Jun 28, 2016 at 1:21 PM, Rich Tarro  wrote:

> I created a ML pipeline using the Random Forest Classifier - similar to
> what is described here except in my case the source data is in csv format
> rather than libsvm.
>
>
> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>
> I am able to successfully train the model and make predictions (on test
> data not used to train the model) as shown here.
>
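> +------------+--------------+-----+----------+--------------------+
> |indexedLabel|predictedLabel|label|prediction|            features|
> +------------+--------------+-----+----------+--------------------+
> |         4.0|           4.0|    0|         0|(784,[124,125,126...|
> |         2.0|           2.0|    3|         3|(784,[119,120,121...|
> |         8.0|           8.0|    8|         8|(784,[180,181,182...|
> |         0.0|           0.0|    1|         1|(784,[154,155,156...|
> |         3.0|           8.0|    2|         8|(784,[148,149,150...|
> +------------+--------------+-----+----------+--------------------+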
> only showing top 5 rows
>
> However, when I attempt to calculate the error between the indexedLabel and
> the predictedLabel using the MulticlassClassificationEvaluator, I get the
> NoSuchElementException error attached below.
>
> val evaluator = new MulticlassClassificationEvaluator()
>   .setLabelCol("indexedLabel")
>   .setPredictionCol("predictedLabel")
>   .setMetricName("precision")
> val accuracy = evaluator.evaluate(predictions)
> println("Test Error = " + (1.0 - accuracy))
>
> What could be the issue?
>
>
>
> Name: org.apache.spark.SparkException
> Message: Job aborted due to stage failure: Task 2 in stage 49.0 failed 10 times, most recent failure: Lost task 2.9 in stage 49.0 (TID 162, yp-spark-dal09-env5-0024): java.util.NoSuchElementException: key not found: 132.0
>   at scala.collection.MapLike$class.default(MapLike.scala:228)
>   at scala.collection.AbstractMap.default(Map.scala:58)
>   at scala.collection.MapLike$class.apply(MapLike.scala:141)
>   at scala.collection.AbstractMap.apply(Map.scala:58)
>   at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:331)
>   at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:309)
>   at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>   at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>   at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>   at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
>   at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
>   at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>   at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.lang.Thread.run(Thread.java:785)
>
> Driver stacktrace:
> StackTrace: 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
> 

Re: LogisticRegression.scala ERROR, require(Predef.scala)

2016-06-23 Thread Bryan Cutler
The stack trace you provided seems to hint that you are calling "predict"
on an RDD with Vectors that are not the same size as the number of features
in your trained model; the two should be equal.  If that's not the issue, it
would be easier to troubleshoot if you could share your code and possibly
some test data.
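
As a rough illustration of the kind of check I suspect is failing, here is a
toy logistic prediction in plain Python. It is a stand-in, not MLlib's code;
the function name, weights and inputs are all made up.

```python
import math


def predict_point(features, weights, intercept=0.0, threshold=0.5):
    """Toy logistic prediction that insists on matching dimensions."""
    if len(features) != len(weights):
        raise ValueError(
            "feature vector has size %d but the model expects %d"
            % (len(features), len(weights))
        )
    margin = sum(x * w for x, w in zip(features, weights)) + intercept
    score = 1.0 / (1.0 + math.exp(-margin))
    return 1.0 if score > threshold else 0.0


weights = [1.0, 1.0, -2.0]   # hypothetical model trained on 3 features

# Same size as the model: prediction works.
ok = predict_point([1.0, 2.0, 0.5], weights)

# One feature short: the size requirement fails before any math happens.
try:
    predict_point([1.0, 2.0], weights)
    mismatch_error = None
except ValueError as err:
    mismatch_error = str(err)
```

Comparing the size of one of your input vectors with the size of the
model's weight vector would confirm or rule this out quickly.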

On Thu, Jun 23, 2016 at 4:30 AM, Ascot Moss  wrote:

> Hi,
>
> My Spark is 1.5.2, when trying MLLib, I got the following error. Any idea
> to fix it?
>
> Regards
>
>
> ==
>
> 16/06/23 16:26:20 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID
> 5)
>
> java.lang.IllegalArgumentException: requirement failed
>
> at scala.Predef$.require(Predef.scala:221)
>
> at
> org.apache.spark.mllib.classification.LogisticRegressionModel.predictPoint(LogisticRegression.scala:118)
>
> at
> org.apache.spark.mllib.regression.GeneralizedLinearModel$$anonfun$predict$1$$anonfun$apply$1.apply(GeneralizedLinearAlgorithm.scala:65)
>
> at
> org.apache.spark.mllib.regression.GeneralizedLinearModel$$anonfun$predict$1$$anonfun$apply$1.apply(GeneralizedLinearAlgorithm.scala:65)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.next(RDD.scala:815)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.next(RDD.scala:808)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
>
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 16/06/23 16:26:20 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5,
> localhost): java.lang.IllegalArgumentException: requirement failed
>
> at scala.Predef$.require(Predef.scala:221)
>
> at
> org.apache.spark.mllib.classification.LogisticRegressionModel.predictPoint(LogisticRegression.scala:118)
>
> at
> org.apache.spark.mllib.regression.GeneralizedLinearModel$$anonfun$predict$1$$anonfun$apply$1.apply(GeneralizedLinearAlgorithm.scala:65)
>
> at
> org.apache.spark.mllib.regression.GeneralizedLinearModel$$anonfun$predict$1$$anonfun$apply$1.apply(GeneralizedLinearAlgorithm.scala:65)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.next(RDD.scala:815)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.next(RDD.scala:808)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
>
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
> 16/06/23 16:26:20 ERROR TaskSetManager: Task 0 in stage 5.0 failed 1
> times; aborting job
>
> 16/06/23 

Re: Specify node where driver should run

2016-06-06 Thread Bryan Cutler
I'm not an expert on YARN so anyone please correct me if I'm wrong, but I
believe the Resource Manager will schedule the application master (AM) on
any node that has a Node Manager, depending on available resources.
So you would normally query the RM via the REST API to determine that.  You
can restrict which nodes get scheduled using this property:
spark.yarn.am.nodeLabelExpression.
See here for details:
http://spark.apache.org/docs/latest/running-on-yarn.html

On Mon, Jun 6, 2016 at 9:04 AM, Saiph Kappa <saiph.ka...@gmail.com> wrote:

> How can I specify the node where application master should run in the yarn
> conf? I haven't found any useful information regarding that.
>
> Thanks.
>
> On Mon, Jun 6, 2016 at 4:52 PM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> In that mode, it will run on the application master, whichever node that
>> is as specified in your yarn conf.
>> On Jun 5, 2016 4:54 PM, "Saiph Kappa" <saiph.ka...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> In yarn-cluster mode, is there any way to specify on which node I want
>>> the driver to run?
>>>
>>> Thanks.
>>>
>>
>


Re: Specify node where driver should run

2016-06-06 Thread Bryan Cutler
In that mode, it will run on the application master, whichever node that is
as specified in your yarn conf.
On Jun 5, 2016 4:54 PM, "Saiph Kappa"  wrote:

> Hi,
>
> In yarn-cluster mode, is there any way to specify on which node I want the
> driver to run?
>
> Thanks.
>


Re: Multinomial regression with spark.ml version of LogisticRegression

2016-05-29 Thread Bryan Cutler
This is currently being worked on, planned for 2.1 I believe
https://issues.apache.org/jira/browse/SPARK-7159
On May 28, 2016 9:31 PM, "Stephen Boesch"  wrote:

> Thanks Phuong. But the point of my post is how to achieve this without
> using the deprecated mllib package. The mllib package already has
> multinomial regression built in.
>
> 2016-05-28 21:19 GMT-07:00 Phuong LE-HONG :
>
>> Dear Stephen,
>>
>> Yes, you're right, LogisticGradient is in the mllib package, not ml
>> package. I just want to say that we can build a multinomial logistic
>> regression model from the current version of Spark.
>>
>> Regards,
>>
>> Phuong
>>
>>
>>
>> On Sun, May 29, 2016 at 12:04 AM, Stephen Boesch 
>> wrote:
>> > Hi Phuong,
>> >    The LogisticGradient exists in the mllib but not ml package. The
>> > LogisticRegression chooses either the breeze LBFGS - if L2 only (not
>> > elastic net) and no regularization - or the Orthant Wise Quasi Newton
>> > (OWLQN) otherwise: it does not appear to choose GD in either scenario.
>> >
>> > If I have misunderstood your response please do clarify.
>> >
>> > thanks stephenb
>> >
>> > 2016-05-28 20:55 GMT-07:00 Phuong LE-HONG :
>> >>
>> >> Dear Stephen,
>> >>
>> >> The Logistic Regression currently supports only binary regression.
>> >> However, the LogisticGradient does support computing gradient and loss
>> >> for a multinomial logistic regression. That is, you can train a
>> >> multinomial logistic regression model with LogisticGradient and a
>> >> class to solve optimization like LBFGS to get a weight vector of the
>> >> size (numClasses-1)*numFeatures.
>> >>
>> >>
>> >> Phuong
>> >>
>> >>
>> >> On Sat, May 28, 2016 at 12:25 PM, Stephen Boesch 
>> >> wrote:
>> >> > Followup: just encountered the "OneVsRest" classifier in
>> >> > ml.classification: I will look into using it with the binary
>> >> > LogisticRegression as the provided classifier.
>> >> >
>> >> > 2016-05-28 9:06 GMT-07:00 Stephen Boesch :
>> >> >>
>> >> >>
>> >> >> Presently only the mllib version has the one-vs-all approach for
>> >> >> multinomial support.  The ml version with ElasticNet support only
>> >> >> allows binary regression.
>> >> >>
>> >> >> With feature parity of ml vs mllib having been stated as an
>> >> >> objective for 2.0.0 - is there a projected availability of the
>> >> >> multinomial regression in the ml package?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >
>> >> >
>> >
>> >
>>
>
>


Re: Get output of the ALS algorithm.

2016-03-15 Thread Bryan Cutler
Jacek is correct for using org.apache.spark.ml.recommendation.ALSModel

If you are trying to save
org.apache.spark.mllib.recommendation.MatrixFactorizationModel, then it is
similar, but just a little different, see the example here
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/RecommendationExample.scala#L62

On Fri, Mar 11, 2016 at 8:18 PM, Shishir Anshuman <shishiranshu...@gmail.com
> wrote:

> The model produced after training.
>
> On Fri, Mar 11, 2016 at 10:29 PM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> Are you trying to save predictions on a dataset to a file, or the model
>> produced after training with ALS?
>>
>> On Thu, Mar 10, 2016 at 7:57 PM, Shishir Anshuman <
>> shishiranshu...@gmail.com> wrote:
>>
>>> hello,
>>>
>>> I am new to Apache Spark and would like to get the Recommendation output
>>> of the ALS algorithm in a file.
>>> Please suggest me the solution.
>>>
>>> Thank you
>>>
>>>
>>>
>>
>


Re: Get output of the ALS algorithm.

2016-03-11 Thread Bryan Cutler
Are you trying to save predictions on a dataset to a file, or the model
produced after training with ALS?

On Thu, Mar 10, 2016 at 7:57 PM, Shishir Anshuman  wrote:

> hello,
>
> I am new to Apache Spark and would like to get the Recommendation output
> of the ALS algorithm in a file.
> Please suggest me the solution.
>
> Thank you
>
>
>


Re: LDA topic Modeling spark + python

2016-02-29 Thread Bryan Cutler
The input into LDA.train needs to be an RDD of lists, with the first
element an integer (id) and the second a pyspark.mllib.linalg.Vector object
containing real numbers (term counts), i.e. an RDD of [doc_id,
vector_of_counts].

From your example, it looks like your corpus is a list with a zero-based
id, with the second element a tuple of user id and list of lines from the
data that have that user_id, something like [doc_id, (user_id, [line0,
line1])]

You need to make that element a Vector containing real numbers somehow.
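
One possible way to get there, sketched in plain Python so it runs without a
cluster. The sample documents, the tokenizer and the helper names are all
invented for illustration. In PySpark I would expect each count list to be
wrapped with pyspark.mllib.linalg.Vectors.dense and the corpus parallelized
into an RDD before calling LDA.train, but that part is not shown here.

```python
from collections import Counter


def tokenize(text):
    """Very naive tokenizer: lowercase, split on whitespace, keep words."""
    return [w for w in text.lower().split() if w.isalpha()]


# Hypothetical grouped input: user_id -> list of status lines.
docs = {
    "user_a": ["where is my mind", "all i know is that"],
    "user_b": ["ham and cheese", "where is the ham"],
}

# Fixed vocabulary so every document gets a vector of the same length.
vocab = sorted({w for lines in docs.values() for line in lines
                for w in tokenize(line)})
index = {w: i for i, w in enumerate(vocab)}


def to_counts(lines):
    """Turn a user's lines into a dense list of term counts."""
    counts = Counter(w for line in lines for w in tokenize(line))
    vec = [0.0] * len(vocab)
    for word, n in counts.items():
        vec[index[word]] = float(n)
    return vec


# One [doc_id, vector_of_counts] entry per user, with zero-based ids.
corpus = [[doc_id, to_counts(lines)]
          for doc_id, (user, lines) in enumerate(sorted(docs.items()))]
```

The important part is that every document ends up as a numeric vector of the
same length, indexed by a shared vocabulary, rather than as raw text.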

On Sun, Feb 28, 2016 at 11:08 PM, Mishra, Abhishek <
abhishek.mis...@xerox.com> wrote:

> Hello Bryan,
>
>
>
> Thank you for the update on Jira. I took your code and tried with mine.
> But I get an error with the vector being created. Please see my code below
> and suggest me.
>
> My input file has some contents like this:
>
> "user_id","status"
>
> "0026c10bbbc7eeb55a61ab696ca93923","http:
> www.youtube.com//watch?v=n3nPiBai66M=related **bobsnewline**
> tiftakar, Trudy Darmanin  <3?"
>
> "0026c10bbbc7eeb55a61ab696ca93923","Brandon Cachia ,All I know is
> that,you're so nice."
>
> "0026c10bbbc7eeb55a61ab696ca93923","Melissa Zejtunija:HAM AND CHEESE BIEX
> INI??? **bobsnewline**  Kirr:bit tigieg mel **bobsnewline**  Melissa
> Zejtunija :jaq le mandix aptit tigieg **bobsnewline**  Kirr:int bis
> serjeta?"
>
> "0026c10bbbc7eeb55a61ab696ca93923",".Where is my mind?"
>
>
>
> And what I am doing in my code is like this:
>
>
>
> import string
>
> from pyspark.sql import SQLContext
>
> from pyspark import SparkConf, SparkContext
>
> from pyspark.sql import SQLContext
>
> from pyspark.mllib.clustering import LDA, LDAModel
>
> from nltk.tokenize import word_tokenize
>
> from stop_words import get_stop_words
>
> from nltk.stem.porter import PorterStemmer
>
> from gensim import corpora, models
>
> import gensim
>
> import textmining
>
> import pandas as pd
>
> conf = SparkConf().setAppName("building a warehouse")
>
> sc = SparkContext(conf=conf)
>
> sql_sc = SQLContext(sc)
>
> data = sc.textFile('file:///home/cloudera/LDA-Model/Pyspark/test1.csv')
>
> header = data.first() #extract header
>
> print header
>
> data = data.filter(lambda x:x !=header)#filter out header
>
> pairs = data.map(lambda x: (x.split(',')[0], x))#.collect()#generate pair
> rdd key value
>
> #data11=data.subtractByKey(header)
>
> #print pairs.collect()
>
> #grouped=pairs.map(lambda (x,y): (x, [y])).reduceByKey(lambda a, b: a + b)
>
> grouped=pairs.groupByKey()#grouping values as per key
>
> #print grouped.collectAsMap()
>
> grouped_val= grouped.map(lambda x : (list(x[1]))).collect()
>
> #rr=grouped_val.map(lambda (x,y):(x,[y]))
>
> #df_grouped_val=sql_sc.createDataFrame(rr, ["user_id", "status"])
>
> #print list(enumerate(grouped_val))
>
> #corpus = grouped.zipWithIndex().map(lambda x: [x[1],
> x[0]]).cache()#.collect()
>
> corpus = grouped.zipWithIndex().map(lambda (term_counts, doc_id): [doc_id,
> term_counts]).cache()
>
> #corpus.cache()
>
> model = LDA.train(corpus, k=10, maxIterations=10, optimizer="online")
>
> #ldaModel = LDA.train(corpus, k=3)
>
> print corpus
>
> topics = model.describeTopics(3)
>
> print("\"topic\", \"termIndices\", \"termWeights\"")
>
> for i, t in enumerate(topics):
>
>print("%d, %s, %s" % (i, str(t[0]), str(t[1])))
>
>
>
> sc.stop()
>
>
>
>
>
> Please help me in this
>
> Abhishek
>
>
>
> *From:* Bryan Cutler [mailto:cutl...@gmail.com]
> *Sent:* Friday, February 26, 2016 4:17 AM
> *To:* Mishra, Abhishek
> *Cc:* user@spark.apache.org
> *Subject:* Re: LDA topic Modeling spark + python
>
>
>
> I'm not exactly sure how you would like to setup your LDA model, but I
> noticed there was no Python example for LDA in Spark.  I created this issue
> to add it https://issues.apache.org/jira/browse/SPARK-13500.  Keep an eye
> on this if it could be of help.
>
> bryan
>
>
>
> On Wed, Feb 24, 2016 at 8:34 PM, Mishra, Abhishek <
> abhishek.mis...@xerox.com> wrote:
>
> Hello All,
>
>
>
> If someone has any leads on this please help me.
>
>
>
> Sincerely,
>
> Abhishek
>
>
>
> *From:* Mishra, Abhishek
> *Sent:* Wednesday, February 24, 2016 5:11 PM
> *To:* user@spark.apache.org
> *Subject:* LDA topic Modeling spark + python
>
>
>
> Hello All,
>
>
>
>
>
> I am doing a LDA model, please guide me with something.
>
>
>
> I have a csv file which has two column "user_id" and "status". I have to
> generate a word-topic distribution after aggregating the user_id. Meaning
> to say I need to model it for users on their grouped status. The topic
> length being 2000 and value of k or number of words being 3.
>
>
>
> Please, if you can provide me with some link or some code base on spark
> with python ; I would be grateful.
>
>
>
>
>
> Looking forward for a  reply,
>
>
>
> Sincerely,
>
> Abhishek
>
>
>
>
>


Re: LDA topic Modeling spark + python

2016-02-25 Thread Bryan Cutler
I'm not exactly sure how you would like to setup your LDA model, but I
noticed there was no Python example for LDA in Spark.  I created this issue
to add it https://issues.apache.org/jira/browse/SPARK-13500.  Keep an eye
on this if it could be of help.

bryan

On Wed, Feb 24, 2016 at 8:34 PM, Mishra, Abhishek  wrote:

> Hello All,
>
>
>
> If someone has any leads on this please help me.
>
>
>
> Sincerely,
>
> Abhishek
>
>
>
> *From:* Mishra, Abhishek
> *Sent:* Wednesday, February 24, 2016 5:11 PM
> *To:* user@spark.apache.org
> *Subject:* LDA topic Modeling spark + python
>
>
>
> Hello All,
>
>
>
>
>
> I am doing a LDA model, please guide me with something.
>
>
>
> I have a csv file which has two column "user_id" and "status". I have to
> generate a word-topic distribution after aggregating the user_id. Meaning
> to say I need to model it for users on their grouped status. The topic
> length being 2000 and value of k or number of words being 3.
>
>
>
> Please, if you can provide me with some link or some code base on spark
> with python ; I would be grateful.
>
>
>
>
>
> Looking forward for a  reply,
>
>
>
> Sincerely,
>
> Abhishek
>
>
>


Re: Spark Streaming - processing/transforming DStreams using a custom Receiver

2016-02-25 Thread Bryan Cutler
Using flatMap on a string will treat it as a sequence of characters, which
is why you are getting an RDD of Char.  I think you want to just do a map
instead, like this:

val timestamps = stream.map(event => event.getCreatedAt.toString)
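
The same effect can be reproduced outside Spark. Below is a small sketch in
plain Python, where a string is also a sequence of characters; the flat_map
helper and the sample timestamps are made up for illustration.

```python
from collections import Counter
from itertools import chain

# Hypothetical creation times, already rendered as strings.
created_at = [
    "2016-02-26 00:00:01",
    "2016-02-26 00:00:01",
    "2016-02-26 00:00:02",
]


def flat_map(f, items):
    """Apply f to each item and flatten the results by one level."""
    return list(chain.from_iterable(f(x) for x in items))


# Flattening a string yields its individual characters.
chars = flat_map(str, created_at)

# A plain map keeps each timestamp whole, so it can be used as a key.
stamps = list(map(str, created_at))
per_second = Counter(stamps)
```

Counting over chars gives per-character totals like the (T,100) and (0,366)
pairs in your output, while counting over stamps gives events per second.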
On Feb 25, 2016 8:27 AM, "Dominik Safaric"  wrote:

> Recently, I've implemented the following Receiver and custom Spark
> Streaming InputDStream using Scala:
>
> /**
>  * The GitHubUtils object declares an interface consisting of overloaded
>  * createStream functions. The createStream function takes as arguments the
>  * ctx : StreamingContext passed by the driver program, along with the
>  * storageLevel : StorageLevel, returning a GitHubInputDStream. Whereas the
>  * GitHubInputDStream is a DStream representation, i.e. a derivation of the
>  * abstract class ReceiverInputDStream.
>  */
>
> object GitHubUtils {
>
>   def createStream(ctx: StreamingContext, storageLevel: StorageLevel): ReceiverInputDStream[Event] =
>     new GitHubInputDStream(ctx, storageLevel)
>
> }
>
> /**
>  * The GitHubInputDStream class takes as constructor arguments a ctx :
>  * StreamingContext, and a storageLevel : StorageLevel. The class inherits
>  * from the ReceiverInputDStream abstract class declared within
>  * SparkStreaming. In summary, the GitHubInputDStream is a DStream
>  * representation of GitHub events, implementing i.e. overriding the
>  * getReceiver() function that returns a Receiver[Event] object.
>  */
>
> private[streaming]
> class GitHubInputDStream(ctx: StreamingContext, storageLevel: StorageLevel)
>   extends ReceiverInputDStream[Event](ctx) {
>
>   def getReceiver(): Receiver[Event] = new GitHubReceiver(storageLevel, Client)
>
> }
>
> /**
>  * The GitHubReceiver class takes as a constructor argument a storageLevel :
>  * StorageLevel. It implements i.e. overrides two functions declared by the
>  * Receiver interface, notably onStart() and onStop(). As the names imply,
>  * the onStart() function is executed when creating DStreams, i.e. within a
>  * specified batch interval. However, the onStart().
>  */
>
> private[streaming]
> class GitHubReceiver(storageLevel: StorageLevel, client: GitHubClient)
>   extends Receiver[Event](storageLevel) with Logging {
>
>   def onStart(): Unit = {
>     consumeEvents(new EventService(client).pagePublicEvents(0, 300))
>   }
>
>   def consumeEvents(iterator: PageIterator[Event]): Unit = iterator.hasNext match {
>     case true =>
>       iterator.next.toList.foreach { event => store(event) }
>       consumeEvents(iterator)
>     case false => logInfo("Processing is stopping")
>   }
>
>   def onStop(): Unit = {
>
>   }
>
> }
>
> However, when I initialised it, i.e. created it in the driver program on my
> local machine, and applied a series of functions like e.g. flatMap on a
> DStream[Event]:
>
> val configuration = new SparkConf()
>   .setAppName("StreamingSoftwareAnalytics")
>   .setMaster("local[2]")
> val streamingContext = new StreamingContext(configuration, Seconds(5))
>
> val stream = GitHubUtils.createStream(streamingContext, StorageLevel.MEMORY_AND_DISK_SER)
>
> val timestamps = stream.flatMap(event => event.getCreatedAt.toString)
>
> and then applied a series of functions such as reduceByKey that would allow
> me to count e.g. the number of events per second, I get the following
> output:
>
> (T,100)
> (d,100)
> (4,100)
> (8,13)
> (6,114)
> (0,366)
>
> While the output should be in the form of e.g.:
>
> (2016-26-02 00:00:01,100)
> (2016-26-02 00:00:02,100)
> (2016-26-02 00:00:03,100)
> (2016-26-02 00:00:04,13)
> (2016-26-02 00:00:05,114)
> (2016-26-02 00:00:06,366)
>
> where K = Char. The root of the problem is that when flatMap is applied to
> an event that is a serialisable object containing a member variable
> getCreatedAt : Date, rather than producing a DStream[String] it produces a
> DStream[Char] - meaning that Spark somehow splits the date String using
> some delimiter.
>
> I've also tried to collect and perform the computation on timestamps using
> first foreachRDD on the DStream of events, and then using collect to get
> the full String representation of the date - and then it works. However,
> since collect can be quite expensive, I am simply trying to avoid it and
> hence think that there must be a better solution to this.
>
> Therefore, my questions are: how exactly do I create from the
> DStream[Event] a DStream[String] (instead of DStream[Char]), where each
> string in the DStream represents a timestamp from an RDD? Secondly, can
> someone give some good examples of this? And thirdly, which function is
> best to use if I would like to e.g. aggregate all events per repository
> ID? I.e. each Event object contains a getRepository() function that
> returns the ID : Long of the GitHub repository, and then on each streamed
> event belonging to a certain repository, I would like to map it to its
> corresponding repository ID in the form of (Long, [Event]).
>
> Thanks in advance!
>
>
>
> --
> View this message in context:
> 

Re: StreamingKMeans does not update cluster centroid locations

2016-02-19 Thread Bryan Cutler
This simple example works for me, it prints out the updated model centers.
I'm running from the master branch.

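import scala.collection.mutable.SynchronizedQueue

import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext("local[2]", "test")
val ssc = new StreamingContext(sc, Seconds(1))

// Two 1-D clusters; a decay factor of 0.0 means only the latest batch counts.
val kMeans = new StreamingKMeans()
  .setK(2)
  .setDecayFactor(0.0)
  .setInitialCenters(Array(Vectors.dense(0.0), Vectors.dense(1.0)), Array(1.0, 1.0))

// Each RDD pushed onto the queue is consumed as one batch of the stream.
val rddQueue = new SynchronizedQueue[RDD[Vector]]()

val data1 = sc.parallelize(Array(
  Vectors.dense(-0.5),
  Vectors.dense(0.6),
  Vectors.dense(0.8)
))

val data2 = sc.parallelize(Array(
  Vectors.dense(0.2),
  Vectors.dense(-0.1),
  Vectors.dense(0.3)
))

rddQueue += data1
rddQueue += data2

val inputStream = ssc.queueStream(rddQueue)

// Train on every batch and also predict on the same stream.
kMeans.trainOn(inputStream)

val predictStream = kMeans.predictOn(inputStream)

// Print the predictions and the current centers once per batch.
def collect(rdd: RDD[Int]): Unit = {
  val rdd_collect = rdd.collect()
  println(s"predict_results: ${rdd_collect.mkString(",")}")
  kMeans.latestModel.clusterCenters.foreach(println)
}

predictStream.foreachRDD(collect _)

ssc.start()
ssc.awaitTermination()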
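
To show what I would expect the centers to do for that data, here is a rough
one-dimensional sketch in plain Python. It assumes the update rule from the
MLlib streaming k-means guide as I understand it, c' = (c*n*a + sum of new
points) / (n*a + m), and it ignores details such as how Spark handles dying
clusters. The function name and the weight bookkeeping are my own
simplification, not Spark's code.

```python
def update_centers(centers, weights, batch, decay):
    """One streaming k-means step on 1-D points (simplified sketch)."""
    k = len(centers)
    sums = [0.0] * k
    counts = [0] * k
    # Assign each point in the batch to its nearest current center.
    for x in batch:
        nearest = min(range(k), key=lambda i: abs(x - centers[i]))
        sums[nearest] += x
        counts[nearest] += 1
    new_centers, new_weights = [], []
    for c, n, s, m in zip(centers, weights, sums, counts):
        discounted = n * decay          # how much the old center still counts
        total = discounted + m
        # Keep the old center if nothing was assigned and nothing is retained.
        new_centers.append((c * discounted + s) / total if total > 0 else c)
        new_weights.append(n + m)
    return new_centers, new_weights


centers, weights = [0.0, 1.0], [1.0, 1.0]

# Same two batches as in the Scala example, with a decay factor of 0.0.
centers_1, weights_1 = update_centers(centers, weights, [-0.5, 0.6, 0.8], 0.0)
centers_2, weights_2 = update_centers(centers_1, weights_1, [0.2, -0.1, 0.3], 0.0)
```

With a decay factor of 0.0 each center simply becomes the mean of the points
assigned to it in the latest batch, so the centers should move between
batches as long as the incoming data differs.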

On Fri, Feb 19, 2016 at 1:15 PM, krishna ramachandran <ram...@s1776.com>
wrote:

> Also the cluster centroid I get in streaming mode (some with negative
> values) do not make sense - if I use the same data and run in batch
>
> KMeans.train(sc.parallelize(parsedData), numClusters, numIterations)
>
> cluster centers are what you would expect.
>
> Krishna
>
>
>
> On Fri, Feb 19, 2016 at 12:49 PM, krishna ramachandran <ram...@s1776.com>
> wrote:
>
>> ok i will share a simple example soon. meantime you will be able to see
>> this behavior using example here,
>>
>>
>> https://github.com/apache/spark/blob/branch-1.2/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala
>>
>> slightly modify it to include
>>
>> model.latestModel.clusterCenters.foreach(println)
>>
>> (after model.trainOn)
>>
>> add new files to trainingDir periodically
>>
>> I have 3 dimensions per data-point - they look like these,
>>
>> [1, 1, 385.224145278]
>>
>> [3, 1, 384.752946389]
>>
>> [4,1, 3083.2778025]
>>
>> [2, 4, 6226.40232139]
>>
>> [1, 2, 785.84266]
>>
>> [5, 1, 6706.05424139]
>>
>> 
>>
>> and monitor. please let know if I missed something
>>
>> Krishna
>>
>>
>>
>>
>>
>> On Fri, Feb 19, 2016 at 10:59 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>>
>>> Can you share more of your code to reproduce this issue?  The model
>>> should be updated with each batch, but can't tell what is happening from
>>> what you posted so far.
>>>
>>> On Fri, Feb 19, 2016 at 10:40 AM, krishna ramachandran <ram...@s1776.com
>>> > wrote:
>>>
>>>> Hi Bryan
>>>> Agreed. It is a single statement to print the centers once for *every*
>>>> streaming batch (4 secs) - remember this is in streaming mode and the
>>>> receiver has fresh data every batch. That is, as the model is trained
>>>> continuously so I expect the centroids to change with incoming streams (at
>>>> least until convergence)
>>>>
>>>> But am seeing same centers always for the entire duration - ran the app
>>>> for several hours with a custom receiver.
>>>>
>>>> Yes I am using the latestModel to predict using "labeled" test data.
>>>> But also like to know where my centers are
>>>>
>>>> regards
>>>> Krishna
>>>>
>>>>
>>>>
>>>> On Fri, Feb 19, 2016 at 10:18 AM, Bryan Cutler <cutl...@gmail.com>
>>>> wrote:
>>>>
>>>>> Could you elaborate where the issue is?  You say calling
>>>>> model.latestModel.clusterCenters.foreach(println) doesn't show an updated
>>>>> model, but that is just a single statement to print the centers once..
>>>>>
>>>>> Also, is there any reason you don't predict on the test data like this?
>>>>>
>>>>> model.predictOnValues(testData.map(lp => (lp.label,
>>>>> lp.features))).print()
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 18, 2016 at 5:59 PM, ramach1776 <ram...@s1776.com> wrote:
>>>>>
>>>>>> I have streaming application wherein I train the model using a
>>>>>> re

Re: StreamingKMeans does not update cluster centroid locations

2016-02-19 Thread Bryan Cutler
Can you share more of your code to reproduce this issue?  The model should
be updated with each batch, but can't tell what is happening from what you
posted so far.

On Fri, Feb 19, 2016 at 10:40 AM, krishna ramachandran <ram...@s1776.com>
wrote:

> Hi Bryan
> Agreed. It is a single statement to print the centers once for *every*
> streaming batch (4 secs) - remember this is in streaming mode and the
> receiver has fresh data every batch. That is, as the model is trained
> continuously so I expect the centroids to change with incoming streams (at
> least until convergence)
>
> But am seeing same centers always for the entire duration - ran the app
> for several hours with a custom receiver.
>
> Yes I am using the latestModel to predict using "labeled" test data. But
> also like to know where my centers are
>
> regards
> Krishna
>
>
>
> On Fri, Feb 19, 2016 at 10:18 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> Could you elaborate where the issue is?  You say calling
>> model.latestModel.clusterCenters.foreach(println) doesn't show an updated
>> model, but that is just a single statement to print the centers once..
>>
>> Also, is there any reason you don't predict on the test data like this?
>>
>> model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
>>
>>
>>
>> On Thu, Feb 18, 2016 at 5:59 PM, ramach1776 <ram...@s1776.com> wrote:
>>
>>> I have streaming application wherein I train the model using a receiver
>>> input
>>> stream in 4 sec batches
>>>
>>> val stream = ssc.receiverStream(receiver) //receiver gets new data every
>>> batch
>>> model.trainOn(stream.map(Vectors.parse))
>>> If I use
>>> model.latestModel.clusterCenters.foreach(println)
>>>
>>> the value of cluster centers remain unchanged from the very initial
>>> value it
>>> got during first iteration (when the streaming app started)
>>>
>>> when I use the model to predict cluster assignment with a labeled input
>>> the
>>> assignments change over time as expected
>>>
>>>   testData.transform {rdd =>
>>> rdd.map(lp => (lp.label,
>>> model.latestModel().predict(lp.features)))
>>>   }.print()
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/StreamingKMeans-does-not-update-cluster-centroid-locations-tp26275.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: StreamingKMeans does not update cluster centroid locations

2016-02-19 Thread Bryan Cutler
Could you elaborate where the issue is?  You say calling
model.latestModel.clusterCenters.foreach(println) doesn't show an updated
model, but that is just a single statement to print the centers once..

Also, is there any reason you don't predict on the test data like this?

model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()



On Thu, Feb 18, 2016 at 5:59 PM, ramach1776  wrote:

> I have streaming application wherein I train the model using a receiver
> input
> stream in 4 sec batches
>
> val stream = ssc.receiverStream(receiver) //receiver gets new data every
> batch
> model.trainOn(stream.map(Vectors.parse))
> If I use
> model.latestModel.clusterCenters.foreach(println)
>
> the value of cluster centers remain unchanged from the very initial value
> it
> got during first iteration (when the streaming app started)
>
> when I use the model to predict cluster assignment with a labeled input the
> assignments change over time as expected
>
>   testData.transform {rdd =>
> rdd.map(lp => (lp.label, model.latestModel().predict(lp.features)))
>   }.print()
>


Re: SparkContext SyntaxError: invalid syntax

2016-01-15 Thread Bryan Cutler
Glad you got it going!  It wasn't very obvious what needed to be set;
maybe it is worth explicitly stating this in the docs, since it seems to
have come up a couple of times before too.

Bryan

On Fri, Jan 15, 2016 at 12:33 PM, Andrew Weiner <
andrewweiner2...@u.northwestern.edu> wrote:

> Actually, I just found this [
> https://issues.apache.org/jira/browse/SPARK-1680], which after a bit of
> googling and reading leads me to believe that the preferred way to change
> the yarn environment is to edit the spark-defaults.conf file by adding this
> line:
> spark.yarn.appMasterEnv.PYSPARK_PYTHON    /path/to/python
>
> While both this solution and the solution from my prior email work, I
> believe this is the preferred solution.
>
> Sorry for the flurry of emails.  Again, thanks for all the help!
>
> Andrew
>
> On Fri, Jan 15, 2016 at 1:47 PM, Andrew Weiner <
> andrewweiner2...@u.northwestern.edu> wrote:
>
>> I finally got the pi.py example to run in yarn cluster mode.  This was
>> the key insight:
>> https://issues.apache.org/jira/browse/SPARK-9229
>>
>> I had to set SPARK_YARN_USER_ENV in spark-env.sh:
>> export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=/home/aqualab/local/bin/python"
>>
>> This caused the PYSPARK_PYTHON environment variable to be used in my yarn
>> environment in cluster mode.
>>
>> Thank you for all your help!
>>
>> Best,
>> Andrew
>>
>>
>>
>> On Fri, Jan 15, 2016 at 12:57 PM, Andrew Weiner <
>> andrewweiner2...@u.northwestern.edu> wrote:
>>
>>> I tried playing around with my environment variables, and here is an
>>> update.
>>>
>>> When I run in cluster mode, my environment variables do not persist
>>> throughout the entire job.
>>> For example, I tried creating a local copy of HADOOP_CONF_DIR in
>>> /home//local/etc/hadoop/conf, and then, in spark-env.sh I set the
>>> variable:
>>> export HADOOP_CONF_DIR=/home//local/etc/hadoop/conf
>>>
>>> Later, when we print the environment variables in the python code, I see
>>> this:
>>>
>>> ('HADOOP_CONF_DIR', '/etc/hadoop/conf')
>>>
>>> However, when I run in client mode, I see this:
>>>
>>> ('HADOOP_CONF_DIR', '/home/awp066/local/etc/hadoop/conf')
>>>
>>> Furthermore, if I omit that environment variable from spark-env.sh 
>>> altogether, I get the expected error in both client and cluster mode:
>>>
>>> When running with master 'yarn'
>>> either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
>>>
>>> This suggests that my environment variables are being used when I first 
>>> submit the job, but at some point during the job, my environment variables 
>>> are thrown out and someone's (yarn's?) environment variables are being used.
>>>
>>> Andrew
>>>
>>>
>>> On Fri, Jan 15, 2016 at 11:03 AM, Andrew Weiner <
>>> andrewweiner2...@u.northwestern.edu> wrote:
>>>
>>>> Indeed!  Here is the output when I run in cluster mode:
>>>>
>>>> Traceback (most recent call last):
>>>>   File "pi.py", line 22, in ?
>>>> raise RuntimeError("\n"+str(sys.version_info) +"\n"+
>>>> RuntimeError:
>>>> (2, 4, 3, 'final', 0)
>>>> [('PYSPARK_GATEWAY_PORT', '48079'), ('PYTHONPATH', 
>>>> '/scratch2/hadoop/yarn/local/usercache//filecache/116/spark-assembly-1.6.0-hadoop2.4.0.jar:/home//spark-1.6.0-bin-hadoop2.4/python:/home//code/libs:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/pyspark.zip:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/py4j-0.9-src.zip'),
>>>>  ('PYTHONUNBUFFERED', 'YES')]
>>>>
>>>> As we suspected, it is using Python 2.4
>>>>
>>>> One thing that surprises me is that PYSPARK_PYTHON is not showing up in 
>>>> the list, even though I am setting it and exporting it in spark-submit 
>>>> *and* in spark-env.sh.  Is there somewhere else I need to set this 
>>>> variable?  Maybe in one of the hadoop conf files in my HADOOP_CONF_DIR?
>>>>
>>>> Andrew
>>>>
>>>>
>>>>
>>>> On Thu, Jan 14, 2016 at 1:14 PM, Bryan Cutler <cutl...@gmail.com>
>>>> wrote:
>>>>
>>>>> It seems like it could be the case that some other Python version is
>>>>> being invok

Re: Random Forest FeatureImportance throwing NullPointerException

2016-01-14 Thread Bryan Cutler
If you are able to just train the RandomForestClassificationModel from ML
directly instead of training the old model and converting, then that would
be the way to go.

On Thu, Jan 14, 2016 at 2:21 PM, <rachana.srivast...@thomsonreuters.com>
wrote:

> Thanks so much Bryan for your response.  Is there any workaround?
>
>
>
> *From:* Bryan Cutler [mailto:cutl...@gmail.com]
> *Sent:* Thursday, January 14, 2016 2:19 PM
> *To:* Rachana Srivastava
> *Cc:* user@spark.apache.org; d...@spark.apache.org
> *Subject:* Re: Random Forest FeatureImportance throwing
> NullPointerException
>
>
>
> Hi Rachana,
>
> I got the same exception.  It is because computing the feature importance
> depends on impurity stats, which is not calculated with the old
> RandomForestModel in MLlib.  Feel free to create a JIRA for this if you
> think it is necessary, otherwise I believe this problem will be eventually
> solved as part of this JIRA
> https://issues.apache.org/jira/browse/SPARK-12183
>
> Bryan
>
>
>
> On Thu, Jan 14, 2016 at 8:12 AM, Rachana Srivastava <
> rachana.srivast...@markmonitor.com> wrote:
>
> Tried using 1.6 version of Spark that takes numberOfFeatures fifth
> argument in  the API but still getting featureImportance as null.
>
>
>
> RandomForestClassifier rfc = *getRandomForestClassifier*( numTrees,
> maxBinSize,  maxTreeDepth,  seed,  impurity);
>
> RandomForestClassificationModel rfm = RandomForestClassificationModel.
> *fromOld*(model, rfc, categoricalFeatures, numberOfClasses,
> numberOfFeatures);
>
> System.*out*.println(rfm.featureImportances());
>
>
>
> Stack Trace:
>
> Exception in thread "main" *java.lang.NullPointerException*
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$.computeFeatureImportance(RandomForest.scala:1152)
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$$anonfun$featureImportances$1.apply(RandomForest.scala:)
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$$anonfun$featureImportances$1.apply(RandomForest.scala:1108)
>
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$.featureImportances(RandomForest.scala:1108)
>
> at
> org.apache.spark.ml.classification.RandomForestClassificationModel.featureImportances$lzycompute(RandomForestClassifier.scala:237)
>
> at
> org.apache.spark.ml.classification.RandomForestClassificationModel.featureImportances(RandomForestClassifier.scala:237)
>
> at
> com.markmonitor.antifraud.ce.ml.CheckFeatureImportance.main(
> *CheckFeatureImportance.java:49*)
>
>
>
> *From:* Rachana Srivastava
> *Sent:* Wednesday, January 13, 2016 3:30 PM
> *To:* 'user@spark.apache.org'; 'd...@spark.apache.org'
> *Subject:* Random Forest FeatureImportance throwing NullPointerException
>
>
>
> I have a Random forest model for which I am trying to get the
> featureImportance vector.
>
>
>
> Map<Object,Object> categoricalFeaturesParam = *new* HashMap<>();
>
> scala.collection.immutable.Map<Object,Object> categoricalFeatures =
>  (scala.collection.immutable.Map<Object,Object>)
>
> scala.collection.immutable.Map$.*MODULE$*.apply(JavaConversions.
> *mapAsScalaMap*(categoricalFeaturesParam).toSeq());
>
> *int* numberOfClasses =2;
>
> RandomForestClassifier rfc = *new* RandomForestClassifier();
>
> RandomForestClassificationModel rfm = RandomForestClassificationModel.
> *fromOld*(model, rfc, categoricalFeatures, numberOfClasses);
>
> System.*out*.println(rfm.featureImportances());
>
>
>
> When I run above code I found featureImportance as null.  Do I need to set
> anything in specific to get the feature importance for the random forest
> model.
>
>
>
> Thanks,
>
>
>
> Rachana
>
>
>


Re: Random Forest FeatureImportance throwing NullPointerException

2016-01-14 Thread Bryan Cutler
Hi Rachana,

I got the same exception.  It is because computing the feature importance
depends on impurity stats, which are not calculated with the old
RandomForestModel in MLlib.  Feel free to create a JIRA for this if you
think it is necessary; otherwise, I believe this problem will eventually be
solved as part of this JIRA:
https://issues.apache.org/jira/browse/SPARK-12183

Bryan

On Thu, Jan 14, 2016 at 8:12 AM, Rachana Srivastava <
rachana.srivast...@markmonitor.com> wrote:

> Tried using 1.6 version of Spark that takes numberOfFeatures fifth
> argument in  the API but still getting featureImportance as null.
>
>
>
> RandomForestClassifier rfc = *getRandomForestClassifier*( numTrees,
> maxBinSize,  maxTreeDepth,  seed,  impurity);
>
> RandomForestClassificationModel rfm = RandomForestClassificationModel.
> *fromOld*(model, rfc, categoricalFeatures, numberOfClasses,
> numberOfFeatures);
>
> System.*out*.println(rfm.featureImportances());
>
>
>
> Stack Trace:
>
> Exception in thread "main" *java.lang.NullPointerException*
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$.computeFeatureImportance(RandomForest.scala:1152)
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$$anonfun$featureImportances$1.apply(RandomForest.scala:)
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$$anonfun$featureImportances$1.apply(RandomForest.scala:1108)
>
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$.featureImportances(RandomForest.scala:1108)
>
> at
> org.apache.spark.ml.classification.RandomForestClassificationModel.featureImportances$lzycompute(RandomForestClassifier.scala:237)
>
> at
> org.apache.spark.ml.classification.RandomForestClassificationModel.featureImportances(RandomForestClassifier.scala:237)
>
> at
> com.markmonitor.antifraud.ce.ml.CheckFeatureImportance.main(
> *CheckFeatureImportance.java:49*)
>
>
>
> *From:* Rachana Srivastava
> *Sent:* Wednesday, January 13, 2016 3:30 PM
> *To:* 'user@spark.apache.org'; 'd...@spark.apache.org'
> *Subject:* Random Forest FeatureImportance throwing NullPointerException
>
>
>
> I have a Random forest model for which I am trying to get the
> featureImportance vector.
>
>
>
> Map categoricalFeaturesParam = *new* HashMap<>();
>
> scala.collection.immutable.Map categoricalFeatures =
>  (scala.collection.immutable.Map)
>
> scala.collection.immutable.Map$.*MODULE$*.apply(JavaConversions.
> *mapAsScalaMap*(categoricalFeaturesParam).toSeq());
>
> *int* numberOfClasses =2;
>
> RandomForestClassifier rfc = *new* RandomForestClassifier();
>
> RandomForestClassificationModel rfm = RandomForestClassificationModel.
> *fromOld*(model, rfc, categoricalFeatures, numberOfClasses);
>
> System.*out*.println(rfm.featureImportances());
>
>
>
> When I run above code I found featureImportance as null.  Do I need to set
> anything in specific to get the feature importance for the random forest
> model.
>
>
>
> Thanks,
>
>
>
> Rachana
>


Re: SparkContext SyntaxError: invalid syntax

2016-01-13 Thread Bryan Cutler
Hi Andrew,

There are a couple of things to check.  First, is Python 2.7 the default
version on all nodes in the cluster, or is it an alternate install?  That is,
what is the output of "python --version" on each node?  If it is an
alternate install, you could set the environment variable "PYSPARK_PYTHON",
which is the Python binary executable to use for PySpark in both driver and
workers (default is python).
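
One way to check which interpreter a job really runs under is to have the script report it.  The following is a minimal sketch, not part of Spark: the function names are made up for illustration, and it deliberately avoids conditional expressions and other newer syntax so that it is intended to still parse on a very old interpreter.

```python
import os
import sys


def interpreter_report(environ=None):
    """Collect the facts needed to tell which Python is actually running."""
    if environ is None:
        environ = os.environ
    return {
        "executable": sys.executable,
        "version": tuple(sys.version_info[:3]),
        "PYSPARK_PYTHON": environ.get("PYSPARK_PYTHON"),
    }


def check_version(version, minimum=(2, 7)):
    """Raise RuntimeError if `version` is older than `minimum`."""
    if tuple(version[:2]) < tuple(minimum):
        raise RuntimeError(
            "Python %s is older than required %s"
            % (".".join(map(str, version[:3])), ".".join(map(str, minimum)))
        )
    return True


if __name__ == "__main__":
    # Print what this process sees, then fail loudly on an old interpreter.
    print(interpreter_report())
    check_version(sys.version_info)
```

Calling these two functions at the very top of the submitted script, before importing pyspark, should surface a version mismatch as a readable RuntimeError instead of a SyntaxError from inside the pyspark package.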

Did you try to submit the Python example in client mode?  Otherwise, the
command looks fine; you don't need the --class option when submitting Python
files:

  ./bin/spark-submit --master yarn --deploy-mode client \
    --driver-memory 4g --executor-memory 2g --executor-cores 1 \
    ./examples/src/main/python/pi.py 10

It is a good sign that local jobs and the Java examples work, so this is
probably just a small configuration issue :)

Bryan

On Wed, Jan 13, 2016 at 3:51 PM, Andrew Weiner <
andrewweiner2...@u.northwestern.edu> wrote:

> Thanks for your continuing help.  Here is some additional info.
>
> *OS/architecture*
> output of *cat /proc/version*:
> Linux version 2.6.18-400.1.1.el5 (mockbu...@x86-012.build.bos.redhat.com)
>
> output of *lsb_release -a*:
> LSB Version:    :core-4.0-amd64:core-4.0-ia32:core-4.0-noarch:graphics-4.0-amd64:graphics-4.0-ia32:graphics-4.0-noarch:printing-4.0-amd64:printing-4.0-ia32:printing-4.0-noarch
> Distributor ID: RedHatEnterpriseServer
> Description:    Red Hat Enterprise Linux Server release 5.11 (Tikanga)
> Release:        5.11
> Codename:       Tikanga
>
> *Running a local job*
> I have confirmed that I can successfully run python jobs using
> bin/spark-submit --master local[*]
> Specifically, this is the command I am using:
> *./bin/spark-submit --master local[8]
> ./examples/src/main/python/wordcount.py
> file:/home//spark-1.6.0-bin-hadoop2.4/README.md*
> And it works!
>
> *Additional info*
> I am also able to successfully run the Java SparkPi example using yarn in
> cluster mode using this command:
> * ./bin/spark-submit --class org.apache.spark.examples.SparkPi
> --master yarn --deploy-mode cluster --driver-memory 4g
> --executor-memory 2g --executor-cores 1 lib/spark-examples*.jar
> 10*
> This Java job also runs successfully when I change --deploy-mode to
> client.  The fact that I can run Java jobs in cluster mode makes me think
> that everything is installed correctly--is that a valid assumption?
>
> The problem remains that I cannot submit python jobs.  Here is the command
> that I am using to try to submit python jobs:
> * ./bin/spark-submit  --master yarn --deploy-mode cluster
> --driver-memory 4g --executor-memory 2g --executor-cores 1
> ./examples/src/main/python/pi.py 10*
> Does that look like a correct command?  I wasn't sure what to put for
> --class so I omitted it.  At any rate, the result of the above command is a
> syntax error, similar to the one I posted in the original email:
>
> Traceback (most recent call last):
>   File "pi.py", line 24, in ?
> from pyspark import SparkContext
>   File "/home//spark-1.6.0-bin-hadoop2.4/python/pyspark/__init__.py", line 61
>     indent = ' ' * (min(len(m) for m in indents) if indents else 0)
>   ^
> SyntaxError: invalid syntax
>
>
> This really looks to me like a problem with the python version.  Python
> 2.4 would throw this syntax error but Python 2.7 would not.  And yet I am
> using Python 2.7.8.  Is there any chance that Spark or Yarn is somehow
> using an older version of Python without my knowledge?
>
> Finally, when I try to run the same command in client mode...
> * ./bin/spark-submit  --master yarn --deploy-mode client
> --driver-memory 4g --executor-memory 2g --executor-cores 1
> ./examples/src/main/python/pi.py 10*
> I get the error I mentioned in the prior email:
> Error from python worker:
>   python: module pyspark.daemon not found
>
> Any thoughts?
>
> Best,
> Andrew
>
>
> On Mon, Jan 11, 2016 at 12:25 PM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> This could be an environment issue, could you give more details about the
>> OS/architecture that you are using?  If you are sure everything is
>> installed correctly on each node following the guide on "Running Spark on
>> Yarn" http://spark.apache.org/docs/latest/running-on-yarn.html and that
>> the spark assembly jar is reachable, then I would check to see if you can
>> submit a local job to just run on one node.
>>
>> On Fri, Jan 8, 2016 at 5:22 PM, Andrew Weiner <
>> andrewweiner2...@u.northwestern.edu> wrote:
>>
>>> Now for simplicity I'm testing with wordcount.py from the provid

Re: SparkContext SyntaxError: invalid syntax

2016-01-08 Thread Bryan Cutler
Hi Andrew,

I know that older versions of Spark could not run PySpark on YARN in
cluster mode.  I'm not sure whether that is fixed in 1.6.0, though.  Can you
try setting the --deploy-mode option to "client" when calling spark-submit?

Bryan

On Thu, Jan 7, 2016 at 2:39 PM, weineran <
andrewweiner2...@u.northwestern.edu> wrote:

> Hello,
>
> When I try to submit a python job using spark-submit (using --master yarn
> --deploy-mode cluster), I get the following error:
>
> /Traceback (most recent call last):
>   File "loss_rate_by_probe.py", line 15, in ?
> from pyspark import SparkContext
>   File
>
> "/scratch5/hadoop/yarn/local/usercache//filecache/18/spark-assembly-1.3.1-hadoop2.4.0.jar/pyspark/__init__.py",
> line 41, in ?
>   File
>
> "/scratch5/hadoop/yarn/local/usercache//filecache/18/spark-assembly-1.3.1-hadoop2.4.0.jar/pyspark/context.py",
> line 219
> with SparkContext._lock:
> ^
> SyntaxError: invalid syntax/
>
> This is very similar to  this post from 2014
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-lock-Error-td18233.html
> >
> , but unlike that person I am using Python 2.7.8.
>
> Here is what I'm using:
> Spark 1.3.1
> Hadoop 2.4.0.2.1.5.0-695
> Python 2.7.8
>
> Another clue:  I also installed Spark 1.6.0 and tried to submit the same
> job.  I got a similar error:
>
> /Traceback (most recent call last):
>   File "loss_rate_by_probe.py", line 15, in ?
> from pyspark import SparkContext
>   File
>
> "/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0119/container_1450370639491_0119_01_01/pyspark.zip/pyspark/__init__.py",
> line 61
> indent = ' ' * (min(len(m) for m in indents) if indents else 0)
>   ^
> SyntaxError: invalid syntax/
>
> Any thoughts?
>
> Andrew
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-SyntaxError-invalid-syntax-tp25910.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: error writing to stdout

2016-01-06 Thread Bryan Cutler
This is a known issue https://issues.apache.org/jira/browse/SPARK-9844.  As
Noorul said, it is probably safe to ignore as the executor process is
already destroyed at this point.

On Mon, Dec 21, 2015 at 8:54 PM, Noorul Islam K M  wrote:

> carlilek  writes:
>
> > My users use Spark 1.5.1 in standalone mode on an HPC cluster, with a
> > smattering still using 1.4.0
> >
> > I have been getting reports of errors like this:
> >
> > 15/12/21 15:40:33 ERROR FileAppender: Error writing stream to file
> > /scratch/spark/work/app-20151221150645-/3/stdout
> > java.io.IOException: Stream closed
> >   at
> java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
> >   at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
> >   at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
> >   at java.io.FilterInputStream.read(FilterInputStream.java:107)
> >   at
> >
> org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
> >   at
> >
> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
> >   at
> >
> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
> >   at
> >
> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
> >   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
> >   at
> >
> org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
> > '
> >
> > So far I have been unable to reproduce reliably, but does anyone have any
> > ideas?
> >
>
> I have seen this happening in our cluster also. So far I have been
> ignoring this.
>
> Thanks and Regards
> Noorul


Re: looking for a easier way to count the number of items in a JavaDStream

2015-12-16 Thread Bryan Cutler
To follow up with your other issue, if you are just trying to count
elements in a DStream, you can do that without an Accumulator.  foreachRDD
is meant to be an output action, it does not return anything and it is
actually run in the driver program.  Because Java (before 8) handles
closures a little differently, it might be easiest to implement the
function to pass to foreachRDD as something like this:

class MyFunc implements VoidFunction<JavaRDD> {

  public long total = 0;

  @Override
  public void call(JavaRDD rdd) {
    System.out.println("foo " + rdd.collect().toString());
    total += rdd.count();
  }
}

MyFunc f = new MyFunc();

inputStream.foreachRDD(f);

// f.total will have the count of all RDDs
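
The same pattern, a function object that carries a running total and is called once per batch in the driver, can be sketched without Spark at all.  In this plain-Python illustration each batch is just a list standing in for an RDD, and for_each_batch is a hypothetical stand-in for foreachRDD, not a Spark API.

```python
class BatchCounter(object):
    """Callable that keeps a running total across batches (driver-side state)."""

    def __init__(self):
        self.total = 0

    def __call__(self, batch):
        # In Spark this would be rdd.count(); here a batch is a plain list.
        self.total += len(batch)


def for_each_batch(batches, func):
    """Hypothetical stand-in for foreachRDD: call func once per batch."""
    for batch in batches:
        func(batch)


counter = BatchCounter()
for_each_batch([["a", "b"], ["c"], []], counter)
print(counter.total)  # prints 3
```

Because the callback runs in one process and mutates one object, no accumulator is needed to read the total afterwards.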

Hope that helps some!

-bryan

On Wed, Dec 16, 2015 at 8:37 AM, Bryan Cutler <cutl...@gmail.com> wrote:

> Hi Andy,
>
> Regarding the foreachrdd return value, this Jira that will be in 1.6
> should take care of that https://issues.apache.org/jira/browse/SPARK-4557
> and make things a little simpler.
> On Dec 15, 2015 6:55 PM, "Andy Davidson" <a...@santacruzintegration.com>
> wrote:
>
>> I am writing  a JUnit test for some simple streaming code. I want to
>> make assertions about how many things are in a given JavaDStream. I wonder
>> if there is an easier way in Java to get the count?
>>
>> I think there are two points of friction.
>>
>>
>>1. It is easy to create an accumulator of type double or int; however,
>>Long is not supported
>>2. We need to use javaDStream.foreachRDD. The Function interface must
>>return void. I was not able to define an accumulator in my driver and
>>use a lambda function. (I am new to lambda in Java)
>>
>> Here is a little lambda example that logs my test objects. I was not
>> able to figure out how to get  to return a value or access a accumulator
>>
>>data.foreachRDD(rdd -> {
>>
>> logger.info("Begin data.foreachRDD" );
>>
>> for (MyPojo pojo : rdd.collect()) {
>>
>> logger.info("\n{}", pojo.toString());
>>
>> }
>>
>> return null;
>>
>> });
>>
>>
>> Any suggestions would be greatly appreciated
>>
>> Andy
>>
>> The following code works in my driver but is a lot of code for such a
>> trivial computation. Because it needs the JavaSparkContext I do not
>> think it would work inside a closure. I assume the workers do not have
>> access to the context as a global and that shipping it in the closure is
>> not a good idea?
>>
>> public class JavaDStreamCount implements Serializable {
>>
>> private static final long serialVersionUID = -3600586183332429887L;
>>
>> public static Logger logger =
>> LoggerFactory.getLogger(JavaDStreamCount.class);
>>
>>
>>
>> public Double hack(JavaSparkContext sc, JavaDStream javaDStream) {
>>
>> Count c = new Count(sc);
>>
>> javaDStream.foreachRDD(c);
>>
>> return c.getTotal().value();
>>
>> }
>>
>>
>>
>> class Count implements Function<JavaRDD,Void> {
>>
>> private static final long serialVersionUID =
>> -5239727633710162488L;
>>
>> Accumulator total;
>>
>>
>>
>> public Count(JavaSparkContext sc) {
>>
>> total = sc.accumulator(0.0);
>>
>> }
>>
>>
>>
>> @Override
>>
>> public java.lang.Void call(JavaRDD rdd) throws Exception {
>>
>> List data = rdd.collect();
>>
>> int dataSize = data.size();
>>
>> logger.error("data.size:{}", dataSize);
>>
>> long num = rdd.count();
>>
>> logger.error("num:{}", num);
>>
>> total.add(new Double(num));
>>
>> return null;
>>
>> }
>>
>>
>> public Accumulator getTotal() {
>>
>> return total;
>>
>> }
>>
>> }
>>
>> }
>>
>>
>>
>>
>>


Re: looking for a easier way to count the number of items in a JavaDStream

2015-12-16 Thread Bryan Cutler
Hi Andy,

Regarding the foreachRDD return value, this Jira, which will be in 1.6, should
take care of that https://issues.apache.org/jira/browse/SPARK-4557 and make
things a little simpler.
On Dec 15, 2015 6:55 PM, "Andy Davidson" 
wrote:

> I am writing  a JUnit test for some simple streaming code. I want to make
> assertions about how many things are in a given JavaDStream. I wonder if there
> is an easier way in Java to get the count?
>
> I think there are two points of friction.
>
>
>1. It is easy to create an accumulator of type double or int; however,
>Long is not supported
>2. We need to use javaDStream.foreachRDD. The Function interface must
>return void. I was not able to define an accumulator in my driver and
>use a lambda function. (I am new to lambda in Java)
>
> Here is a little lambda example that logs my test objects. I was not able
> to figure out how to get  to return a value or access a accumulator
>
>data.foreachRDD(rdd -> {
>
> logger.info("Begin data.foreachRDD" );
>
> for (MyPojo pojo : rdd.collect()) {
>
> logger.info("\n{}", pojo.toString());
>
> }
>
> return null;
>
> });
>
>
> Any suggestions would be greatly appreciated
>
> Andy
>
> The following code works in my driver but is a lot of code for such a
> trivial computation. Because it needs the JavaSparkContext I do not
> think it would work inside a closure. I assume the workers do not have
> access to the context as a global and that shipping it in the closure is
> not a good idea?
>
> public class JavaDStreamCount implements Serializable {
>     private static final long serialVersionUID = -3600586183332429887L;
>
>     public static Logger logger =
>         LoggerFactory.getLogger(JavaDStreamCount.class);
>
>     public Double hack(JavaSparkContext sc, JavaDStream javaDStream) {
>         Count c = new Count(sc);
>         javaDStream.foreachRDD(c);
>         return c.getTotal().value();
>     }
>
>     class Count implements Function {
>         private static final long serialVersionUID =
>             -5239727633710162488L;
>
>         Accumulator total;
>
>         public Count(JavaSparkContext sc) {
>             total = sc.accumulator(0.0);
>         }
>
>         @Override
>         public java.lang.Void call(JavaRDD rdd) throws Exception {
>             List data = rdd.collect();
>             int dataSize = data.size();
>             logger.error("data.size:{}", dataSize);
>             long num = rdd.count();
>             logger.error("num:{}", num);
>             total.add(new Double(num));
>             return null;
>         }
>
>         public Accumulator getTotal() {
>             return total;
>         }
>     }
> }


Re: ALS mllib.recommendation vs ml.recommendation

2015-12-15 Thread Bryan Cutler
Hi Roberto,

1. How do they differ in terms of performance?
They both use alternating least squares matrix factorization. The main
difference is that ml.recommendation.ALS takes DataFrames as input, which
have built-in optimizations and should give better performance.

2.  Am I correct to assume ml.recommendation.ALS (unlike mllib) does not
support key-value RDDs? If so, what is the reason?
mllib.recommendation.ALS expects an RDD of Rating objects as input, while
ml.recommendation.ALS expects a DataFrame with user, item and rating
columns.  I'm not sure if that is what you mean by key-value RDDs.
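
To make the input difference a bit more concrete, here is a small plain-Python
sketch with no Spark dependency. It assumes that "key-value" means pairs of the
form ((user, item), rating). The Rating namedtuple below only mimics the field
layout of the mllib Rating type, the dict rows stand in for DataFrame rows, and
to_ratings / to_rows are hypothetical helper names, not Spark APIs.

```python
from collections import namedtuple

# Stand-in that mimics the (user, product, rating) layout of mllib's Rating.
Rating = namedtuple("Rating", ["user", "product", "rating"])


def to_ratings(pairs):
    """Turn ((user, item), rating) key-value pairs into Rating records."""
    return [Rating(user, item, float(r)) for (user, item), r in pairs]


def to_rows(pairs):
    """Turn the same pairs into row dicts with user, item and rating columns."""
    return [{"user": u, "item": i, "rating": float(r)} for (u, i), r in pairs]


pairs = [((0, 10), 4), ((0, 11), 1), ((1, 10), 5)]
ratings = to_ratings(pairs)  # shape expected by the mllib-style API
rows = to_rows(pairs)        # shape expected by the DataFrame-style API
```

Either way the data is the same triple of user, item and rating; only the
container differs, so a key-value RDD would need a map step like one of these
before being handed to either ALS.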

On Mon, Dec 14, 2015 at 3:22 PM, Roberto Pagliari  wrote:

> Currently, there are two implementations of ALS available:
> ml.recommendation.ALS
> 
>  and mllib.recommendation.ALS
> 
>
>
>
>1. How do they differ in terms of performance?
>2. Am I correct to assume ml.recommendation.ALS (unlike mllib) does
>not support key-value RDDs? If so, what is the reason?
>
>
>
> Thank you,
>
>


Re: SparkStreaming variable scope

2015-12-09 Thread Bryan Cutler
rowid from your code is a variable in the driver, so it will be evaluated
once and then only the value is sent to words.map.  You probably want to
have rowid be a lambda itself, so that it will get the value at the time it
is evaluated.  For example if I have the following:

>>> data = sc.parallelize([1,2,3])
>>> from datetime import datetime
>>> rowid = lambda: datetime.now().strftime("%Y%m%d%H%M%S")
>>> mdata = data.map(lambda x: (rowid(), x))
>>> mdata.collect()
[('20151209121532', 1), ('20151209121532', 2), ('20151209121532', 3)]
>>> mdata.collect()
[('20151209121540', 1), ('20151209121540', 2), ('20151209121540', 3)]

Here rowid is evaluated whenever an action, e.g. collect, is called on the
RDD.
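
The same eager-versus-lazy distinction can be seen without Spark at all. The
sketch below is plain Python: now() is a hypothetical stand-in for the clock
(a counter, so the result does not depend on wall-clock time) and run_batch()
is a hypothetical stand-in for running an action over a batch.

```python
import itertools

# Hypothetical stand-in for a clock: every call returns the next tick.
_ticks = itertools.count(1)


def now():
    return next(_ticks)


def run_batch(values, make_row):
    """Stand-in for running an action: applies make_row to each value."""
    return [make_row(v) for v in values]


# Eager: now() runs once, right here; the closure only ever sees that value.
eager_rowid = now()
eager_first = run_batch([1, 2], lambda x: (eager_rowid, x))
eager_second = run_batch([1, 2], lambda x: (eager_rowid, x))

# Lazy: rowid is a callable, so now() runs each time a row is built.
lazy_rowid = lambda: now()
lazy_first = run_batch([1, 2], lambda x: (lazy_rowid(), x))
lazy_second = run_batch([1, 2], lambda x: (lazy_rowid(), x))
```

The eager batches carry the same id every time, while the lazy batches get a
fresh id on each evaluation, which is the behaviour you want for rowid.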

On Wed, Dec 9, 2015 at 10:23 AM, jpinela  wrote:

> Hi Guys,
> I am sure this is a simple question, but I can't find it in the docs
> anywhere.
> This reads from flume and writes to hbase (as you can see).
> But has a variable scope problem (I believe).
> I have the following code:
>
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.flume import FlumeUtils
> from datetime import datetime
> ssc = StreamingContext(sc, 5)
> conf = {"hbase.zookeeper.quorum": "ubuntu3",
>         "hbase.mapred.outputtable": "teste2",
>         "mapreduce.outputformat.class":
>             "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
>         "mapreduce.job.output.key.class":
>             "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
>         "mapreduce.job.output.value.class":
>             "org.apache.hadoop.io.Writable"}
>
> keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
> valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
>
> lines = FlumeUtils.createStream(ssc, 'ubuntu3', 9997)
> words = lines.map(lambda line: line[1])
> rowid = datetime.now().strftime("%Y%m%d%H%M%S")
> outrdd = words.map(lambda x: (str(1), [rowid, "cf1desc", "col1", x]))
> print("ok 1")
> outrdd.pprint()
>
> outrdd.foreachRDD(lambda x:
>     x.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv))
>
> ssc.start()
> ssc.awaitTermination()
>
> The issue is that the rowid variable always holds the value from the point
> at which streaming began.
> How can I get around this? I tried a function, an application, nothing
> worked.
> Thank you.
> jp


Re: Working with RDD from Java

2015-11-17 Thread Bryan Cutler
Hi Ivan,

Since Spark 1.4.1 there is a Java-friendly function in LDAModel,
javaTopicDistributions(), that returns the topic distributions as a
JavaPairRDD.  If you aren't able to upgrade, you can check out the
conversion used here:
https://github.com/apache/spark/blob/v1.4.1/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala#L350

-bryan

On Tue, Nov 17, 2015 at 3:06 AM, frula00 
wrote:

> Hi,
> I'm working in Java, with Spark 1.3.1 - I am trying to extract data from the
> RDD returned by
> org.apache.spark.mllib.clustering.DistributedLDAModel.topicDistributions()
> (return type is RDD<Tuple2<Object, Vector>>). How do I work with it from
> within Java? I can't seem to cast it to JavaPairRDD nor JavaRDD, and if I try
> to collect it, it simply returns an Object.
>
> Thank you for your help in advance!
>
> Ivan


Re: Difference between RandomForestModel and RandomForestClassificationModel

2015-07-30 Thread Bryan Cutler
Hi Praveen,

In MLlib, the major difference is that RandomForestClassificationModel
makes use of a newer API which utilizes ML pipelines.  I can't say for
certain whether they will produce exactly the same result for a given
dataset, but I believe they should.

Bryan

On Wed, Jul 29, 2015 at 12:14 PM, praveen S mylogi...@gmail.com wrote:

 Hi
 Wanted to know what is the difference between
 RandomForestModel and RandomForestClassificationModel
 in MLlib? Will they yield the same results for a given dataset?



Re: Timeout Error

2015-04-26 Thread Bryan Cutler
I'm not sure what the expected performance should be for this amount of
data, but you could try to increase the timeout with the property
spark.akka.timeout to see if that helps.

Bryan

On Sun, Apr 26, 2015 at 6:57 AM, Deepak Gopalakrishnan dgk...@gmail.com
wrote:

 Hello All,

 I'm trying to process a 3.5GB file in standalone mode using Spark. I could
 run my Spark job successfully on a 100MB file and it works as expected. But
 when I try to run it on the 3.5GB file, I run into the error below:


 15/04/26 12:45:50 INFO BlockManagerMaster: Updated info of block taskresult_83
 15/04/26 12:46:46 WARN AkkaUtils: Error sending message [message = Heartbeat(2,[Lscala.Tuple2;@790223d3,BlockManagerId(2, master.spark.com, 39143))] in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
   at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
   at scala.concurrent.Await$.result(package.scala:107)
   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
 15/04/26 12:47:15 INFO MemoryStore: ensureFreeSpace(26227673) called with curMem=265897, maxMem=5556991426
 15/04/26 12:47:15 INFO MemoryStore: Block taskresult_92 stored as bytes in memory (estimated size 25.0 MB, free 5.2 GB)
 15/04/26 12:47:16 INFO MemoryStore: ensureFreeSpace(26272879) called with curMem=26493570, maxMem=5556991426
 15/04/26 12:47:16 INFO MemoryStore: Block taskresult_94 stored as bytes in memory (estimated size 25.1 MB, free 5.1 GB)
 15/04/26 12:47:18 INFO MemoryStore: ensureFreeSpace(26285327) called with curMem=52766449, maxMem=5556991426


 and the job fails.


 I'm on AWS and have opened all ports. Also, since the 100MB file works, it
 should not be a connection issue.  I've a r3 xlarge and 2 m3 large.

 Can anyone suggest a way to fix this?

 --
 Regards,
 *Deepak Gopalakrishnan*
 *Mobile*:+918891509774
 *Skype* : deepakgk87
 http://myexps.blogspot.com