Re: Query around Spark Checkpoints

2020-09-29 Thread Bryan Jeffrey
Jungtaek,

How would you contrast stateful streaming with checkpoint vs. the idea of
writing updates to a Delta Lake table, and then using the Delta Lake table
as a streaming source for our state stream?

Thank you,

Bryan

On Mon, Sep 28, 2020 at 9:50 AM Debabrata Ghosh 
wrote:

> Thank You Jungtaek and Amit ! This is very helpful indeed !
>
> Cheers,
>
> Debu
>
> On Mon, Sep 28, 2020 at 5:33 AM Jungtaek Lim 
> wrote:
>
>>
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
>>
>> You would need to implement CheckpointFileManager yourself, and it is
>> tightly integrated with HDFS (the parameters and return types of its methods are
>> mostly from HDFS). That doesn't mean it's impossible to
>> implement CheckpointFileManager against a non-filesystem store, but it'd be
>> non-trivial to override all of the functionality and make it work
>> seamlessly.
>>
>> The required consistency is documented in the javadoc of CheckpointFileManager -
>> please read through it and evaluate whether your target storage can
>> fulfill the requirements.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Mon, Sep 28, 2020 at 3:04 AM Amit Joshi 
>> wrote:
>>
>>> Hi,
>>>
>>> As far as I know, it depends on whether you are using Spark Streaming or
>>> Structured Streaming.
>>> In Spark Streaming you can write your own code to checkpoint.
>>> But in the case of Structured Streaming it should be a file location.
>>> The main question, though, is why you want to checkpoint in
>>> NoSQL, as it is eventually consistent.
>>>
>>>
>>> Regards
>>> Amit
>>>
>>> On Sunday, September 27, 2020, Debabrata Ghosh 
>>> wrote:
>>>
>>>> Hi,
>>>> I had a query around Spark checkpoints - can I store the
>>>> checkpoints in NoSQL or Kafka instead of a filesystem?
>>>>
>>>> Regards,
>>>>
>>>> Debu
>>>>
>>>


Re: Metrics Problem

2020-07-10 Thread Bryan Jeffrey
On Thu, Jul 2, 2020 at 2:33 PM Bryan Jeffrey 
wrote:

> Srinivas,
>
> I finally broke a little bit of time free to look at this issue.  I
> reduced the scope of my ambitions and simply cloned the ConsoleSink and
> ConsoleReporter classes.  After doing so I can see that the original version
> works, but the 'modified' version does not.  The only difference is
> the name & location of the associated JAR.
>
> Looking here:
> http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Metric-Sink-on-Executor-Always-ClassNotFound-td34205.html#a34206
> "For executors, the jar file of the sink needs to be in the system classpath;
> the application jar is not in the system classpath, so that does not
> work. There are different ways for you to get it there, most of them
> manual (YARN is, I think, the only RM supported in Spark where the
> application itself can do it)."
>
> Looking here:
> https://forums.databricks.com/questions/706/how-can-i-attach-a-jar-library-to-the-cluster-that.html
> "Everything in spark.executor.extraClassPath is on the System classpath.
> These are listed in the Classpath Entries section and marked with Source =
> System Classpath. Everything else is on the application classpath."
>
> From the databricks forum above, it appears that one could pass in a jar
> via '--jars' and then call '--conf spark.executor.extraClassPath ./myJar'.
> However, this does not appear to work; the file is there, but it is not added
> to the classpath.
>
> Regards,
>
> Bryan Jeffrey
>
>
> On Tue, Jun 30, 2020 at 12:55 PM Srinivas V  wrote:
>
>> Then it should be a permission issue. What kind of cluster is it, and which
>> user is running it? Does that user have HDFS permissions to access the
>> folder where the jar file is?
>>
>> On Mon, Jun 29, 2020 at 1:17 AM Bryan Jeffrey 
>> wrote:
>>
>>> Srinivas,
>>>
>>> Interestingly, I did have the metrics jar packaged as part of my main
>>> jar. It worked well both on driver and locally, but not on executors.
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>> Get Outlook for Android <https://aka.ms/ghei36>
>>>
>>> --
>>> *From:* Srinivas V 
>>> *Sent:* Saturday, June 27, 2020 1:23:24 AM
>>>
>>> *To:* Bryan Jeffrey 
>>> *Cc:* user 
>>> *Subject:* Re: Metrics Problem
>>>
>>> One option is to build your main jar with the metrics jar included, like a
>>> fat jar.
>>>
>>> On Sat, Jun 27, 2020 at 8:04 AM Bryan Jeffrey 
>>> wrote:
>>>
>>> Srinivas,
>>>
>>> Thanks for the insight. I had not considered a dependency issue as the
>>> metrics jar works well applied on the driver. Perhaps my main jar
>>> includes the Hadoop dependencies but the metrics jar does not?
>>>
>>> I am confused as the only Hadoop dependency also exists for the built in
>>> metrics providers which appear to work.
>>>
>>> Regards,
>>>
>>> Bryan
>>>
>>> Get Outlook for Android <https://aka.ms/ghei36>
>>>
>>> --
>>> *From:* Srinivas V 
>>> *Sent:* Friday, June 26, 2020 9:47:52 PM
>>> *To:* Bryan Jeffrey 
>>> *Cc:* user 
>>> *Subject:* Re: Metrics Problem
>>>
>>> It should work when you are giving an HDFS path, as long as your jar exists
>>> at that path.
>>> Your error looks more like a security issue (Kerberos) or missing Hadoop
>>> dependencies, I think; your error says:
>>> org.apache.hadoop.security.UserGroupInformation.doAs(
>>> UserGroupInformation
>>>
>>> On Fri, Jun 26, 2020 at 8:44 PM Bryan Jeffrey 
>>> wrote:
>>>
>>> It may be helpful to note that I'm running in Yarn cluster mode.  My
>>> goal is to avoid having to manually distribute the JAR to all of the
>>> various nodes as this makes versioning deployments difficult.
>>>
>>> On Thu, Jun 25, 2020 at 5:32 PM Bryan Jeffrey 
>>> wrote:
>>>
>>> Hello.
>>>
>>> I am running Spark 2.4.4. I have implemented a custom metrics producer.
>>> It works well when I run locally, or specify the metrics producer only for
>>> the driver.  When I ask for executor metrics I run into
>>> ClassNotFoundExceptions
>>>
>>> *Is it possible to pass a metrics JAR via --jars?  If so what am I
>>> missing?*
>>>
>>> Deploy driver stats via:
>>> --jars hdfs:///custommetricsprov

Re: Metrics Problem

2020-06-28 Thread Bryan Jeffrey
Srinivas,

Interestingly, I did have the metrics jar packaged as part of my main jar. It 
worked well both on driver and locally, but not on executors.

Regards,

Bryan Jeffrey

Get Outlook for Android<https://aka.ms/ghei36>


From: Srinivas V 
Sent: Saturday, June 27, 2020 1:23:24 AM
To: Bryan Jeffrey 
Cc: user 
Subject: Re: Metrics Problem

One option is to build your main jar with the metrics jar included, like a fat jar.

On Sat, Jun 27, 2020 at 8:04 AM Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
Srinivas,

Thanks for the insight. I had not considered a dependency issue as the metrics 
jar works well applied on the driver. Perhaps my main jar includes the Hadoop 
dependencies but the metrics jar does not?

I am confused as the only Hadoop dependency also exists for the built in 
metrics providers which appear to work.

Regards,

Bryan

Get Outlook for Android<https://aka.ms/ghei36>


From: Srinivas V <srini@gmail.com>
Sent: Friday, June 26, 2020 9:47:52 PM
To: Bryan Jeffrey <bryan.jeff...@gmail.com>
Cc: user <user@spark.apache.org>
Subject: Re: Metrics Problem

It should work when you are giving an HDFS path, as long as your jar exists at
that path.
Your error looks more like a security issue (Kerberos) or missing Hadoop
dependencies, I think; your error says:
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation

On Fri, Jun 26, 2020 at 8:44 PM Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
It may be helpful to note that I'm running in Yarn cluster mode.  My goal is to 
avoid having to manually distribute the JAR to all of the various nodes as this 
makes versioning deployments difficult.

On Thu, Jun 25, 2020 at 5:32 PM Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
Hello.

I am running Spark 2.4.4. I have implemented a custom metrics producer. It 
works well when I run locally, or specify the metrics producer only for the 
driver.  When I ask for executor metrics I run into ClassNotFoundExceptions

Is it possible to pass a metrics JAR via --jars?  If so what am I missing?

Deploy driver stats via:
--jars hdfs:///custommetricsprovider.jar
--conf 
spark.metrics.conf.driver.sink.metrics.class=org.apache.spark.mycustommetricssink

However, when I pass the JAR with the metrics provider to executors via:
--jars hdfs:///custommetricsprovider.jar
--conf 
spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink

I get ClassNotFoundException:

20/06/25 21:19:35 ERROR MetricsSystem: Sink class 
org.apache.spark.custommetricssink cannot be instantiated
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.custommetricssink
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
at 
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
at 
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:365)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:221)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.se

Re: Metrics Problem

2020-06-26 Thread Bryan Jeffrey
Srinivas,

Thanks for the insight. I had not considered a dependency issue as the metrics 
jar works well applied on the driver. Perhaps my main jar includes the Hadoop 
dependencies but the metrics jar does not?

I am confused as the only Hadoop dependency also exists for the built in 
metrics providers which appear to work.

Regards,

Bryan

Get Outlook for Android<https://aka.ms/ghei36>


From: Srinivas V 
Sent: Friday, June 26, 2020 9:47:52 PM
To: Bryan Jeffrey 
Cc: user 
Subject: Re: Metrics Problem

It should work when you are giving an HDFS path, as long as your jar exists at
that path.
Your error looks more like a security issue (Kerberos) or missing Hadoop
dependencies, I think; your error says:
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation

On Fri, Jun 26, 2020 at 8:44 PM Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
It may be helpful to note that I'm running in Yarn cluster mode.  My goal is to 
avoid having to manually distribute the JAR to all of the various nodes as this 
makes versioning deployments difficult.

On Thu, Jun 25, 2020 at 5:32 PM Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
Hello.

I am running Spark 2.4.4. I have implemented a custom metrics producer. It 
works well when I run locally, or specify the metrics producer only for the 
driver.  When I ask for executor metrics I run into ClassNotFoundExceptions

Is it possible to pass a metrics JAR via --jars?  If so what am I missing?

Deploy driver stats via:
--jars hdfs:///custommetricsprovider.jar
--conf 
spark.metrics.conf.driver.sink.metrics.class=org.apache.spark.mycustommetricssink

However, when I pass the JAR with the metrics provider to executors via:
--jars hdfs:///custommetricsprovider.jar
--conf 
spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink

I get ClassNotFoundException:

20/06/25 21:19:35 ERROR MetricsSystem: Sink class 
org.apache.spark.custommetricssink cannot be instantiated
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.custommetricssink
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
at 
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
at 
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:365)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:221)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
... 4 more

Is it possible to pass a metrics JAR via --jars?  If so what am I missing?

Thank you,

Bryan


Re: Metrics Problem

2020-06-26 Thread Bryan Jeffrey
It may be helpful to note that I'm running in Yarn cluster mode.  My goal
is to avoid having to manually distribute the JAR to all of the various
nodes as this makes versioning deployments difficult.

On Thu, Jun 25, 2020 at 5:32 PM Bryan Jeffrey 
wrote:

> Hello.
>
> I am running Spark 2.4.4. I have implemented a custom metrics producer. It
> works well when I run locally, or specify the metrics producer only for the
> driver.  When I ask for executor metrics I run into ClassNotFoundExceptions
>
> *Is it possible to pass a metrics JAR via --jars?  If so what am I
> missing?*
>
> Deploy driver stats via:
> --jars hdfs:///custommetricsprovider.jar
> --conf
> spark.metrics.conf.driver.sink.metrics.class=org.apache.spark.mycustommetricssink
>
> However, when I pass the JAR with the metrics provider to executors via:
> --jars hdfs:///custommetricsprovider.jar
> --conf
> spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink
>
> I get ClassNotFoundException:
>
> 20/06/25 21:19:35 ERROR MetricsSystem: Sink class
> org.apache.spark.custommetricssink cannot be instantiated
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.custommetricssink
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
> at
> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
> at
> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
> at
> org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
> at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:365)
> at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:221)
> at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
> at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> ... 4 more
>
> Is it possible to pass a metrics JAR via --jars?  If so what am I missing?
>
> Thank you,
>
> Bryan
>


Metrics Problem

2020-06-25 Thread Bryan Jeffrey
Hello.

I am running Spark 2.4.4. I have implemented a custom metrics producer. It
works well when I run locally, or specify the metrics producer only for the
driver.  When I ask for executor metrics I run into ClassNotFoundExceptions

*Is it possible to pass a metrics JAR via --jars?  If so what am I missing?*

Deploy driver stats via:
--jars hdfs:///custommetricsprovider.jar
--conf
spark.metrics.conf.driver.sink.metrics.class=org.apache.spark.mycustommetricssink

However, when I pass the JAR with the metrics provider to executors via:
--jars hdfs:///custommetricsprovider.jar
--conf
spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink

I get ClassNotFoundException:

20/06/25 21:19:35 ERROR MetricsSystem: Sink class
org.apache.spark.custommetricssink cannot be instantiated
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.custommetricssink
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
at
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
at
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at
org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:365)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:221)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
... 4 more

Is it possible to pass a metrics JAR via --jars?  If so what am I missing?

Thank you,

Bryan
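
For reference, below is a rough sketch of what a custom executor metrics sink can look
like against Spark 2.4-era APIs. The class name, the "period" property, and the use of
Dropwizard's ConsoleReporter (mirroring the built-in ConsoleSink) are illustrative
assumptions, not the implementation discussed in this thread. Because the Sink trait is
private[spark], such classes are typically placed under the org.apache.spark package,
and the metrics system instantiates them reflectively with the three-argument
constructor shown here (verify against your Spark version).

package org.apache.spark.metrics.sink

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

import org.apache.spark.SecurityManager

// Rough sketch only; names and the reporting target are illustrative.
private[spark] class MyCustomSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink {

  // Poll period taken from the sink's properties, defaulting to 10 seconds.
  private val pollPeriodSeconds =
    Option(property.getProperty("period")).map(_.toInt).getOrElse(10)

  // Reuses the Dropwizard ConsoleReporter, as the built-in ConsoleSink does;
  // a real sink would push the registry's metrics to an external system.
  private val reporter = ConsoleReporter.forRegistry(registry)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .build()

  override def start(): Unit = reporter.start(pollPeriodSeconds, TimeUnit.SECONDS)
  override def stop(): Unit = reporter.stop()
  override def report(): Unit = reporter.report()
}

As the rest of the thread notes, defining such a class is not the hard part: the
compiled class (and its metrics dependency) still has to be visible on the executors'
system classpath for the executor-side MetricsSystem to instantiate it.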


Custom Metrics

2020-06-18 Thread Bryan Jeffrey
Hello.

We're using Spark 2.4.4.  We have a custom metrics sink consuming the
Spark-produced metrics (e.g. heap free, etc.).  I am trying to determine a
good mechanism to pass the Spark application name into the metrics sink.
Currently the application ID is included, but not the application name. Is
there a suggested mechanism?

Thank you,

Bryan Jeffrey
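
One possibility worth checking against the Spark monitoring documentation is the
spark.metrics.namespace property, which the docs describe as supporting expansion of
spark.app.name so that the application name, rather than the application ID, prefixes
the metric names a sink receives. A minimal, hypothetical sketch (application name is
illustrative; verify the behavior on your Spark version):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object MetricsNamespaceSketch {
  def main(args: Array[String]): Unit = {
    // Sketch: spark.metrics.namespace controls the prefix of metric names a sink
    // receives; setting it to ${spark.app.name} makes the application name, rather
    // than the app ID, part of every metric name.
    val conf = new SparkConf()
      .setAppName("my-streaming-app") // illustrative name
      .set("spark.metrics.namespace", "${spark.app.name}")

    val spark = SparkSession.builder().config(conf).getOrCreate()
    // ... application logic ...
    spark.stop()
  }
}

Whether the namespace alone is sufficient depends on how the sink parses metric names.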


Data Source - State (SPARK-28190)

2020-03-30 Thread Bryan Jeffrey
Hi, Jungtaek.

We've been investigating the use of Spark Structured Streaming to replace
our Spark Streaming operations.  We have several cases where we're using
mapWithState to maintain state across batches, often with high volumes of
data.  We took a look at the Structured Streaming stateful processing.
Structured Streaming state processing looks great, but has some
shortcomings:
1. State can only be hydrated from checkpoint, which means that
modification of the state is not possible.
2. You cannot cleanup or normalize state data after it has been processed.

These shortcomings appear to be potentially addressed by your
ticket SPARK-28190 - "Data Source - State".  I see little activity on this
ticket. Can you help me to understand where this feature currently stands?

Thank you,

Bryan Jeffrey
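
For context on the pattern being replaced, a minimal mapWithState sketch; the key/value
types and the running-total aggregation are hypothetical, not the production logic
described above.

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

object MapWithStateSketch {
  // Keeps a running total per key across micro-batches in Spark Streaming state.
  def runningTotals(events: DStream[(String, Int)]): DStream[(String, Long)] = {
    val spec = StateSpec.function(
      (key: String, value: Option[Int], state: State[Long]) => {
        val newTotal = state.getOption.getOrElse(0L) + value.getOrElse(0)
        state.update(newTotal) // state survives across batches
        (key, newTotal)
      })
    events.mapWithState(spec)
  }
}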


Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-29 Thread Bryan Jeffrey
Jungtaek,

Thank you for taking a look at this issue. I would be happy to test your
patch - but I am unsure how to do so.  Can you help me to understand how I
can:
1. Obtain an updated version of the JAR in question either via a nightly
build or by building myself?
2. Substitute this JAR, ideally via changing a dependency in our code to
point to a local dependency (via Maven?)
3. Substitute the JAR on the Spark cluster, either via changes to our Maven
dependency selection or, less ideally, by replacing the JAR on the cluster?

I would appreciate any pointers in this area as I'd like to come up to
speed on the right way to validate changes and perhaps contribute myself.

Regards,

Bryan Jeffrey

On Sat, Feb 29, 2020 at 9:52 AM Jungtaek Lim 
wrote:

> Forgot to mention - it only occurs when the SQL type of the UDT has a fixed
> length. If the UDT is used to represent a complex type like array, struct, or
> even string, it doesn't trigger the issue. So it's an edge case and
> the chance of encountering this issue may not be that huge, which is why
> it only pops up now even though the relevant code has been around for a very
> long time.
>
> On Sat, Feb 29, 2020 at 11:44 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> I've investigated a bit, and it looks like it's not an issue with
>> mapGroupsWithState, but an issue with how UDTs are handled in UnsafeRow. It
>> seems to miss handling UDTs, and the missing spot makes Spark's internal
>> code corrupt the value. (So if I'm not mistaken, it's a correctness issue.)
>>
>> I've filed an issue (sorry, I missed that you'd already filed one) and
>> submitted a patch. https://issues.apache.org/jira/browse/SPARK-30993
>>
>> It would be nice if you could try out my patch and see whether it fixes
>> your issue (I've already copied your code and made it pass, but would like
>> to double check). Thanks for reporting!
>>
>> On Sat, Feb 29, 2020 at 11:26 AM Bryan Jeffrey 
>> wrote:
>>
>>>
>>> Hi Tathagata.
>>>
>>> I tried making changes as you suggested:
>>>
>>> @SQLUserDefinedType(udt = classOf[JodaTimeUDT])
>>> class JodaTimeUDT extends UserDefinedType[DateTime] {
>>>   override def sqlType: DataType  = TimestampType
>>>
>>>   override def serialize(obj: DateTime): Long = {
>>> obj.getMillis
>>>   }
>>>
>>>   def deserialize(datum: Any): DateTime = {
>>> datum match {
>>>case value: Long => new DateTime(value, DateTimeZone.UTC)
>>> }
>>>   }
>>>
>>>   override def userClass: Class[DateTime] = classOf[DateTime]
>>>
>>>   private[spark] override def asNullable: JodaTimeUDT = this
>>> }
>>>
>>> object JodaTimeUDTRegister {
>>>   def register : Unit = {
>>> UDTRegistration.register(classOf[DateTime].getName, 
>>> classOf[JodaTimeUDT].getName)
>>>   }
>>> }
>>>
>>>
>>> This did not resolve the problem.  The results remain the same:
>>>
>>>
>>> org.scalatest.exceptions.TestFailedException: 
>>> Array(FooWithDate(*2021-02-02T19:26:23.374Z*,Foo,1), 
>>> FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same 
>>> elements as Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1), 
>>> FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))
>>>
>>>
>>> I included a couple of other test cases to validate that the UDT works fine:
>>>
>>>
>>> "the joda time serializer" should "serialize and deserialize as expected" 
>>> in {
>>>   val input = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
>>>   val serializer = new JodaTimeUDT()
>>>   val serialized = serializer.serialize(input)
>>>   val deserialized = serializer.deserialize(serialized)
>>>
>>>   deserialized should be(input)
>>> }
>>>
>>> it should "correctly implement dataframe serialization & deserialization in 
>>> data frames" in {
>>>   val date = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
>>>   val datePlusOne = new DateTime(2020,1,2,3,5,5,6, DateTimeZone.UTC)
>>>   val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3))
>>>   val sqlContext = session.sqlContext
>>>   import sqlContext.implicits._
>>>   val ds = input.toDF().as[FooWithDate]
>>>   val result = ds.map(x => FooWithDate(DateUtils.addInterval(x.date, 
>>> Minutes(1)), x.s, x.i + 1)).collect()
>>>   val expected = List(FooWi

Fwd: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Bryan Jeffrey
Hi Tathagata.

I tried making changes as you suggested:

@SQLUserDefinedType(udt = classOf[JodaTimeUDT])
class JodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType = TimestampType

  override def serialize(obj: DateTime): Long = {
    obj.getMillis
  }

  def deserialize(datum: Any): DateTime = {
    datum match {
      case value: Long => new DateTime(value, DateTimeZone.UTC)
    }
  }

  override def userClass: Class[DateTime] = classOf[DateTime]

  private[spark] override def asNullable: JodaTimeUDT = this
}

object JodaTimeUDTRegister {
  def register : Unit = {
    UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
  }
}


This did not resolve the problem.  The results remain the same:


org.scalatest.exceptions.TestFailedException:
Array(FooWithDate(*2021-02-02T19:26:23.374Z*,Foo,1),
FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the
same elements as Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1),
FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))


I included a couple of other test cases to validate that the UDT works fine:


"the joda time serializer" should "serialize and deserialize as expected" in {
  val input = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
  val serializer = new JodaTimeUDT()
  val serialized = serializer.serialize(input)
  val deserialized = serializer.deserialize(serialized)

  deserialized should be(input)
}

it should "correctly implement dataframe serialization &
deserialization in data frames" in {
  val date = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
  val datePlusOne = new DateTime(2020,1,2,3,5,5,6, DateTimeZone.UTC)
  val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3))
  val sqlContext = session.sqlContext
  import sqlContext.implicits._
  val ds = input.toDF().as[FooWithDate]
  val result = ds.map(x => FooWithDate(DateUtils.addInterval(x.date,
Minutes(1)), x.s, x.i + 1)).collect()
  val expected = List(FooWithDate(datePlusOne, "Foo", 2),
FooWithDate(datePlusOne, "Foo", 4))

  result should contain theSameElementsAs expected
}


Any other thoughts?


On Fri, Feb 28, 2020 at 6:23 PM Tathagata Das 
wrote:

> Sounds like something to do with the serialization/deserialization, and
> not related to mapGroupsWithState.
>
>
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
>
> The docs say that
> 1. this is deprecated and therefore should not be used
> 2. you have to use the annotation `SQLUserDefinedType
> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java>`
> on the class definition. You don't seem to have done it; maybe that's the
> reason?
>
> I would debug by printing the values in the serialize/deserialize methods,
> and then passing it through the groupBy that is known to fail.
>
> TD
>
> On Fri, Feb 28, 2020 at 2:45 PM Bryan Jeffrey 
> wrote:
>
>> Tathagata,
>>
>> The difference is more than hours off. In this instance it's different by
>> 4 years. In other instances it's different by tens of years (and other
>> smaller durations).
>>
>> We've considered moving to storage as longs, but this makes code much
>> less readable and harder to maintain. The udt serialization bug also causes
>> issues outside of stateful streaming, as when executing a simple group by.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>> Get Outlook for Android <https://aka.ms/ghei36>
>>
>> --
>> *From:* Tathagata Das 
>> *Sent:* Friday, February 28, 2020 4:56:07 PM
>> *To:* Bryan Jeffrey 
>> *Cc:* user 
>> *Subject:* Re: Structured Streaming: mapGroupsWithState UDT
>> serialization does not work
>>
>> You are deserializing by explicitly specifying the UTC timezone, but when
>> serializing you are not specifying it. Maybe that is the reason?
>>
>> Also, if you can encode it using just long, then I recommend just saving
>> the value as long and eliminating some of the serialization overheads.
>> Spark will probably better optimize stuff if it sees it as a long rather
>> than an opaque UDT.
>>
>> TD
>>
>> On Fri, Feb 28, 2020 at 6:39 AM Bryan Jeffrey 
>> wrote:
>>
>> Hello.
>>
>> I'm running Scala 2.11 w/ Spark 2.3.0.  I've encountered a problem with
>> mapGroupsWithState, and was wondering if anyone had insight.  We use Joda
>> time in a number of data structures, and so we've generated a custom
>> serializer for Joda.  This works well in most dataset/dataframe structured
>> streaming operations. However, when

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Bryan Jeffrey
Perfect. I'll give this a shot and report back.

Get Outlook for Android<https://aka.ms/ghei36>


From: Tathagata Das 
Sent: Friday, February 28, 2020 6:23:07 PM
To: Bryan Jeffrey 
Cc: user 
Subject: Re: Structured Streaming: mapGroupsWithState UDT serialization does 
not work

Sounds like something to do with the serialization/deserialization, and not 
related to mapGroupsWithState.

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala

The docs say that
1. this is deprecated and therefore should not be used
2. you have to use the annotation
`SQLUserDefinedType<https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java>`
on the class definition. You don't seem to have done it; maybe that's the reason?

I would debug by printing the values in the serialize/deserialize methods, and 
then passing it through the groupBy that is known to fail.

TD

On Fri, Feb 28, 2020 at 2:45 PM Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
Tathagata,

The difference is more than hours off. In this instance it's different by 4 
years. In other instances it's different by tens of years (and other smaller 
durations).

We've considered moving to storage as longs, but this makes code much less 
readable and harder to maintain. The udt serialization bug also causes issues 
outside of stateful streaming, as when executing a simple group by.

Regards,

Bryan Jeffrey

Get Outlook for Android<https://aka.ms/ghei36>


From: Tathagata Das <tathagata.das1...@gmail.com>
Sent: Friday, February 28, 2020 4:56:07 PM
To: Bryan Jeffrey <bryan.jeff...@gmail.com>
Cc: user <user@spark.apache.org>
Subject: Re: Structured Streaming: mapGroupsWithState UDT serialization does 
not work

You are deserializing by explicitly specifying the UTC timezone, but when
serializing you are not specifying it. Maybe that is the reason?

Also, if you can encode it using just long, then I recommend just saving the 
value as long and eliminating some of the serialization overheads. Spark will 
probably better optimize stuff if it sees it as a long rather than an opaque 
UDT.

TD

On Fri, Feb 28, 2020 at 6:39 AM Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
Hello.

I'm running Scala 2.11 w/ Spark 2.3.0.  I've encountered a problem with 
mapGroupsWithState, and was wondering if anyone had insight.  We use Joda time 
in a number of data structures, and so we've generated a custom serializer for 
Joda.  This works well in most dataset/dataframe structured streaming 
operations. However, when running mapGroupsWithState we observed that incorrect 
dates were being returned from a state.

I created a bug here: https://issues.apache.org/jira/browse/SPARK-30986 in an 
effort to assist tracking of related information.

Simple example:
1. Input A has a date D
2. Input A updates state in mapGroupsWithState. Date present in state is D
3. Input A is added again.  Input A has correct date D, but existing state now 
has invalid date

Here is a simple repro:

Joda Time UDT:


private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType = LongType
  override def serialize(obj: DateTime): Long = obj.getMillis
  def deserialize(datum: Any): DateTime = datum match { case value: Long => new DateTime(value, DateTimeZone.UTC) }
  override def userClass: Class[DateTime] = classOf[DateTime]
  private[spark] override def asNullable: JodaTimeUDT = this
}

object JodaTimeUDTRegister {
  def register : Unit = {
    UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
  }
}

Test Leveraging Joda UDT:


case class FooWithDate(date: DateTime, s: String, i: Int)

@RunWith(classOf[JUnitRunner])
class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory with 
BeforeAndAfterAll {
  val application = this.getClass.getName
  var session: SparkSession = _

  override def beforeAll(): Unit = {
System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
val sparkConf = new SparkConf()
  .set("spark.driver.allowMultipleContexts", "true")
  .set("spark.testing", "true")
  .set("spark.memory.fraction", "1")
  .set("spark.ui.enabled", "false")
  .set("spark.streaming.gracefulStopTimeout", "1000")
  .setAppName(application).setMaster("local[*]")


session = SparkSession.builder().config(sparkConf).getOrCreate()
session.sparkContext.setCheckpointDir("/")
JodaTimeUDTRegister.register
  }

  override def afterAll(): Unit = {
session.stop()
  }

  it should "work correctly for a streaming input with stateful transformation" 
in {
val date = new D

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Bryan Jeffrey
Tathagata,

The difference is more than hours off. In this instance it's different by 4 
years. In other instances it's different by tens of years (and other smaller 
durations).

We've considered moving to storage as longs, but this makes code much less 
readable and harder to maintain. The udt serialization bug also causes issues 
outside of stateful streaming, as when executing a simple group by.

Regards,

Bryan Jeffrey

Get Outlook for Android<https://aka.ms/ghei36>


From: Tathagata Das 
Sent: Friday, February 28, 2020 4:56:07 PM
To: Bryan Jeffrey 
Cc: user 
Subject: Re: Structured Streaming: mapGroupsWithState UDT serialization does 
not work

You are deserializing by explicitly specifying the UTC timezone, but when
serializing you are not specifying it. Maybe that is the reason?

Also, if you can encode it using just long, then I recommend just saving the 
value as long and eliminating some of the serialization overheads. Spark will 
probably better optimize stuff if it sees it as a long rather than an opaque 
UDT.

TD

On Fri, Feb 28, 2020 at 6:39 AM Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
Hello.

I'm running Scala 2.11 w/ Spark 2.3.0.  I've encountered a problem with 
mapGroupsWithState, and was wondering if anyone had insight.  We use Joda time 
in a number of data structures, and so we've generated a custom serializer for 
Joda.  This works well in most dataset/dataframe structured streaming 
operations. However, when running mapGroupsWithState we observed that incorrect 
dates were being returned from a state.

I created a bug here: https://issues.apache.org/jira/browse/SPARK-30986 in an 
effort to assist tracking of related information.

Simple example:
1. Input A has a date D
2. Input A updates state in mapGroupsWithState. Date present in state is D
3. Input A is added again.  Input A has correct date D, but existing state now 
has invalid date

Here is a simple repro:

Joda Time UDT:


private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType = LongType
  override def serialize(obj: DateTime): Long = obj.getMillis
  def deserialize(datum: Any): DateTime = datum match { case value: Long => new DateTime(value, DateTimeZone.UTC) }
  override def userClass: Class[DateTime] = classOf[DateTime]
  private[spark] override def asNullable: JodaTimeUDT = this
}

object JodaTimeUDTRegister {
  def register : Unit = {
    UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
  }
}

Test Leveraging Joda UDT:


case class FooWithDate(date: DateTime, s: String, i: Int)

@RunWith(classOf[JUnitRunner])
class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory with 
BeforeAndAfterAll {
  val application = this.getClass.getName
  var session: SparkSession = _

  override def beforeAll(): Unit = {
System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
val sparkConf = new SparkConf()
  .set("spark.driver.allowMultipleContexts", "true")
  .set("spark.testing", "true")
  .set("spark.memory.fraction", "1")
  .set("spark.ui.enabled", "false")
  .set("spark.streaming.gracefulStopTimeout", "1000")
  .setAppName(application).setMaster("local[*]")


session = SparkSession.builder().config(sparkConf).getOrCreate()
session.sparkContext.setCheckpointDir("/")
JodaTimeUDTRegister.register
  }

  override def afterAll(): Unit = {
session.stop()
  }

  it should "work correctly for a streaming input with stateful transformation" 
in {
val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
val sqlContext = session.sqlContext
import sqlContext.implicits._

val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3), 
FooWithDate(date, "Foo", 3))
val streamInput: MemoryStream[FooWithDate] = new 
MemoryStream[FooWithDate](42, session.sqlContext)
streamInput.addData(input)
val ds: Dataset[FooWithDate] = streamInput.toDS()

val mapGroupsWithStateFunction: (Int, Iterator[FooWithDate], 
GroupState[FooWithDate]) => FooWithDate = TestJodaTimeUdt.updateFooState
val result: Dataset[FooWithDate] = ds
  .groupByKey(x => x.i)
  
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mapGroupsWithStateFunction)
val writeTo = s"random_table_name"


result.writeStream.outputMode(OutputMode.Update).format("memory").queryName(writeTo).trigger(Trigger.Once()).start().awaitTermination()
val combinedResults: Array[FooWithDate] = session.sql(sqlText = s"select * 
from $writeTo").as[FooWithDate].collect()
val expected = Array(FooWithDate(date, "Foo", 1), FooWithDate(date, 
"FooFoo", 6))
combinedResults should contain theSam

Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Bryan Jeffrey
Hello.

I'm running Scala 2.11 w/ Spark 2.3.0.  I've encountered a problem with
mapGroupsWithState, and was wondering if anyone had insight.  We use Joda
time in a number of data structures, and so we've generated a custom
serializer for Joda.  This works well in most dataset/dataframe structured
streaming operations. However, when running mapGroupsWithState we observed
that incorrect dates were being returned from a state.

I created a bug here: https://issues.apache.org/jira/browse/SPARK-30986 in
an effort to assist tracking of related information.

Simple example:
1. Input A has a date D
2. Input A updates state in mapGroupsWithState. Date present in state is D
3. Input A is added again.  Input A has correct date D, but existing state
now has invalid date

Here is a simple repro:

Joda Time UDT:

private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType = LongType
  override def serialize(obj: DateTime): Long = obj.getMillis
  def deserialize(datum: Any): DateTime = datum match { case value: Long => new DateTime(value, DateTimeZone.UTC) }
  override def userClass: Class[DateTime] = classOf[DateTime]
  private[spark] override def asNullable: JodaTimeUDT = this
}

object JodaTimeUDTRegister {
  def register : Unit = {
    UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
  }
}


Test Leveraging Joda UDT:

case class FooWithDate(date: DateTime, s: String, i: Int)

@RunWith(classOf[JUnitRunner])
class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory with BeforeAndAfterAll {
  val application = this.getClass.getName
  var session: SparkSession = _

  override def beforeAll(): Unit = {
    System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
    val sparkConf = new SparkConf()
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.testing", "true")
      .set("spark.memory.fraction", "1")
      .set("spark.ui.enabled", "false")
      .set("spark.streaming.gracefulStopTimeout", "1000")
      .setAppName(application).setMaster("local[*]")

    session = SparkSession.builder().config(sparkConf).getOrCreate()
    session.sparkContext.setCheckpointDir("/")
    JodaTimeUDTRegister.register
  }

  override def afterAll(): Unit = {
    session.stop()
  }

  it should "work correctly for a streaming input with stateful transformation" in {
    val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
    val sqlContext = session.sqlContext
    import sqlContext.implicits._

    val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3), FooWithDate(date, "Foo", 3))
    val streamInput: MemoryStream[FooWithDate] = new MemoryStream[FooWithDate](42, session.sqlContext)
    streamInput.addData(input)
    val ds: Dataset[FooWithDate] = streamInput.toDS()

    val mapGroupsWithStateFunction: (Int, Iterator[FooWithDate], GroupState[FooWithDate]) => FooWithDate =
      TestJodaTimeUdt.updateFooState
    val result: Dataset[FooWithDate] = ds
      .groupByKey(x => x.i)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mapGroupsWithStateFunction)
    val writeTo = s"random_table_name"

    result.writeStream.outputMode(OutputMode.Update).format("memory").queryName(writeTo).trigger(Trigger.Once()).start().awaitTermination()
    val combinedResults: Array[FooWithDate] = session.sql(sqlText = s"select * from $writeTo").as[FooWithDate].collect()
    val expected = Array(FooWithDate(date, "Foo", 1), FooWithDate(date, "FooFoo", 6))
    combinedResults should contain theSameElementsAs(expected)
  }
}

object TestJodaTimeUdt {
  def updateFooState(id: Int, inputs: Iterator[FooWithDate], state: GroupState[FooWithDate]): FooWithDate = {
    if (state.hasTimedOut) {
      state.remove()
      state.getOption.get
    } else {
      val inputsSeq: Seq[FooWithDate] = inputs.toSeq
      val startingState = state.getOption.getOrElse(inputsSeq.head)
      val toProcess = if (state.getOption.isDefined) inputsSeq else inputsSeq.tail
      val updatedFoo = toProcess.foldLeft(startingState)(concatFoo)

      state.update(updatedFoo)
      state.setTimeoutDuration("1 minute")
      updatedFoo
    }
  }

  def concatFoo(a: FooWithDate, b: FooWithDate): FooWithDate =
    FooWithDate(b.date, a.s + b.s, a.i + b.i)
}


The test output shows the invalid date:

org.scalatest.exceptions.TestFailedException:
Array(FooWithDate(2021-02-02T19:26:23.374Z,Foo,1),
FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same
elements as
Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1),
FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))

Is this something folks have encountered before?

Thank you,

Bryan Jeffrey


Accumulator v2

2020-01-21 Thread Bryan Jeffrey
Hello.

We're currently using Spark streaming (Spark 2.3) for a number of
applications. One pattern we've used successfully is to generate an
accumulator inside a DStream transform statement.  We then accumulate
values associated with the RDD as we process the data.  A stage completion
listener listens for stage complete events, retrieves the
AccumulableInfo for our custom classes, and exhausts the statistics to our
back-end.

We're trying to move more of our applications to using Structured
Streaming.  However, the accumulator pattern does not seem to obviously fit
Structured Streaming.  In many cases we're able to see basic statistics
(e.g. # input and # output events) from the built-in statistics.  We need
to determine a pattern for more complex statistics (# errors, # of internal
records, etc).  Defining an accumulator on startup and adding statistics,
we're able to see the statistics - but only cumulative values rather than
per-trigger updates - so if we read 10 records in the first trigger and 15 in
the second trigger, we see accumulated values of 10 and 25.

There are several options that might allow us to move ahead:
1. We could have the AccumulableInfo contain previous counts and current
counts
2. We could maintain current and previous counts separately
3. We could maintain a list of ID to AccumulatorV2 and then call
accumulator.reset() once we've read data

All of these options seem a little bit like a hacky workaround.  Has anyone
encountered this use-case?  Is there a good pattern to follow?

Regards,

Bryan Jeffrey
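
A rough sketch of the Spark Streaming pattern described in this message, with
illustrative names and a trivial stand-in for the real statistics (assumes Spark
2.3-era DStream and listener APIs; this is not the actual production code):

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object CustomStatsPattern {

  // A named accumulator is created inside a transform and updated as records
  // are processed; the additions happen when the stage actually executes.
  def countedErrors(stream: DStream[String]): DStream[String] =
    stream.transform { rdd =>
      val errorCount = rdd.sparkContext.longAccumulator("custom.errorCount")
      rdd.filter { record =>
        val isError = record.isEmpty // stand-in for real validation logic
        if (isError) errorCount.add(1)
        !isError
      }
    }

  // The listener pulls the accumulated values for the custom accumulators once
  // each stage completes and would forward them to a metrics back-end.
  class CustomStatsListener extends SparkListener {
    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
      stageCompleted.stageInfo.accumulables.values
        .filter(_.name.exists(_.startsWith("custom.")))
        .foreach(info => println(s"${info.name.get} = ${info.value.getOrElse(0L)}"))
    }
  }

  def register(ssc: StreamingContext): Unit =
    ssc.sparkContext.addSparkListener(new CustomStatsListener)
}

The question above is essentially how to get equivalent per-trigger statistics out of
Structured Streaming, where this listener-plus-accumulator approach only yields
cumulative values.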


Structured Streaming & Enrichment Broadcasts

2019-11-18 Thread Bryan Jeffrey
Hello.

We're running applications using Spark Streaming.  We're going to begin
work to move to using Structured Streaming.  One of our key scenarios is to
lookup values from an external data source for each record in an incoming
stream.  In Spark Streaming we currently read the external data, broadcast
it and then lookup the value from the broadcast.  The broadcast value is
refreshed on a periodic basis - with the need to refresh evaluated on each
batch (in a foreachRDD).  The broadcasts are somewhat large (~1M records).
Each stream we're doing the lookup(s) for is ~6M records / second.

While we could conceivably continue this pattern in Structured Streaming
with Spark 2.4.x and the 'foreachBatch', based on my read of documentation
this seems like a bit of an anti-pattern in Structured Streaming.

So I am looking for advice: what mechanism would you suggest for reading an
external data source on a periodic basis and doing a fast lookup against a
streaming input?  One option appears to be a broadcast left outer
join; in the past this mechanism has been harder to performance tune
than doing an explicit broadcast and lookup.

Regards,

Bryan Jeffrey
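
A minimal sketch of the broadcast left outer join option mentioned above. All source,
path, and column names are hypothetical; it shows a stream-static join with the static
side hinted as a broadcast, and it does not address the periodic refresh requirement
that is the open question in this message.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object EnrichmentJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("enrichment-join-sketch").getOrCreate()

    // Streaming side: ~6M records/second in the scenario above.
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // hypothetical broker
      .option("subscribe", "events")                     // hypothetical topic
      .load()
      .selectExpr("CAST(key AS STRING) AS lookupKey", "CAST(value AS STRING) AS payload")

    // Static side: the ~1M-row lookup table, read when the query starts.
    val lookup = spark.read.parquet("/data/lookup").select("lookupKey", "enrichment")

    // Hint the static side as a broadcast so each micro-batch enriches records
    // with a broadcast hash join instead of a shuffle.
    val enriched = events.join(broadcast(lookup), Seq("lookupKey"), "left_outer")

    enriched.writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}

Refreshing the lookup data without restarting the query would still need something
like re-reading it inside foreachBatch, which is the potential anti-pattern flagged
above.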


Re: PySpark Pandas UDF

2019-11-17 Thread Bryan Cutler
There was a change in the binary format of Arrow 0.15.1 and there is an
environment variable you can set to make pyarrow 0.15.1 compatible with
current Spark, which looks to be your problem. Please see the doc below for
instructions added in SPARK-2936. Note, this will not be required for the
upcoming release of Spark 3.0.0.
https://github.com/apache/spark/blob/master/docs/sql-pyspark-pandas-with-arrow.md#compatibiliy-setting-for-pyarrow--0150-and-spark-23x-24x

On Tue, Nov 12, 2019 at 7:53 AM Holden Karau  wrote:

> Thanks for sharing that. I think we should maybe add some checks around
> this so it’s easier to debug. I’m CCing Bryan who might have some thoughts.
>
> On Tue, Nov 12, 2019 at 7:42 AM gal.benshlomo 
> wrote:
>
>> SOLVED!
>> thanks for the help - I found the issue. it was the version of pyarrow
>> (0.15.1) which apparently isn't currently stable. Downgrading it solved
>> the
>> issue for me
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>


Re: [DISCUSS] Remove sorting of fields in PySpark SQL Row construction

2019-11-12 Thread Bryan Cutler
Thanks all. I created a WIP PR at https://github.com/apache/spark/pull/26496,
we can further discuss the details in there.

On Thu, Nov 7, 2019 at 7:01 PM Takuya UESHIN  wrote:

> +1
>
> On Thu, Nov 7, 2019 at 6:54 PM Shane Knapp  wrote:
>
>> +1
>>
>> On Thu, Nov 7, 2019 at 6:08 PM Hyukjin Kwon  wrote:
>> >
>> > +1
>> >
>> > On Wed, Nov 6, 2019 at 11:38 PM, Wenchen Fan wrote:
>> >>
>> >> Sounds reasonable to me. We should make the behavior consistent within
>> Spark.
>> >>
>> >> On Tue, Nov 5, 2019 at 6:29 AM Bryan Cutler  wrote:
>> >>>
>> >>> Currently, when a PySpark Row is created with keyword arguments, the
>> fields are sorted alphabetically. This has created a lot of confusion with
>> users because it is not obvious (although it is stated in the pydocs) that
>> they will be sorted alphabetically. Then later when applying a schema and
>> the field order does not match, an error will occur. Here is a list of some
>> of the JIRAs that I have been tracking all related to this issue:
>> SPARK-24915, SPARK-22232, SPARK-27939, SPARK-27712, and relevant discussion
>> of the issue [1].
>> >>>
>> >>> The original reason for sorting fields is because kwargs in python <
>> 3.6 are not guaranteed to be in the same order that they were entered [2].
>> Sorting alphabetically ensures a consistent order. Matters are further
>> complicated with the flag _from_dict_ that allows the Row fields to be
>> referenced by name when made by kwargs, but this flag is not serialized
>> with the Row and leads to inconsistent behavior. For instance:
>> >>>
>> >>> >>> spark.createDataFrame([Row(A="1", B="2")], "B string, A
>> string").first()
>> >>> Row(B='2', A='1')
>> >>> >>> spark.createDataFrame(spark.sparkContext.parallelize([Row(A="1",
>> B="2")]), "B string, A string").first()
>> >>> Row(B='1', A='2')
>> >>>
>> >>> I think the best way to fix this is to remove the sorting of fields
>> when constructing a Row. For users with Python 3.6+, nothing would change
>> because these versions of Python ensure that the kwargs stays in the
>> ordered entered. For users with Python < 3.6, using kwargs would check a
>> conf to either raise an error or fallback to a LegacyRow that sorts the
>> fields as before. With Python < 3.6 being deprecated now, this LegacyRow
>> can also be removed at the same time. There are also other ways to create
>> Rows that will not be affected. I have opened a JIRA [3] to capture this,
>> but I am wondering what others think about fixing this for Spark 3.0?
>> >>>
>> >>> [1] https://github.com/apache/spark/pull/20280
>> >>> [2] https://www.python.org/dev/peps/pep-0468/
>> >>> [3] https://issues.apache.org/jira/browse/SPARK-29748
>>
>>
>>
>> --
>> Shane Knapp
>> UC Berkeley EECS Research / RISELab Staff Technical Lead
>> https://rise.cs.berkeley.edu
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
> --
> Takuya UESHIN
> Tokyo, Japan
>
> http://twitter.com/ueshin
>


[DISCUSS] Remove sorting of fields in PySpark SQL Row construction

2019-11-04 Thread Bryan Cutler
Currently, when a PySpark Row is created with keyword arguments, the fields
are sorted alphabetically. This has created a lot of confusion with users
because it is not obvious (although it is stated in the pydocs) that they
will be sorted alphabetically. Then later when applying a schema and the
field order does not match, an error will occur. Here is a list of some of
the JIRAs that I have been tracking all related to this issue: SPARK-24915,
SPARK-22232, SPARK-27939, SPARK-27712, and relevant discussion of the issue
[1].

The original reason for sorting fields is because kwargs in python < 3.6
are not guaranteed to be in the same order that they were entered [2].
Sorting alphabetically ensures a consistent order. Matters are further
complicated with the flag _from_dict_ that allows the Row fields to be
referenced by name when made by kwargs, but this flag is not serialized
with the Row and leads to inconsistent behavior. For instance:

>>> spark.createDataFrame([Row(A="1", B="2")], "B string, A string").first()
Row(B='2', A='1')
>>> spark.createDataFrame(spark.sparkContext.parallelize([Row(A="1", B="2")]), "B string, A string").first()
Row(B='1', A='2')

I think the best way to fix this is to remove the sorting of fields when
constructing a Row. For users with Python 3.6+, nothing would change
because these versions of Python ensure that the kwargs stays in the
ordered entered. For users with Python < 3.6, using kwargs would check a
conf to either raise an error or fallback to a LegacyRow that sorts the
fields as before. With Python < 3.6 being deprecated now, this LegacyRow
can also be removed at the same time. There are also other ways to create
Rows that will not be affected. I have opened a JIRA [3] to capture this,
but I am wondering what others think about fixing this for Spark 3.0?

[1] https://github.com/apache/spark/pull/20280
[2] https://www.python.org/dev/peps/pep-0468/
[3] https://issues.apache.org/jira/browse/SPARK-29748


Re: question about pyarrow.Table to pyspark.DataFrame conversion

2019-09-10 Thread Bryan Cutler
Hi Artem,

I don't believe this is currently possible, but it could be a great
addition to PySpark since this would offer a convenient and efficient way
to parallelize nested column data. I created the JIRA
https://issues.apache.org/jira/browse/SPARK-29040 for this.

On Tue, Aug 27, 2019 at 7:55 PM Artem Kozhevnikov <
kozhevnikov.ar...@gmail.com> wrote:

> I wonder if there's some recommended method to convert an in-memory
> pyarrow.Table (or pyarrow.RecordBatch) to a pyspark.DataFrame without using
> pandas?
> My motivation is converting nested data (like List[int]) that has
> an efficient representation in pyarrow but no equivalent in Pandas (I
> don't want to go through a python list of int ...).
>
> Thanks in advance !
> Artem
>
>
>


Driver - Stops Scheduling Streaming Jobs

2019-08-27 Thread Bryan Jeffrey
Hello!

We're running Spark 2.3.0 on Scala 2.11.  We have a number of Spark
Streaming jobs that are using MapWithState.  We've observed that these jobs
will complete some set of stages, and then not schedule the next set of
stages.  It looks like the DAG Scheduler correctly identifies required
stages:

19/08/27 15:29:48 INFO YarnClusterScheduler: Removed TaskSet 79.0, whose
tasks have all completed, from pool
19/08/27 15:29:48 INFO DAGScheduler: ShuffleMapStage 79 (map at
SomeCode.scala:121) finished in 142.985 s
19/08/27 15:29:48 INFO DAGScheduler: looking for newly runnable stages
19/08/27 15:29:48 INFO DAGScheduler: running: Set()
19/08/27 15:29:48 INFO DAGScheduler: waiting: Set(ShuffleMapStage 81,
ResultStage 82, ResultStage 83, ShuffleMapStage 54, ResultStage 61,
ResultStage 55, ShuffleMapStage 48, ShuffleMapStage 84, ResultStage 49,
ShuffleMapStage 85, ShuffleMapStage 56, ResultStage 86,
ShuffleMapStage 57, ResultStage 58, ResultStage 80)
19/08/27 15:29:48 INFO DAGScheduler: failed: Set()

However, we see no stages that begin execution.  This happens semi-rarely
(every couple of days), which makes repro difficult.  I checked known bugs
fixed in 2.3.x and did not see anything pop out.  Has anyone else seen this
behavior? Any thoughts on debugging?

Regards,

Bryan Jeffrey


Re: Usage of PyArrow in Spark

2019-07-18 Thread Bryan Cutler
It would be possible to use arrow on regular python udfs and avoid pandas,
and there would probably be some performance improvement. The difficult
part will be to ensure that the data remains consistent in the conversions
between Arrow and Python, e.g. timestamps are a bit tricky.  Given that we
already have pandas_udfs, I'm not sure if it would be worth the effort but
it might be a good experiment to see how much improvement it would bring.

Bryan

On Thu, Jul 18, 2019 at 12:02 AM Abdeali Kothari 
wrote:

> I was thinking of implementing that. But quickly realized that doing a
> conversion of Spark -> Pandas -> Python causes errors.
>
> A quick example being "None" in Numeric data types.
> Pandas supports only NaN. Spark supports NULL and NaN.
>
> This is just one of the issues I came to.
> I'm not sure about some of the more complex types like Array, Map, struct
> which are internally converted to pd.Series with type being object.
>
> I think that avoiding pandas in between and doing something from Arrow to
> Python would be more efficient as, if I understand right, Arrow has a wider
> range of types and can handle this better.
>
> >>> from pyspark.sql import functions as F
> >>> sdf = spark.createDataFrame([ [None], [float('nan')], [1.1] ], ['val'])
>
> # Return the column with no change
> >>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col)
> >>> sdf.select(sdf['val'], udf(sdf['val'])).show()
> +----+-----+
> | val|(val)|
> +----+-----+
> |null| null|
> | NaN| null|
> | 1.1|  1.1|
> +----+-----+
>
> # isnull()
> >>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col:
> col.isnull())
> >>> sdf.select(sdf['val'], udf(sdf['val'])).show()
> +----+-----+
> | val|(val)|
> +----+-----+
> |null|  1.0|
> | NaN|  1.0|
> | 1.1|  0.0|
> +----+-----+
>
> # Check for "is None"
> >>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col:
> col.apply(lambda x: x is None))
> >>> sdf.select(sdf['val'], udf(sdf['val'])).show()
> +----+-----+
> | val|(val)|
> +----+-----+
> |null|  0.0|
> | NaN|  0.0|
> | 1.1|  0.0|
> +----+-----+
>
> On Wed, Jul 17, 2019 at 4:47 PM Hyukjin Kwon  wrote:
>
>> Regular Python UDFs don't use PyArrow under the hood.
>> Yes, they can potentially benefit but they can be easily worked around
>> via Pandas UDFs.
>>
>> For instance, both below are virtually identical.
>>
>> @udf(...)
>> def func(col):
>>     return col
>>
>> @pandas_udf(...)
>> def pandas_func(col):
>>     return col.apply(lambda v: v)
>>
>> If we only need some minimised change, I would be positive about adding
>> Arrow support into regular Python UDFs. Otherwise, I am not sure yet.
>>
>>
>> 2019년 7월 17일 (수) 오후 1:19, Abdeali Kothari 님이
>> 작성:
>>
>>> Hi,
>>> In spark 2.3+ I saw that pyarrow was being used in a bunch of places in
>>> spark. And I was trying to understand the benefit in terms of serialization
>>> / deserializaiton it provides.
>>>
>>> I understand that the new pandas-udf works only if pyarrow is installed.
>>> But what about the plain old PythonUDF which can be used in map() kind
>>> of operations?
>>> Are they also using pyarrow under the hood to reduce the cost of serde?
>>> Or do they remain as earlier and no performance gain should be expected in
>>> those?
>>>
>>> If I'm not mistaken, plain old PythonUDFs could also benefit from Arrow
>>> as the data transfer cost to serialize/deserialize from Java to Python and
>>> back still exists and could potentially be reduced by using Arrow?
>>> Is my understanding correct? Are there any plans to implement this?
>>>
>>> Pointers to any notes or Jira about this would be appreciated.
>>>
>>


Re: Should python-2 be supported in Spark 3.0?

2019-05-31 Thread Bryan Cutler
+1 and the draft sounds good

On Thu, May 30, 2019, 11:32 AM Xiangrui Meng  wrote:

> Here is the draft announcement:
>
> ===
> Plan for dropping Python 2 support
>
> As many of you already knew, Python core development team and many
> utilized Python packages like Pandas and NumPy will drop Python 2 support
> in or before 2020/01/01. Apache Spark has supported both Python 2 and 3
> since Spark 1.4 release in 2015. However, maintaining Python 2/3
> compatibility is an increasing burden and it essentially limits the use of
> Python 3 features in Spark. Given the end of life (EOL) of Python 2 is
> coming, we plan to eventually drop Python 2 support as well. The current
> plan is as follows:
>
> * In the next major release in 2019, we will deprecate Python 2 support.
> PySpark users will see a deprecation warning if Python 2 is used. We will
> publish a migration guide for PySpark users to migrate to Python 3.
> * We will drop Python 2 support in a future release in 2020, after Python
> 2 EOL on 2020/01/01. PySpark users will see an error if Python 2 is used.
> * For releases that support Python 2, e.g., Spark 2.4, their patch
> releases will continue supporting Python 2. However, after Python 2 EOL, we
> might not take patches that are specific to Python 2.
> ===
>
> Sean helped make a pass. If it looks good, I'm going to upload it to Spark
> website and announce it here. Let me know if you think we should do a VOTE
> instead.
>
> On Thu, May 30, 2019 at 9:21 AM Xiangrui Meng  wrote:
>
>> I created https://issues.apache.org/jira/browse/SPARK-27884 to track the
>> work.
>>
>> On Thu, May 30, 2019 at 2:18 AM Felix Cheung 
>> wrote:
>>
>>> We don’t usually reference a future release on website
>>>
>>> > Spark website and state that Python 2 is deprecated in Spark 3.0
>>>
>>> I suspect people will then ask when is Spark 3.0 coming out then. Might
>>> need to provide some clarity on that.
>>>
>>
>> We can say the "next major release in 2019" instead of Spark 3.0. Spark
>> 3.0 timeline certainly requires a new thread to discuss.
>>
>>
>>>
>>>
>>> --
>>> *From:* Reynold Xin 
>>> *Sent:* Thursday, May 30, 2019 12:59:14 AM
>>> *To:* shane knapp
>>> *Cc:* Erik Erlandson; Mark Hamstra; Matei Zaharia; Sean Owen; Wenchen
>>> Fen; Xiangrui Meng; dev; user
>>> *Subject:* Re: Should python-2 be supported in Spark 3.0?
>>>
>>> +1 on Xiangrui’s plan.
>>>
>>> On Thu, May 30, 2019 at 7:55 AM shane knapp  wrote:
>>>
 I don't have a good sense of the overhead of continuing to support
> Python 2; is it large enough to consider dropping it in Spark 3.0?
>
> from the build/test side, it will actually be pretty easy to continue
 support for python2.7 for spark 2.x as the feature sets won't be expanding.

>>>
 that being said, i will be cracking a bottle of champagne when i can
 delete all of the ansible and anaconda configs for python2.x.  :)

>>>
>> On the development side, in a future release that drops Python 2 support
>> we can remove code that maintains python 2/3 compatibility and start using
>> python 3 only features, which is also quite exciting.
>>
>>
>>>
 shane
 --
 Shane Knapp
 UC Berkeley EECS Research / RISELab Staff Technical Lead
 https://rise.cs.berkeley.edu

>>>


Re: pySpark - pandas UDF and binaryType

2019-05-02 Thread Bryan Cutler
Hi,

BinaryType support was not added until Spark 2.4.0, see
https://issues.apache.org/jira/browse/SPARK-23555. Also, pyarrow 0.10.0 or
greater is required, as you saw in the docs.
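
A quick sanity check of the environment before trying again (just a sketch):

import pyarrow

print(spark.version)        # needs to be 2.4.0 or later for BinaryType
print(pyarrow.__version__)  # needs to be 0.10.0 or later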

Bryan

On Thu, May 2, 2019 at 4:26 AM Nicolas Paris 
wrote:

> Hi all
>
> I am using pySpark 2.3.0 and pyArrow 0.10.0
>
> I want to apply a pandas-udf on a dataframe with 
> I have the below error:
>
> > Invalid returnType with grouped map Pandas UDFs:
> >
> StructType(List(StructField(filename,StringType,true),StructField(contents,BinaryType,true)))
> > is not supported
>
>
> Am I missing something?
> the doc
> https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#supported-sql-types
> says pyArrow 0.10 is minimum to handle binaryType
>
> here is the code:
>
> > from pyspark.sql.functions import pandas_udf, PandasUDFType
> >
> > df = sql("select filename, contents from test_binary")
> >
> > @pandas_udf("filename String, contents binary",
> PandasUDFType.GROUPED_MAP)
> > def transform_binary(pdf):
> > contents = pdf.contents
> > return pdf.assign(contents=contents)
> >
> > df.groupby("filename").apply(transform_binary).count()
>
> Thanks
> --
> nicolas
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Driver OOM does not shut down Spark Context

2019-01-31 Thread Bryan Jeffrey
Hello.

I am running Spark 2.3.0 via Yarn.  I have a Spark Streaming application
where the driver threw an uncaught out of memory exception:

19/01/31 13:00:59 ERROR Utils: Uncaught exception in thread
element-tracking-store-worker
java.lang.OutOfMemoryError: GC overhead limit exceeded
at
org.apache.spark.util.kvstore.KVTypeInfo$MethodAccessor.get(KVTypeInfo.java:154)
at
org.apache.spark.util.kvstore.InMemoryStore$InMemoryView.compare(InMemoryStore.java:248)
at
org.apache.spark.util.kvstore.InMemoryStore$InMemoryView.lambda$iterator$0(InMemoryStore.java:203)
at
org.apache.spark.util.kvstore.InMemoryStore$InMemoryView$$Lambda$27/1691147907.compare(Unknown
Source)
at java.util.TimSort.binarySort(TimSort.java:296)
at java.util.TimSort.sort(TimSort.java:239)
at java.util.Arrays.sort(Arrays.java:1512)
at java.util.ArrayList.sort(ArrayList.java:1462)
at java.util.Collections.sort(Collections.java:175)
at
org.apache.spark.util.kvstore.InMemoryStore$InMemoryView.iterator(InMemoryStore.java:203)
at
scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:54)
at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at
org.apache.spark.status.AppStatusListener$$anonfun$org$apache$spark$status$AppStatusListener$$cleanupStages$1.apply(AppStatusListener.scala:894)
at
org.apache.spark.status.AppStatusListener$$anonfun$org$apache$spark$status$AppStatusListener$$cleanupStages$1.apply(AppStatusListener.scala:874)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.status.AppStatusListener.org
$apache$spark$status$AppStatusListener$$cleanupStages(AppStatusListener.scala:874)
at
org.apache.spark.status.AppStatusListener$$anonfun$3.apply$mcVJ$sp(AppStatusListener.scala:84)
at
org.apache.spark.status.ElementTrackingStore$$anonfun$write$1$$anonfun$apply$1$$anonfun$apply$mcV$sp$1.apply(ElementTrackingStore.scala:109)
at
org.apache.spark.status.ElementTrackingStore$$anonfun$write$1$$anonfun$apply$1$$anonfun$apply$mcV$sp$1.apply(ElementTrackingStore.scala:107)
at scala.collection.immutable.List.foreach(List.scala:381)
at
org.apache.spark.status.ElementTrackingStore$$anonfun$write$1$$anonfun$apply$1.apply$mcV$sp(ElementTrackingStore.scala:107)
at
org.apache.spark.status.ElementTrackingStore$$anonfun$write$1$$anonfun$apply$1.apply(ElementTrackingStore.scala:105)
at
org.apache.spark.status.ElementTrackingStore$$anonfun$write$1$$anonfun$apply$1.apply(ElementTrackingStore.scala:105)
at org.apache.spark.util.Utils$.tryLog(Utils.scala:2001)
at
org.apache.spark.status.ElementTrackingStore$$anon$1.run(ElementTrackingStore.scala:91)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Despite the uncaught exception the Streaming application never terminated.
No new batches were started.  As a result my job did not process data for
some period of time (until our ancillary monitoring noticed the issue).

*Ask: What can we do to ensure that the driver is shut down when this type
of exception occurs?*

Regards,

Bryan Jeffrey


Re: spark2.4 arrow enabled true,error log not returned

2019-01-10 Thread Bryan Cutler
Hi, could you please clarify if you are running a YARN cluster when you see
this problem?  I tried on Spark standalone and could not reproduce.  If
it's on a YARN cluster, please file a JIRA and I can try to investigate
further.

Thanks,
Bryan

On Sat, Dec 15, 2018 at 3:42 AM 李斌松  wrote:

> spark2.4 arrow enabled true,error log not returned,in spark 2.3,There's
> no such problem.
>
> 1、spark.sql.execution.arrow.enabled=true
> [image: image.png]
> *yarn log:*
>
> 18/12/15 14:35:52 INFO CodeGenerator: Code generated in 1030.698785 ms
>>
>> 18/12/15 14:35:54 INFO PythonRunner: Times: total = 1985, boot = 1892,
>>> init = 92, finish = 1
>>
>> 18/12/15 14:35:54 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0).
>>> 1799 bytes result sent to driver
>>
>> 18/12/15 14:35:55 INFO CoarseGrainedExecutorBackend: Got assigned task 1
>>
>> 18/12/15 14:35:55 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
>>
>> 18/12/15 14:35:55 INFO TorrentBroadcast: Started reading broadcast
>>> variable 1
>>
>> 18/12/15 14:35:55 INFO MemoryStore: Block broadcast_1_piece0 stored as
>>> bytes in memory (estimated size 8.3 KB, free 1048.8 MB)
>>
>> 18/12/15 14:35:55 INFO TorrentBroadcast: Reading broadcast variable 1
>>> took 18 ms
>>
>> 18/12/15 14:35:55 INFO MemoryStore: Block broadcast_1 stored as values in
>>> memory (estimated size 14.0 KB, free 1048.8 MB)
>>
>> 18/12/15 14:35:55 INFO CodeGenerator: Code generated in 30.269745 ms
>>
>> 18/12/15 14:35:55 INFO PythonRunner: Times: total = 13, boot = 5, init =
>>> 7, finish = 1
>>
>> 18/12/15 14:35:55 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1).
>>> 1893 bytes result sent to driver
>>
>> 18/12/15 14:35:55 INFO CoarseGrainedExecutorBackend: Got assigned task 2
>>
>> 18/12/15 14:35:55 INFO Executor: Running task 1.0 in stage 1.0 (TID 2)
>>
>> 18/12/15 14:35:55 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID
>>> 2)
>>
>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>>> last):
>>
>>   File "/usr/install/pyspark/2.4.0/pyspark.zip/pyspark/worker.py", line
>>> 377, in main
>>
>> process()
>>
>>   File "/usr/install/pyspark/2.4.0/pyspark.zip/pyspark/worker.py", line
>>> 372, in process
>>
>> serializer.dump_stream(func(split_index, iterator), outfile)
>>
>>   File "/usr/install/pyspark/2.4.0/pyspark.zip/pyspark/serializers.py",
>>> line 390, in dump_stream
>>
>> vs = list(itertools.islice(iterator, batch))
>>
>>   File "/usr/install/pyspark/2.4.0/pyspark.zip/pyspark/util.py", line 99,
>>> in wrapper
>>
>> return f(*args, **kwargs)
>>
>>   File
>>> "/yarn/nm/usercache/admin/appcache/application_1544579748138_0215/container_e43_1544579748138_0215_01_01/python1.py",
>>> line 435, in mapfunc
>>
>> ValueError: could not convert string to float: 'a'
>>
>>
>>> at
>>> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
>>
>> at
>>> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
>>
>> at
>>> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
>>
>> at
>>> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
>>
>> at
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>>
>> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>>
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>
>> at
>>> org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:99)
>>
>> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>
>> at
>>> org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.foreach(ArrowConverters.scala:97)
>>
>> at
>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>>
>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>>
>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>>
>> at scala.collection.TraversableOn

Re: ConcurrentModificationExceptions with CachedKafkaConsumers

2018-08-31 Thread Bryan Jeffrey
Cody,

Yes - I was able to verify that I am not seeing duplicate calls to
createDirectStream.  If the spark-streaming-kafka-0-10 will work on a 2.3
cluster I can go ahead and give that a shot.

Regards,

Bryan Jeffrey

On Fri, Aug 31, 2018 at 11:56 AM Cody Koeninger  wrote:

> Just to be 100% sure, when you're logging the group id in
> createDirectStream, you no longer see any duplicates?
>
> Regarding testing master, is the blocker that your spark cluster is on
> 2.3?  There's at least a reasonable chance that building an
> application assembly jar that uses the master version just for the
> spark-streaming-kafka-0-10 artifact will still work on a 2.3 cluster
>
> On Fri, Aug 31, 2018 at 8:55 AM, Bryan Jeffrey 
> wrote:
> > Cody,
> >
> > We are connecting to multiple clusters for each topic.  I did experiment
> > this morning with both adding a cluster identifier to the group id, as
> well
> > as simply moving to use only a single one of our clusters.  Neither of
> these
> > were successful.  I am not able to run a test against master now.
> >
> > Regards,
> >
> > Bryan Jeffrey
> >
> >
> >
> >
> > On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger 
> wrote:
> >>
> >> I doubt that fix will get backported to 2.3.x
> >>
> >> Are you able to test against master?  2.4 with the fix you linked to
> >> is likely to hit code freeze soon.
> >>
> >> From a quick look at your code, I'm not sure why you're mapping over
> >> an array of brokers.  It seems like that would result in different
> >> streams with the same group id, because broker isn't part of your
> >> group id string.
> >>
> >> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey <
> bryan.jeff...@gmail.com>
> >> wrote:
> >> > Hello, Spark Users.
> >> >
> >> > We have an application using Spark 2.3.0 and the 0.8 Kafka client.
> >> > We're
> >> > have a Spark streaming job, and we're reading a reasonable amount of
> >> > data
> >> > from Kafka (40 GB / minute or so).  We would like to move to using the
> >> > Kafka
> >> > 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers from
> having
> >> > to
> >> > modify formats.
> >> >
> >> > We've run into https://issues.apache.org/jira/browse/SPARK-19185,
> >> > 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've
> >> > tried to
> >> > work around it as follows:
> >> >
> >> > 1. Disabled consumer caching.  This increased the total job time from
> ~1
> >> > minute per batch to ~1.8 minutes per batch.  This performance penalty
> is
> >> > unacceptable for our use-case. We also saw some partitions stop
> >> > receiving
> >> > for an extended period of time - I was unable to get a simple repro
> for
> >> > this
> >> > effect though.
> >> > 2. Disabled speculation and multiple-job concurrency and added caching
> >> > for
> >> > the stream directly after reading from Kafka & caching offsets.  This
> >> > approach seems to work well for simple examples (read from a Kafka
> >> > topic,
> >> > write to another topic). However, when we move through more complex
> >> > logic we
> >> > continue to see this type of error - despite only creating the stream
> >> > for a
> >> > given topic a single time.  We validated that we're creating the
> stream
> >> > from
> >> > a given topic / partition a single time by logging on stream creation,
> >> > caching the stream and (eventually) calling 'runJob' to actually go
> and
> >> > fetch the data. Nonetheless with multiple outputs we see the
> >> > ConcurrentModificationException.
> >> >
> >> > I've included some code down below.  I would be happy if anyone had
> >> > debugging tips for the workaround.  However, my main concern is to
> >> > ensure
> >> > that the 2.4 version will have a bug fix that will work for Spark
> >> > Streaming
> >> > in which multiple input topics map data to multiple outputs. I would
> >> > also
> >> > like to understand if the fix
> >> > (https://github.com/apache/spark/pull/20997)
> >> > will be backported to Spark 2.3.x
> >> >
> >> > In our code, read looks like the following:
> >> >
> >> > case class StreamLookupKey(topic: Set[St

Re: ConcurrentModificationExceptions with CachedKafkaConsumers

2018-08-31 Thread Bryan Jeffrey
Cody,

We are connecting to multiple clusters for each topic.  I did experiment
this morning with both adding a cluster identifier to the group id, as well
as simply moving to use only a single one of our clusters.  Neither of
these were successful.  I am not able to run a test against master now.

Regards,

Bryan Jeffrey




On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger  wrote:

> I doubt that fix will get backported to 2.3.x
>
> Are you able to test against master?  2.4 with the fix you linked to
> is likely to hit code freeze soon.
>
> From a quick look at your code, I'm not sure why you're mapping over
> an array of brokers.  It seems like that would result in different
> streams with the same group id, because broker isn't part of your
> group id string.
>
> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey 
> wrote:
> > Hello, Spark Users.
> >
> > We have an application using Spark 2.3.0 and the 0.8 Kafka client.  We're
> > have a Spark streaming job, and we're reading a reasonable amount of data
> > from Kafka (40 GB / minute or so).  We would like to move to using the
> Kafka
> > 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers from having
> to
> > modify formats.
> >
> > We've run into https://issues.apache.org/jira/browse/SPARK-19185,
> > 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've
> tried to
> > work around it as follows:
> >
> > 1. Disabled consumer caching.  This increased the total job time from ~1
> > minute per batch to ~1.8 minutes per batch.  This performance penalty is
> > unacceptable for our use-case. We also saw some partitions stop receiving
> > for an extended period of time - I was unable to get a simple repro for
> this
> > effect though.
> > 2. Disabled speculation and multiple-job concurrency and added caching
> for
> > the stream directly after reading from Kafka & caching offsets.  This
> > approach seems to work well for simple examples (read from a Kafka topic,
> > write to another topic). However, when we move through more complex
> logic we
> > continue to see this type of error - despite only creating the stream
> for a
> > given topic a single time.  We validated that we're creating the stream
> from
> > a given topic / partition a single time by logging on stream creation,
> > caching the stream and (eventually) calling 'runJob' to actually go and
> > fetch the data. Nonetheless with multiple outputs we see the
> > ConcurrentModificationException.
> >
> > I've included some code down below.  I would be happy if anyone had
> > debugging tips for the workaround.  However, my main concern is to ensure
> > that the 2.4 version will have a bug fix that will work for Spark
> Streaming
> > in which multiple input topics map data to multiple outputs. I would also
> > like to understand if the fix (
> https://github.com/apache/spark/pull/20997)
> > will be backported to Spark 2.3.x
> >
> > In our code, read looks like the following:
> >
> > case class StreamLookupKey(topic: Set[String], brokers: String)
> >
> > private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()
> >
> > // Given inputs return a direct stream.
> > def createDirectStream(ssc: StreamingContext,
> >additionalKafkaParameters: Map[String, String],
> >brokersToUse: Array[String], //
> > broker1,broker2|broker3,broker4
> >topicsToUse: Array[String],
> >applicationName: String,
> >persist: Option[PersistenceManager],
> >useOldestOffsets: Boolean,
> >maxRatePerPartition: Long,
> >batchSeconds: Int
> >   ): DStream[DecodedData] = {
> >   val streams: Array[DStream[DecodedData]] =
> > brokersToUse.map(brokers => {
> >   val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
> >   val kafkaParameters: Map[String, String] =
> getKafkaParameters(brokers,
> > useOldestOffsets, groupId) ++ additionalKafkaParameters
> >   logger.info(s"Kafka Params: ${kafkaParameters}")
> >   val topics = topicsToUse.toSet
> >   logger.info(s"Creating Kafka direct connection -
> > ${kafkaParameters.mkString(GeneralConstants.comma)} " +
> > s"topics: ${topics.mkString(GeneralConstants.comma)} w/
> > applicationGroup: ${groupId}")
> >
> >   streamMap.getOrElse(StreamLookupKey(topics, brokers),
> > createKafkaStr

ConcurrentModificationExceptions with CachedKafkaConsumers

2018-08-30 Thread Bryan Jeffrey
fsets: Boolean,
applicationName: String): Map[String, String] =
  Map[String, String](
"auto.offset.reset" -> (if (useOldestOffsets) "earliest" else "latest"),
"enable.auto.commit" -> false.toString, // we'll commit these manually
"key.deserializer" ->
classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
"value.deserializer" -> classOf[Decoders.MixedDecoder].getCanonicalName,
"partition.assignment.strategy" ->
classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
"bootstrap.servers" -> brokers,
"group.id" -> applicationName,
"session.timeout.ms" -> 24.toString,
"request.timeout.ms"-> 30.toString
  )

Write code looks like the following:

def write[T, A](rdd: RDD[T], topic: String, brokers: Array[String],
conv: (T) => Array[Byte], numPartitions: Int): Unit = {
  val rddToWrite =
if (numPartitions > 0) {
  rdd.repartition(numPartitions)
} else {
  rdd
}

  // Get session from current threads session
  val session = SparkSession.builder().getOrCreate()
  val df = session.createDataFrame(rddToWrite.map(x => Row(conv(x))),
StructType(Array(StructField("value", BinaryType))))
  df.selectExpr("CAST('' AS STRING)", "value")
.write
.format("kafka")
.option("kafka.bootstrap.servers", getBrokersToUse(brokers))
.option("compression.type", "gzip")
.option("retries", "3")
.option("topic", topic)
.save()
}

Regards,

Bryan Jeffrey


Re: Use Arrow instead of Pickle without pandas_udf

2018-07-30 Thread Bryan Cutler
Here is a link to the JIRA for adding StructType support for scalar
pandas_udf https://issues.apache.org/jira/browse/SPARK-24579


On Wed, Jul 25, 2018 at 3:36 PM, Hichame El Khalfi 
wrote:

> Hey Holden,
> Thanks for your reply,
>
> We currently using a python function that produces a Row(TS=LongType(),
> bin=BinaryType()).
> We use this function like this dataframe.rdd.map(my_function)
> .toDF().write.parquet()
>
> To reuse it in pandas_udf, we changes the return type to
> StructType(StructField(Long), StructField(BinaryType).
>
> 1)But we face an issue that StructType is not supported by pandas_udf.
>
> So I was wondering to still continue to reuse dataftame.rdd.map but get an
> improvement in serialization by using ArrowFormat instead of Pickle.
>
> *From:* hol...@pigscanfly.ca
> *Sent:* July 25, 2018 4:41 PM
> *To:* hich...@elkhalfi.com
> *Cc:* user@spark.apache.org
> *Subject:* Re: Use Arrow instead of Pickle without pandas_udf
>
> Not currently. What's the problem with pandas_udf for your use case?
>
> On Wed, Jul 25, 2018 at 1:27 PM, Hichame El Khalfi 
> wrote:
>
>> Hi There,
>>
>>
>> Is there a way to use Arrow format instead of Pickle but without using
>> pandas_udf ?
>>
>>
>> Thank for your help,
>>
>>
>> Hichame
>>
>
>
>
> --
> Twitter: https://twitter.com/holdenkarau
>


Re: Arrow type issue with Pandas UDF

2018-07-19 Thread Bryan Cutler
Hi Patrick,

It looks like it's failing in Scala before it even gets to Python to
execute your udf, which is why it doesn't seem to matter what's in your
udf. Since you are doing a grouped map udf maybe your group sizes are too
big or skewed? Could you try to reduce the size of your groups by adding
more keys or sampling a fraction of the data? If the problem persists could
you make a jira? At the very least a better exception would be nice.
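
As a rough sketch of what I mean by adding more keys, you could salt the
grouping column so each group handed to the UDF stays small (the dataframe,
key column, and UDF names here are just placeholders for your job):

import pyspark.sql.functions as F

num_salts = 32  # tune so each (key, salt) group is comfortably small
salted = df.withColumn("salt", (F.rand() * num_salts).cast("int"))

# Each (key, salt) group is sent to Python as a single Arrow batch,
# so smaller groups mean smaller Arrow buffers.
result = salted.groupBy("key", "salt").apply(my_grouped_map_udf)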

Bryan

On Thu, Jul 19, 2018, 7:07 AM Patrick McCarthy
 wrote:

> PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.
>
> I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions
> in the last stage of the job regardless of my output type.
>
>
> The problem I'm trying to solve:
> I have a column of scalar values, and each value on the same row has a
> sorted vector. I'm trying to replace each scalar value with its closest
> index from its vector. I'm applying the grouping arbitrarily and performing
> a python operation row-wise because even when the same vector appears on
> many rows it's not clear how I would get the lookup to scale.
>
> My input data, the product of a join of hive tables, has the following
> schema:
>
> root
>  |-- scalar_value: float (nullable = true)
>  |-- quantilelist: array (nullable = true)
>  ||-- element: double (containsNull = true)
>
>
> My UDF is at bottom. I'm using a GROUPED_MAP UDF because I want to perform
> an operation on two columns, and because I want to take advantage of Arrow
> to avoid serialization.
>
> The schema my UDF returns is this:
>
> pos_schema = T.StructType([
> T.StructField('feature_value',T.FloatType(),True),
> T.StructField('error',T.StringType())
> ])
>
> ...however when I try to apply my UDF, either with saveAsTable or show(),
> I get the following exception:
>
> org.apache.arrow.vector.util.OversizedAllocationException: Unable to
> expand the buffer
> at
> org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
> at
> org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
> at
> org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
> at
> org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
> at
> org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
> at
> org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
> at
> org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
> at
> org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
> at
> org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
> at
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
> at
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
> at
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
> at
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
> at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
>
> I assumed it was the result of some bad typing on my part, until I did a
> test with a degenerate UDF that only returns a column of 1:
>
> @F.pandas_udf(T.StructType([T.StructField('feature_value', T.IntegerType(), True)]),
>               F.PandasUDFType.GROUPED_MAP)
> def groupedPercentileInt(df):
>     return pd.DataFrame({'feature_value': [int(1)] * df.shape[0]}).reset_index(drop=True)
>
>
> This clearly only has one return value of type int, yet I get the same
> exception:
>
> org.apache.arrow.vector.util.OversizedAllocationException: Unable to
> expand the buffer
> at
> org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
> at
> org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
> at
> org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
> at
> org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.ja

Heap Memory in Spark 2.3.0

2018-07-16 Thread Bryan Jeffrey
6.0K->0.0B Heap:
8650.3M(20.0G)->7931.0M(20.0G)], [Metaspace: 52542K->52542K(1095680K)]
 [Times: user=20.80 sys=1.53, real=17.48 secs]
812.433: [Full GC (Allocation Failure) 817.544: [SoftReference, 112
refs, 0.858 secs]817.544: [WeakReference, 1606 refs, 0.0002380
secs]817.545: [FinalReference, 1175 refs, 0.0003358 secs]817.545:
[PhantomReference, 0 refs, 12 refs, 0.135 secs]817.545: [JNI Weak
Reference, 0.305 secs] 7931M->7930M(20G), 15.9899953 secs]
   [Eden: 0.0B(9588.0M)->0.0B(9588.0M) Survivors: 0.0B->0.0B Heap:
7931.0M(20.0G)->7930.8M(20.0G)], [Metaspace: 52542K->52477K(1095680K)]
 [Times: user=20.88 sys=0.87, real=15.99 secs]
828.425: [GC concurrent-mark-abort]
Heap
 garbage-first heap   total 20971520K, used 8465150K
[0x0002c000, 0x0002c040a000, 0x0007c000)
  region size 4096K, 2 young (8192K), 0 survivors (0K)
 Metaspace   used 52477K, capacity 52818K, committed 54400K,
reserved 1095680K
  class spaceused 7173K, capacity 7280K, committed 7552K, reserved 1048576K
 Concurrent marking:
  0   init marks: total time = 0.00 s (avg = 0.00 ms).
 13  remarks: total time = 0.49 s (avg =37.77 ms).
   [std. dev =21.54 ms, max =83.71 ms]
13  final marks: total time = 0.03 s (avg = 2.00 ms).
  [std. dev = 2.89 ms, max =11.95 ms]
13weak refs: total time = 0.46 s (avg =35.77 ms).
  [std. dev =22.02 ms, max =82.16 ms]
 13 cleanups: total time = 0.14 s (avg =10.63 ms).
   [std. dev =11.13 ms, max =42.87 ms]
Final counting total time = 0.04 s (avg = 2.96 ms).
RS scrub total time = 0.05 s (avg = 3.91 ms).
  Total stop_world time = 0.63 s.
  Total concurrent time =   195.51 s (  194.00 s marking).


This appears to show that we're seeing allocated heap of 7930 MB out
of the 20 GB maximum (so about half).  However, Spark is throwing a Java OOM (out of
heap space) error.  I validated that we're not using legacy memory
management mode.  I've tried this against a few applications (some batch,
some streaming).  Has something changed with memory allocation in Spark
2.3.0 that would cause these issues?

Thank you for any help you can provide.

Regards,

Bryan Jeffrey


Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Bryan Jeffrey
Cody,

Thank you. Let me see if I can reproduce this. We're not seeing offsets load 
correctly on startup - but perhaps there is an error on my side.

Bryan

Get Outlook for Android<https://aka.ms/ghei36>


From: Cody Koeninger 
Sent: Thursday, June 14, 2018 5:01:01 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Kafka Offset Storage: Fetching Offsets

Offsets are loaded when you instantiate an
org.apache.kafka.clients.consumer.KafkaConsumer, subscribe, and poll.
There's not an explicit api for it.  Have you looked at the output of
kafka-consumer-groups.sh and tried the example code I linked to?


bash-3.2$ ./bin/kafka-consumer-groups.sh --bootstrap-server
localhost:9092 --group commitexample --describe
Note: This will only show information about consumers that use the
Java consumer API (non-ZooKeeper-based consumers).
Consumer group 'commitexample' has no active members.
TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID  HOST  CLIENT-ID
test    0          1056            1656            600    -            -     -


scala> val c = new KafkaConsumer[String, String](kafkaParams.asJava)
c: org.apache.kafka.clients.consumer.KafkaConsumer[String,String] =
org.apache.kafka.clients.consumer.KafkaConsumer@780cbdf8
scala> c.subscribe(java.util.Arrays.asList("test"))
scala> c.poll(0)
scala> c.position(new TopicPartition("test", 0))
res4: Long = 1056






On Thu, Jun 14, 2018 at 3:33 PM, Bryan Jeffrey  wrote:
> Cody,
>
> Where is that called in the driver? The only call I see from Subscribe is to
> load the offset from checkpoint.
>
> Get Outlook for Android
>
> 
> From: Cody Koeninger 
> Sent: Thursday, June 14, 2018 4:24:58 PM
>
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The code that loads offsets from kafka is in e.g.
> org.apache.kafka.clients.consumer, it's not in spark.
>
> On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey 
> wrote:
>> Cody,
>>
>> Can you point me to the code that loads offsets? As far as I can see with
>> Spark 2.1, the only offset load is from checkpoint.
>>
>> Thank you!
>>
>> Bryan
>>
>> Get Outlook for Android
>>
>> 
>> From: Cody Koeninger 
>> Sent: Thursday, June 14, 2018 4:00:31 PM
>> To: Bryan Jeffrey
>> Cc: user
>> Subject: Re: Kafka Offset Storage: Fetching Offsets
>>
>> The expectation is that you shouldn't have to manually load offsets
>> from kafka, because the underlying kafka consumer on the driver will
>> start at the offsets associated with the given group id.
>>
>> That's the behavior I see with this example:
>>
>>
>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
>>
>> What does bin/kafka-consumer-groups.sh show for your group id?
>>
>> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey 
>> wrote:
>>> Hello.
>>>
>>> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based
>>> on
>>> the documentation
>>>
>>>
>>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>>> it appears that you can now use Kafka itself to store offsets.
>>>
>>> I've setup a  simple Kafka DStream:
>>> val kafkaParameters = Map[String, String](
>>>   "metadata.broker.list" -> brokers,
>>>   "auto.offset.reset" -> "latest",
>>>   "enable.auto.commit" -> false.toString,
>>>   "key.deserializer" ->
>>>
>>>
>>> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>>   "partition.assignment.strategy" ->
>>>
>>>
>>> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>>   "group.id" -> applicationName
>>> )
>>>
>>> val consumerStrategy = ConsumerStrategies.Subscribe[String,
>>> DecodedData](topics.toSeq, kafkaParameters)
>>> KafkaUtils.createDirectStream(ssc, locationStrategy =
>>> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>>>
>>>
>>> I then commit the offsets:
>>>
>>> var offsets: Array[OffsetRange] = Ar

Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Bryan Jeffrey
Cody,

Where is that called in the driver? The only call I see from Subscribe is to 
load the offset from checkpoint.

Get Outlook for Android<https://aka.ms/ghei36>


From: Cody Koeninger 
Sent: Thursday, June 14, 2018 4:24:58 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Kafka Offset Storage: Fetching Offsets

The code that loads offsets from kafka is in e.g.
org.apache.kafka.clients.consumer, it's not in spark.

On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey  wrote:
> Cody,
>
> Can you point me to the code that loads offsets? As far as I can see with
> Spark 2.1, the only offset load is from checkpoint.
>
> Thank you!
>
> Bryan
>
> Get Outlook for Android
>
> 
> From: Cody Koeninger 
> Sent: Thursday, June 14, 2018 4:00:31 PM
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The expectation is that you shouldn't have to manually load offsets
> from kafka, because the underlying kafka consumer on the driver will
> start at the offsets associated with the given group id.
>
> That's the behavior I see with this example:
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
>
> What does bin/kafka-consumer-groups.sh show for your group id?
>
> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey 
> wrote:
>> Hello.
>>
>> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based
>> on
>> the documentation
>>
>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>> it appears that you can now use Kafka itself to store offsets.
>>
>> I've setup a  simple Kafka DStream:
>> val kafkaParameters = Map[String, String](
>>   "metadata.broker.list" -> brokers,
>>   "auto.offset.reset" -> "latest",
>>   "enable.auto.commit" -> false.toString,
>>   "key.deserializer" ->
>>
>> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>   "partition.assignment.strategy" ->
>>
>> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>   "group.id" -> applicationName
>> )
>>
>> val consumerStrategy = ConsumerStrategies.Subscribe[String,
>> DecodedData](topics.toSeq, kafkaParameters)
>> KafkaUtils.createDirectStream(ssc, locationStrategy =
>> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>>
>>
>> I then commit the offsets:
>>
>> var offsets: Array[OffsetRange] = Array()
>> stream.foreachRDD(rdd => {
>>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   logger.info(s"Offsets: ${offsets.mkString("|")}")
>> })
>>
>> // Future: Move this after we've done processing.
>> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>>
>> The offsets appear to commit successfully. However, on restart the
>> streaming
>> application consistently starts from latest whenever the Spark checkpoint
>> is
>> changed.  Drilling into the code it does not appear that re-loading offset
>> data is supported in the Spark Streaming Kafka library.  How is this
>> expected to work?  Is there an example of saving the offsets to Kafka and
>> then loading them from Kafka?
>>
>> Regards,
>>
>> Bryan Jeffrey


Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Bryan Jeffrey
Cody,

Can you point me to the code that loads offsets? As far as I can see with Spark 
2.1, the only offset load is from checkpoint.

Thank you!

Bryan

Get Outlook for Android<https://aka.ms/ghei36>


From: Cody Koeninger 
Sent: Thursday, June 14, 2018 4:00:31 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Kafka Offset Storage: Fetching Offsets

The expectation is that you shouldn't have to manually load offsets
from kafka, because the underlying kafka consumer on the driver will
start at the offsets associated with the given group id.

That's the behavior I see with this example:

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala

What does bin/kafka-consumer-groups.sh show for your group id?

On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey  wrote:
> Hello.
>
> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based on
> the documentation
> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
> it appears that you can now use Kafka itself to store offsets.
>
> I've setup a  simple Kafka DStream:
> val kafkaParameters = Map[String, String](
>   "metadata.broker.list" -> brokers,
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> false.toString,
>   "key.deserializer" ->
> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>   "partition.assignment.strategy" ->
> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>   "bootstrap.servers" -> brokersToUse.mkString(","),
>   "group.id" -> applicationName
> )
>
> val consumerStrategy = ConsumerStrategies.Subscribe[String,
> DecodedData](topics.toSeq, kafkaParameters)
> KafkaUtils.createDirectStream(ssc, locationStrategy =
> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>
>
> I then commit the offsets:
>
> var offsets: Array[OffsetRange] = Array()
> stream.foreachRDD(rdd => {
>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   logger.info(s"Offsets: ${offsets.mkString("|")}")
> })
>
> // Future: Move this after we've done processing.
> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>
> The offsets appear to commit successfully. However, on restart the streaming
> application consistently starts from latest whenever the Spark checkpoint is
> changed.  Drilling into the code it does not appear that re-loading offset
> data is supported in the Spark Streaming Kafka library.  How is this
> expected to work?  Is there an example of saving the offsets to Kafka and
> then loading them from Kafka?
>
> Regards,
>
> Bryan Jeffrey


Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Bryan Jeffrey
Hello.

I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based
on the documentation (
https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
it appears that you can now use Kafka itself to store offsets.

I've setup a  simple Kafka DStream:
val kafkaParameters = Map[String, String](
  "metadata.broker.list" -> brokers,
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> false.toString,
  "key.deserializer" ->
classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
  "partition.assignment.strategy" ->
classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
  "bootstrap.servers" -> brokersToUse.mkString(","),
  "group.id" -> applicationName
)

val consumerStrategy = ConsumerStrategies.Subscribe[String,
DecodedData](topics.toSeq, kafkaParameters)
KafkaUtils.createDirectStream(ssc, locationStrategy =
LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)


I then commit the offsets:

var offsets: Array[OffsetRange] = Array()
stream.foreachRDD(rdd => {
  offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  logger.info(s"Offsets: ${offsets.mkString("|")}")
})

// Future: Move this after we've done processing.
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)

The offsets appear to commit successfully. However, on restart the
streaming application consistently starts from latest whenever the Spark
checkpoint is changed.  Drilling into the code it does not appear that
re-loading offset data is supported in the Spark Streaming Kafka library.
How is this expected to work?  Is there an example of saving the offsets to
Kafka and then loading them from Kafka?

Regards,

Bryan Jeffrey


Re: Pandas UDF for PySpark error. Big Dataset

2018-05-29 Thread Bryan Cutler
Can you share some of the code used, or at least the pandas_udf plus the
stacktrace?  Also does decreasing your dataset size fix the oom?

On Mon, May 28, 2018, 4:22 PM Traku traku  wrote:

> Hi.
>
> I'm trying to use the new feature but I can't use it with a big dataset
> (about 5 million rows).
>
> I tried increasing executor memory, driver memory, and the partition number,
> but none of these solved the problem.
>
> One of the executor tasks keeps increasing its shuffle memory until it fails.
>
> The error is generated by Arrow: unable to expand the buffer.
>
> Any idea?
>


Re: OneHotEncoderEstimator - java.lang.NoSuchMethodError: org.apache.spark.sql.Dataset.withColumns

2018-05-18 Thread Bryan Cutler
The example works for me, please check your environment and ensure you are
using Spark 2.3.0 where OneHotEncoderEstimator was introduced.

On Fri, May 18, 2018 at 12:57 AM, Matteo Cossu  wrote:

> Hi,
>
> are you sure Dataset has a method withColumns?
>
> On 15 May 2018 at 16:58, Mina Aslani  wrote:
>
>> Hi,
>>
>> I get below error when I try to run oneHotEncoderEstimator example.
>> https://github.com/apache/spark/blob/b74366481cc87490adf4e69
>> d26389ec737548c15/examples/src/main/java/org/apache/
>> spark/examples/ml/JavaOneHotEncoderEstimatorExample.java#L67
>>
>> Which is this line of the code:
>> https://github.com/apache/spark/blob/master/mllib/src/main/
>> scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala#L348
>>
>> Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: 
>> org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
>>  at 
>> org.apache.spark.ml.feature.OneHotEncoderModel.transform(OneHotEncoderEstimator.scala:348)
>>
>>
>> Can you please let me know, what is the cause? Any workaround?
>>
>> Seeing the example in the repo, looks like that at some point it used be
>> running fine. And, now it's not working. Also, oneHotEncoder is deprecated.
>>
>> I really appreciate your quick response.
>>
>> Regards,
>> Mina
>>
>>
>


Re: How to use StringIndexer for multiple input /output columns in Spark Java

2018-05-16 Thread Bryan Cutler
Yes, the workaround is to create multiple StringIndexers as you described.
OneHotEncoderEstimator is only in Spark 2.3.0, you will have to use just
OneHotEncoder.
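
A sketch of that workaround in PySpark (the Java version is analogous; the
column names here are just examples):

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

cols = ["color", "shape"]
indexers = [StringIndexer(inputCol=c, outputCol=c + "_idx") for c in cols]
encoders = [OneHotEncoder(inputCol=c + "_idx", outputCol=c + "_vec") for c in cols]

pipeline = Pipeline(stages=indexers + encoders)
model = pipeline.fit(df)
encoded = model.transform(df)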

On Tue, May 15, 2018, 8:40 AM Mina Aslani  wrote:

> Hi,
>
> So, what is the workaround? Should I create multiple indexer(one for each
> column), and then create pipeline and set stages to have all the
> StringIndexers?
> I am using 2.2.1 as I cannot move to 2.3.0. Looks like
> oneHotEncoderEstimator is broken, please see my email sent today with
> subject:
> OneHotEncoderEstimator - java.lang.NoSuchMethodError: org.apache.spark.sql
> .Dataset.withColumns
>
> Regards,
> Mina
>
> On Tue, May 15, 2018 at 2:37 AM, Nick Pentreath 
> wrote:
>
>> Multi column support for StringIndexer didn’t make it into Spark 2.3.0
>>
>> The PR is still in progress I think - should be available in 2.4.0
>>
>> On Mon, 14 May 2018 at 22:32, Mina Aslani  wrote:
>>
>>> Please take a look at the api doc:
>>> https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/feature/StringIndexer.html
>>>
>>> On Mon, May 14, 2018 at 4:30 PM, Mina Aslani 
>>> wrote:
>>>
 Hi,

 There is no SetInputCols/SetOutputCols for StringIndexer in Spark java.
 How multiple input/output columns can be specified then?

 Regards,
 Mina

>>>
>>>
>


Re: [Arrow][Dremio]

2018-05-15 Thread Bryan Cutler
Hi Xavier,

Regarding Arrow usage in Spark, using Arrow format to transfer data between
Python and Java has been the focus so far because this area stood to
benefit the most.  It's possible that the scope of Arrow could broaden in
the future, but there still needs to be discussions about this.

Bryan

On Mon, May 14, 2018 at 9:55 AM, Pierce Lamb <richard.pierce.l...@gmail.com>
wrote:

> Hi Xavier,
>
> Along the lines of connecting to multiple sources of data and replacing
> ETL tools you may want to check out Confluent's blog on building a
> real-time streaming ETL pipeline on Kafka
> <https://www.confluent.io/blog/building-real-time-streaming-etl-pipeline-20-minutes/>
> as well as SnappyData's blog on Real-Time Streaming ETL with SnappyData
> <http://www.snappydata.io/blog/real-time-streaming-etl-with-snappydata> where
> Spark is central to connecting to multiple data sources, executing SQL on
> streams etc. These should provide nice comparisons to your ideas about
> Dremio + Spark as ETL tools.
>
> Disclaimer: I am a SnappyData employee
>
> Hope this helps,
>
> Pierce
>
> On Mon, May 14, 2018 at 2:24 AM, xmehaut <xavier.meh...@gmail.com> wrote:
>
>> Hi Michaël,
>>
>> I'm not an expert on Dremio; I'm just trying to evaluate the potential of this
>> technology, what impact it could have on Spark, how the two can work
>> together, and how Spark could use Arrow even further internally alongside the
>> existing algorithms.
>>
>> Dremio already has quite a rich API set enabling access, for instance, to
>> metadata, SQL queries, or even to create virtual datasets
>> programmatically.
>> They also have a lot of predefined functions, and I imagine there will be
>> more and more functions in the future, e.g. machine learning functions like
>> the ones we may find in Azure SQL Server, which lets you mix SQL and ML
>> functions.  Access to Dremio is made through JDBC, and we may imagine
>> accessing virtual datasets through Spark and creating new datasets
>> dynamically from the API, connected to Parquet files stored dynamically by
>> Spark on HDFS, Azure Data Lake or S3... Of course a tighter integration
>> between the two would be better, with a Spark read/write connector to Dremio :)
>>
>> regards
>> xavier
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Spark dataset to byte array over grpc

2018-04-23 Thread Bryan Cutler
Hi Ashwin,

This sounds like it might be a good use for Apache Arrow, if you are open
to the type of format to exchange.  As of Spark 2.3, Dataset has a method
"toArrowPayload" that will convert a Dataset of Rows to a byte array in
Arrow format, although the API is currently not public.  Your client could
consume Arrow data directly or perhaps use spark.sql ColumnarBatch to read
back as Rows.

Bryan

On Mon, Apr 23, 2018 at 11:49 AM, Ashwin Sai Shankar <
ashan...@netflix.com.invalid> wrote:

> Hi!
> I'm building a spark app which runs a spark-sql query and send results to
> client over grpc(my proto file is configured to send the sql output as
> "bytes"). The client then displays the output rows. When I run spark.sql, I
> get a DataSet. How do I convert this to byte array?
> Also is there a better way to send this output to client?
>
> Thanks,
> Ashwin
>
>


Re: PySpark ML: Get best set of parameters from TrainValidationSplit

2018-04-16 Thread Bryan Cutler
Hi Aakash,

First you will want to get the the random forest model stage from the best
pipeline model result, for example if RF is the first stage:

rfModel = model.bestModel.stages[0]

Then you can check the values of the params you tuned like this:

rfModel.getNumTrees
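
If you want all of the tuned values at once, another option is to find which
entry of the param grid produced the best validation metric (a sketch;
assumes a metric where higher is better, like accuracy):

best_idx = max(range(len(model.validationMetrics)),
               key=lambda i: model.validationMetrics[i])
best_params = model.getEstimatorParamMaps()[best_idx]
for param, value in best_params.items():
    print(param.name, value)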

On Mon, Apr 16, 2018 at 7:52 AM, Aakash Basu 
wrote:

> Hi,
>
> I am running a Random Forest model on a dataset using hyper parameter
> tuning with Spark's paramGrid and Train Validation Split.
>
> Can anyone tell me how to get the best set for all the four parameters?
>
> I used:
>
> model.bestModel()
> model.metrics()
>
>
> But none of them seem to work.
>
>
> Below is the code chunk:
>
> paramGrid = ParamGridBuilder() \
> .addGrid(rf.numTrees, [50, 100, 150, 200]) \
> .addGrid(rf.maxDepth, [5, 10, 15, 20]) \
> .addGrid(rf.minInfoGain, [0.001, 0.01, 0.1, 0.6]) \
> .addGrid(rf.minInstancesPerNode, [5, 15, 30, 50, 100]) \
> .build()
>
> tvs = TrainValidationSplit(estimator=pipeline,
>estimatorParamMaps=paramGrid,
>evaluator=MulticlassClassificationEvaluator(),
># 80% of the data will be used for training, 20% 
> for validation.
>trainRatio=0.8)
>
> model = tvs.fit(trainingData)
>
> predictions = model.transform(testData)
>
> evaluator = MulticlassClassificationEvaluator(
> labelCol="label", predictionCol="prediction", metricName="accuracy")
> accuracy = evaluator.evaluate(predictions)
> print("Accuracy = %g" % accuracy)
> print("Test Error = %g" % (1.0 - accuracy))
>
>
> Any help?
>
>
> Thanks,
> Aakash.
>


Re: [Spark 2.x Core] Adding to ArrayList inside rdd.foreach()

2018-04-07 Thread Bryan Jeffrey
You can just call rdd.flatMap(_._2).collect


Get Outlook for Android


From: klrmowse 
Sent: Saturday, April 7, 2018 1:29:34 PM
To: user@spark.apache.org
Subject: Re: [Spark 2.x Core] Adding to ArrayList inside rdd.foreach()

okie, well...

i'm working with a pair rdd 

i need to extract the values and store them somehow (maybe a simple
Array??), which i later parallelize and reuse

since adding to a list is a no-no, what, if any, are the other options?
(Java Spark, btw)



thanks



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: is there a way of register python UDF using java API?

2018-04-02 Thread Bryan Cutler
Hi Kant,

The udfDeterministic parameter would be set to false if the results from your
UDF are non-deterministic, such as those produced by random numbers, so that
the Catalyst optimizer will not cache and reuse results.
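
On the public API (Spark 2.3+) the same flag is exposed as asNondeterministic();
a hedged Scala illustration:

import org.apache.spark.sql.functions.udf
val myRand = udf(() => scala.util.Random.nextDouble).asNondeterministic()
spark.udf.register("my_rand", myRand)   // results won't be cached or reused by the optimizer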

On Mon, Apr 2, 2018 at 12:11 PM, kant kodali  wrote:

> Looks like there is spark.udf().registerPython() like below.
>
> public void registerPython(java.lang.String name, org.apache.spark.sql.
> execution.python.UserDefinedPythonFunction udf)
>
>
> can anyone describe what *udfDeterministic *parameter does in the method
> signature below?
>
> public UserDefinedPythonFunction(java.lang.String name, 
> org.apache.spark.api.python.PythonFunction func, 
> org.apache.spark.sql.types.DataType dataType, int pythonEvalType, boolean 
> udfDeterministic) { /* compiled code */ }
>
>
> On Sun, Apr 1, 2018 at 3:46 PM, kant kodali  wrote:
>
>> Hi All,
>>
>> All of our spark code is in Java wondering if there a way to register
>> python UDF's using java API such that the registered UDF's can be used
>> using raw spark SQL.
>> If there is any other way to achieve this goal please suggest!
>>
>> Thanks
>>
>>
>


Re: Return statements aren't allowed in Spark closures

2018-02-21 Thread Bryan Jeffrey
Lian,

You're writing Scala. Just remove the 'return'. No need for it in Scala.
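
For instance, the method from the code below behaves the same without the
keyword - the last expression of a Scala method is its return value:

def queryYahoo(row: Row): Int = 10

csv.repartition(5).rdd.foreachPartition { p => p.foreach(r => queryYahoo(r)) }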

Get Outlook for Android


From: Lian Jiang 
Sent: Wednesday, February 21, 2018 4:16:08 PM
To: user
Subject: Return statements aren't allowed in Spark closures

I can run below code in spark-shell using yarn client mode.


val csv = spark.read.option("header", "true").csv("my.csv")


def queryYahoo(row: Row) : Int = { return 10; }


csv.repartition(5).rdd.foreachPartition{ p => p.foreach(r => { queryYahoo(r) })}

However, the same code failed when run using spark-submit in yarn client or 
cluster mode due to error:


18/02/21 21:00:12 ERROR ApplicationMaster: User class threw exception: 
org.apache.spark.util.ReturnStatementInClosureException: Return statements 
aren't allowed in Spark closures

org.apache.spark.util.ReturnStatementInClosureException: Return statements 
aren't allowed in Spark closures

at 
org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:371)

at org.apache.xbean.asm5.ClassReader.a(Unknown Source)

at org.apache.xbean.asm5.ClassReader.b(Unknown Source)

at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)

at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)

at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:243)

at 
org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:306)

at 
org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:292)

at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)

at scala.collection.immutable.List.foreach(List.scala:381)

at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)

at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:292)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)

at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)

at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)


Any idea? Thanks.


Re: PySpark Tweedie GLM

2018-02-09 Thread Bryan Cutler
Can you provide some code/data to reproduce the problem?

On Fri, Feb 9, 2018 at 9:42 AM, nhamwey 
wrote:

> I am using Spark 2.2.0 through Python.
>
> I am repeatedly getting a zero weight of sums error when trying to run a
> model. This happens even when I do not specify a defined weightCol =
> "variable"
>
> Py4JJavaError: An error occurred while calling o1295.fit.
> : java.lang.AssertionError: assertion failed: Sum of weights cannot be
> zero.
> at scala.Predef$.assert(Predef.scala:170)
> at
> org.apache.spark.ml.optim.WeightedLeastSquares$Aggregator.validate(
> WeightedLeastSquares.scala:418)
> at
> org.apache.spark.ml.optim.WeightedLeastSquares.fit(
> WeightedLeastSquares.scala:101)
> at
> org.apache.spark.ml.optim.IterativelyReweightedLeastSquares.fit(
> IterativelyReweightedLeastSquares.scala:86)
> at
> org.apache.spark.ml.regression.GeneralizedLinearRegression.train(
> GeneralizedLinearRegression.scala:369)
> at
> org.apache.spark.ml.regression.GeneralizedLinearRegression.train(
> GeneralizedLinearRegression.scala:203)
> at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
> 62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(
> ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:280)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.
> java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:214)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
>
>


Re: ML:One vs Rest with crossValidator for multinomial in logistic regression

2018-02-08 Thread Bryan Cutler
Nicolas, are you referring to printing the model params in that example
with "print(model1.extractParamMap())"?  There was a problem with pyspark
models not having params after being fit, which causes this example to show
nothing for model paramMaps.  This was fixed in
https://issues.apache.org/jira/browse/SPARK-10931 and the example now shows
all model params.  The fix will be in the Spark 2.3 release.
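
With that fix in place, that example should behave roughly like this (a hedged
sketch reusing the names from the example script):

lr = LogisticRegression(maxIter=10, regParam=0.01)
model1 = lr.fit(training)
print(model1.extractParamMap())   # on Spark 2.3+ this lists maxIter, regParam, etc.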

Bryan

On Wed, Jan 31, 2018 at 10:20 PM, Nicolas Paris <nipari...@gmail.com> wrote:

> Hey
>
> I am also interested in how to get those parameters.
> For example, the demo code spark-2.2.1-bin-hadoop2.7/
> examples/src/main/python/ml/estimator_transformer_param_example.py
> return empty parameters when  printing "lr.extractParamMap()"
>
> That's weird
>
> Thanks
>
> Le 30 janv. 2018 à 23:10, Bryan Cutler écrivait :
> > Hi Michelle,
> >
> > Your original usage of ParamGridBuilder was not quite right, `addGrid`
> expects
> > (some parameter, array of values for that parameter).  If you want to do
> a grid
> > search with different regularization values, you would do the following:
> >
> > val paramMaps = new ParamGridBuilder().addGrid(logist.regParam,
> Array(0.1,
> > 0.3)).build()
> >
> > * don't forget to build the grid after adding values
> >
> > On Tue, Jan 30, 2018 at 6:55 AM, michelleyang <
> michelle1026sh...@gmail.com>
> > wrote:
> >
> > I tried to use One vs Rest in spark ml with pipeline and
> crossValidator for
> > multimultinomial in logistic regression.
> >
> > It came out with empty coefficients. I figured out it was the
> setting of
> > ParamGridBuilder. Can anyone help me understand how does the
> parameter
> > setting affect the crossValidator process?
> >
> > the orginal code: //output empty coefficients.
> >
> > val logist=new LogisticRegression
> >
> > val ova = new OneVsRest().setClassifier(logist)
> >
> > val paramMaps = new ParamGridBuilder().addGrid(ova.classifier,
> > Array(logist.getRegParam))
> >
> > New code://output multi classes coefficients
> >
> > val logist=new LogisticRegression
> >
> > val ova = new OneVsRest().setClassifier(logist)
> >
> > val classifier1 = new LogisticRegression().setRegParam(2.0)
> >
> > val classifier2 = new LogisticRegression().setRegParam(3.0)
> >
> > val paramMaps = new ParamGridBuilder() .addGrid(ova.classifier,
> > Array(classifier1, classifier2))
> >
> > Please help Thanks.
> >
> >
> >
> >
> >
> >
>


Re: ML:One vs Rest with crossValidator for multinomial in logistic regression

2018-01-30 Thread Bryan Cutler
Hi Michelle,

Your original usage of ParamGridBuilder was not quite right, `addGrid`
expects (some parameter, array of values for that parameter).  If you want
to do a grid search with different regularization values, you would do the
following:

val paramMaps = new ParamGridBuilder().addGrid(logist.regParam, Array(0.1,
0.3)).build()

* don't forget to build the grid after adding values

On Tue, Jan 30, 2018 at 6:55 AM, michelleyang 
wrote:

> I tried to use One vs Rest in spark ml with pipeline and crossValidator for
> multimultinomial in logistic regression.
>
> It came out with empty coefficients. I figured out it was the setting of
> ParamGridBuilder. Can anyone help me understand how does the parameter
> setting affect the crossValidator process?
>
> the orginal code: //output empty coefficients.
>
> val logist=new LogisticRegression
>
> val ova = new OneVsRest().setClassifier(logist)
>
> val paramMaps = new ParamGridBuilder().addGrid(ova.classifier,
> Array(logist.getRegParam))
>
> New code://output multi classes coefficients
>
> val logist=new LogisticRegression
>
> val ova = new OneVsRest().setClassifier(logist)
>
> val classifier1 = new LogisticRegression().setRegParam(2.0)
>
> val classifier2 = new LogisticRegression().setRegParam(3.0)
>
> val paramMaps = new ParamGridBuilder() .addGrid(ova.classifier,
> Array(classifier1, classifier2))
>
> Please help Thanks.
>
>
>
>
>


Re: Timestamp changing while writing

2018-01-15 Thread Bryan Cutler
Spark internally stores timestamps as UTC values, so createDataFrame will
convert from the local time zone to UTC. I think there was a JIRA to correct
Parquet output. Are the values you are seeing offset from your local time
zone?
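
One hedged way to confirm that theory (Spark 2.2+) is to pin the session time
zone and compare the written values:

spark.conf.set("spark.sql.session.timeZone", "UTC")
// re-run createDataFrame + the parquet write; if the offset disappears, it was a local-TZ conversion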

On Jan 11, 2018 4:49 PM, "sk skk"  wrote:

> Hello,
>
> I am using createDataframe and passing java row rdd and schema . But it is
> changing the time value when I write that data frame to a parquet file.
>
> Can any one help .
>
> Thank you,
> Sudhir
>


Stopping a Spark Streaming Context gracefully

2017-11-07 Thread Bryan Jeffrey
Hello.

I am running Spark 2.1, Scala 2.11.  We're running several Spark streaming
jobs.  In some cases we restart these jobs on an occasional basis.  We have
code that looks like the following:

logger.info("Starting the streaming context!")
ssc.start()
logger.info("Waiting for termination!")
Option(config.getInt(Parameters.RestartMinutes)).getOrElse(0) match {
  case restartMinutes: Int if restartMinutes > 0 =>
logger.info(s"Waiting for ${restartMinutes} before terminating job")
ssc.awaitTerminationOrTimeout(restartMinutes *
DateUtils.millisecondsPerMinute)
  case _ => ssc.awaitTermination()
}
logger.info("Calling 'stop'")
ssc.stop(stopSparkContext = true, stopGracefully = true)


In several cases we've observed jobs where we've called 'stop' not
stopping.  I went and wrote a simple job that reads from Kafka and does
nothing (prints a count of data).  After several minutes it simply calls
'ssc.stop(true, true)'.  In some cases this will stop the context.  In
others it will not stop the context.  If we call 'stop' several times over
an interval one of them eventually succeeds.

It looks like this is a bug.  I looked in Jira and did not see an open
issue.  Is this a  known problem?  If not I'll open a bug.

Regards,

Bryan Jeffrey


Re: Apache Spark: Parallelization of Multiple Machine Learning ALgorithm

2017-09-05 Thread Bryan Cutler
Hi Prem,

Spark actually does somewhat support different algorithms in
CrossValidator, but it's not really obvious.  You basically need to make a
Pipeline and build a ParamGrid with different algorithms as stages.  Here
is an simple example:

val dt = new DecisionTreeClassifier()
.setLabelCol("label")
.setFeaturesCol("features")

val lr = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("features")

val pipeline = new Pipeline()

val paramGrid = new ParamGridBuilder()
  .addGrid(pipeline.stages, Array(Array[PipelineStage](dt),
    Array[PipelineStage](lr)))
  .build()   // don't forget to build the grid

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(new MulticlassClassificationEvaluator())   // an evaluator must also be set before fit()

Although adding more params in the grid can get a little complicated - I
discuss in detail here https://bryancutler.github.io/cv-pipelines/
As Patrick McCarthy mentioned, you might want to follow SPARK-19071 ,
specifically https://issues.apache.org/jira/browse/SPARK-19357 which
parallelizes model evaluation.

Bryan

On Tue, Sep 5, 2017 at 8:02 AM, Yanbo Liang <yblia...@gmail.com> wrote:

> You are right, native Spark MLlib CrossValidation can't run *different 
> *algorithms
> in parallel.
>
> Thanks
> Yanbo
>
> On Tue, Sep 5, 2017 at 10:56 PM, Timsina, Prem <prem.tims...@mssm.edu>
> wrote:
>
>> Hi Yanboo,
>>
>> Thank You, I very much appreciate your help.
>>
>> For the current use case, the data can fit into a single node. So,
>> spark-sklearn seems to be good choice.
>>
>>
>>
>> *I have  on question regarding this *
>>
>> *“If no, Spark MLlib provide CrossValidation which can run multiple
>> machine learning algorithms parallel on distributed dataset and do
>> parameter search.
>> FYI: https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation”*
>>
>> If I understand correctly, it can run parameter search for
>> cross-validation in parallel.
>>
>> However,  currently  Spark does not support  running multiple algorithms
>> (like Naïve Bayes,  Random Forest, etc.) in parallel. Am I correct?
>>
>> If not, could you please point me to some resources where they have run
>> multiple algorithms in parallel.
>>
>>
>>
>> Thank You very much. It is great help, I will try spark-sklearn.
>>
>> Prem
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *From: *Yanbo Liang <yblia...@gmail.com>
>> *Date: *Tuesday, September 5, 2017 at 10:40 AM
>> *To: *Patrick McCarthy <pmccar...@dstillery.com>
>> *Cc: *"Timsina, Prem" <prem.tims...@mssm.edu>, "user@spark.apache.org" <
>> user@spark.apache.org>
>> *Subject: *Re: Apache Spark: Parallelization of Multiple Machine
>> Learning ALgorithm
>>
>>
>>
>> Hi Prem,
>>
>>
>>
>> How large is your dataset? Can it be fitted in a single node?
>>
>> If no, Spark MLlib provide CrossValidation which can run multiple machine
>> learning algorithms parallel on distributed dataset and do parameter
>> search. FYI: https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation
>>
>> If yes, you can also try spark-sklearn, which can distribute multiple
>> model training (single-node training with sklearn) across a distributed
>> cluster and do parameter search. FYI: https://github.com/databricks/spark-sklearn
>>
>>
>>
>> Thanks
>>
>> Yanbo
>>
>>
>>
>> On Tue, Sep 5, 2017 at 9:56 PM, Patrick McCarthy <pmccar...@dstillery.com>
>> wrote:
>>
>> You might benefit from watching this JIRA issue -
>> https://issues.apache.org/jira/browse/SPARK-19071

Re: about broadcast join of base table in spark sql

2017-06-30 Thread Bryan Jeffrey
Hello. 




If you want to allow broadcast joins with larger broadcasts you can set
spark.sql.autoBroadcastJoinThreshold to a higher value. This will cause the
planner to allow the broadcast join despite 'A' being larger than the default threshold.
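
For example (a hedged illustration - the threshold is in bytes and 200 MB is an arbitrary value):

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 200 * 1024 * 1024)
// or equivalently: spark.sql("SET spark.sql.autoBroadcastJoinThreshold=209715200")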




Get Outlook for Android







From: paleyl


Sent: Wednesday, June 28, 10:42 PM


Subject: about broadcast join of base table in spark sql


To: d...@spark.org, user@spark.apache.org






Hi All,






Recently I met a problem with broadcast join: I want to left join tables A and B,
where A is the smaller one and the left table, so I wrote




A = A.join(B,A("key1") === B("key2"),"left")




but I found that A is not broadcast out, as the shuffle size is still very 
large.




I guess this is a designed mechanism in spark, so could anyone please tell me 
why it is designed like this? I am just very curious.






Best,






Paley 










Re: Question about Parallel Stages in Spark

2017-06-27 Thread Bryan Jeffrey
Satish, 




Are these two separate applications submitted to the Yarn scheduler? If so, then
you would expect to see the original case run in parallel.




However, if this is one application, your submission to Yarn guarantees that
this application will contend fairly with other applications for resources.
The internal output operations within your application (jobs) will be scheduled
by the driver (running on a single AM). This means that whatever driver options
and code you've set will impact the application, but the Yarn scheduler will not
(beyond allocating cores, memory, etc. between applications).








Get Outlook for Android







On Tue, Jun 27, 2017 at 2:33 AM -0400, "satish lalam" <satish.la...@gmail.com> 
wrote:










Thanks All. To reiterate - stages inside a job can be run in parallel as long as
(a) there is no sequential dependency and (b) the job has sufficient resources.
However, my code was launching 2 jobs and they are sequential, as you rightly
pointed out. The issue I was trying to highlight with that piece of pseudocode,
however, was that I am observing a job with 2 stages which don't depend on each
other (they both read data from 2 separate tables in a db); both are scheduled
and both stages get resources - but the 2nd stage really does not pick up until
the 1st stage is complete. It might be due to the db driver - I will post it to
the right forum. Thanks.
On Mon, Jun 26, 2017 at 9:12 PM, Pralabh Kumar <pralabhku...@gmail.com> wrote:
I think my words were also misunderstood. My point is they will not be submitted
together since they are part of one thread.
val spark =  SparkSession.builder()
  .appName("practice")
  .config("spark.scheduler.mode","FAIR")
  .enableHiveSupport().getOrCreate()
val sc = spark.sparkContext
sc.parallelize(List(1.to(1000))).map(s=>Thread.sleep(1)).collect()
sc.parallelize(List(1.to(1000))).map(s=>Thread.sleep(1)).collect()
Thread.sleep(1000)
I ran this and both spark submit time are different for both the jobs .
Please let me if I am wrong
On Tue, Jun 27, 2017 at 9:17 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
My words caused misunderstanding. Step 1: A is submitted to Spark. Step 2: B is
submitted to Spark.
Spark gets two independent jobs. The FAIR scheduler is used to schedule A and B.
Jeffrey's code did not cause two submits.


 ---Original---
From: "Pralabh Kumar" <pralabhku...@gmail.com>
Date: 2017/6/27 12:09:27
To: "萝卜丝炒饭" <1427357...@qq.com>
Cc: "user" <user@spark.apache.org>; "satishl" <satish.la...@gmail.com>; "Bryan Jeffrey" <bryan.jeff...@gmail.com>
Subject: Re: Question about Parallel Stages in Spark
Hi 
I don't think spark-submit will receive two submits. It will execute one submit
and then the next one. If the application is multithreaded, and two threads call
spark submit at one time, then they will run in parallel provided the scheduler
is FAIR and task slots are available.
But in one thread, one submit will complete and then the other one will start.
If there are independent stages in one job, then those will run in parallel.
I agree with Bryan Jeffrey .

RegardsPralabh Kumar
On Tue, Jun 27, 2017 at 9:03 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
I think the Spark cluster receives two submits, A and B. The FAIR scheduler is
used to schedule A and B. I am not sure about this.
 ---Original---
From: "Bryan Jeffrey" <bryan.jeff...@gmail.com>
Date: 2017/6/27 08:55:42
To: "satishl" <satish.la...@gmail.com>
Cc: "user" <user@spark.apache.org>
Subject: Re: Question about Parallel Stages in Spark
Hello.
The driver is running the individual operations in series, but each operation 
is parallelized internally.  If you want them run in parallel you need to 
provide the driver a mechanism to thread the job scheduling out:
val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(1 to 20)

var thingsToDo: ParArray[(RDD[Int], Int)] = Array(rdd1, rdd2).zipWithIndex.par

thingsToDo.foreach { case(rdd, index) =>
  for(i <- (1 to 1))
logger.info(s"Index ${index} - ${rdd.sum()}")
}
This will run both operations in parallel.

On Mon, Jun 26, 2017 at 8:10 PM, satishl <satish.la...@gmail.com> wrote:
For the below code, since rdd1 and rdd2 dont depend on each other - i was

expecting that both first and second printlns would be interwoven. However -

the spark job runs all "first " statements first and then all "seocnd"

statements next in serial fashion. I have set spark.scheduler.mode = FAIR.

obviously my understanding of parallel stages is wrong. What am I missing?



    val rdd1 = sc.parallelize(1 to 100)

    val rdd2 = sc.parallelize(1 to 100)



    for (i <- (1 to 100))

      println("first: " + rdd1.sum())

    for 

Re: Question about Parallel Stages in Spark

2017-06-26 Thread Bryan Jeffrey
Hello.

The driver is running the individual operations in series, but each
operation is parallelized internally.  If you want them run in parallel you
need to provide the driver a mechanism to thread the job scheduling out:

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(1 to 20)

var thingsToDo: ParArray[(RDD[Int], Int)] = Array(rdd1, rdd2).zipWithIndex.par

thingsToDo.foreach { case(rdd, index) =>
  for(i <- (1 to 1))
logger.info(s"Index ${index} - ${rdd.sum()}")
}


This will run both operations in parallel.


On Mon, Jun 26, 2017 at 8:10 PM, satishl  wrote:

> For the below code, since rdd1 and rdd2 dont depend on each other - i was
> expecting that both first and second printlns would be interwoven. However
> -
> the spark job runs all "first " statements first and then all "seocnd"
> statements next in serial fashion. I have set spark.scheduler.mode = FAIR.
> obviously my understanding of parallel stages is wrong. What am I missing?
>
> val rdd1 = sc.parallelize(1 to 100)
> val rdd2 = sc.parallelize(1 to 100)
>
> for (i <- (1 to 100))
>   println("first: " + rdd1.sum())
> for (i <- (1 to 100))
>   println("second" + rdd2.sum())
>
>
>
>
>


Meetup in Taiwan

2017-06-25 Thread Yang Bryan
Hi,

I'm Bryan, the co-founder of the Taiwan Spark User Group.
We discuss and share information on https://www.facebook.com/groups/spark.tw/.
We have a physical meetup twice a month.
Please help us get added to the official website.

We will also hold a coding competition about Spark - could we print the Spark
logo on the certificate of participation?

If you have any questions or suggestions, please feel free and let me know.
Thank you.

Best Regards,
Bryan


Re: Broadcasts & Storage Memory

2017-06-21 Thread Bryan Jeffrey
Satish, 




I agree - that was my impression too. However, I am seeing a smaller amount of
storage memory used on a given executor than the amount of memory required for
my broadcast variables. I am wondering if the statistics in the UI are
incorrect or if the broadcasts are simply not part of that storage memory
fraction.




Bryan Jeffrey 




Get Outlook for Android







On Wed, Jun 21, 2017 at 6:48 PM -0400, "satish lalam" <satish.la...@gmail.com> 
wrote:










My understanding is - it is from storageFraction. Here cached blocks are immune to
eviction - so both persisted RDDs and broadcast variables sit here. Ref
On Wed, Jun 21, 2017 at 1:43 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
Hello.
Question: Do broadcast variables stored on executors count as part of 'storage 
memory' or other memory?
A little bit more detail:
I understand that we have two knobs to control memory allocation:- 
spark.memory.fraction- spark.memory.storageFraction
My understanding is that spark.memory.storageFraction controls the amount of 
memory allocated for cached RDDs.  spark.memory.fraction controls how much 
memory is allocated to Spark operations (task serialization, operations, etc.), 
w/ the remainder reserved for user data structures, Spark internal metadata, 
etc.  This includes the storage memory for cached RDDs.

You end up with executor memory that looks like the following:All memory: 
0-100Spark memory: 0-75RDD Storage: 0-37Other Spark: 38-75Other Reserved: 76-100
Where do broadcast variables fall into the mix?
Regards,
Bryan Jeffrey









Broadcasts & Storage Memory

2017-06-21 Thread Bryan Jeffrey
Hello.

Question: Do broadcast variables stored on executors count as part of
'storage memory' or other memory?

A little bit more detail:

I understand that we have two knobs to control memory allocation:
- spark.memory.fraction
- spark.memory.storageFraction

My understanding is that spark.memory.storageFraction controls the amount
of memory allocated for cached RDDs.  spark.memory.fraction controls how
much memory is allocated to Spark operations (task serialization,
operations, etc.), w/ the remainder reserved for user data structures,
Spark internal metadata, etc.  This includes the storage memory for cached
RDDs.

You end up with executor memory that looks like the following:
All memory: 0-100
Spark memory: 0-75
RDD Storage: 0-37
Other Spark: 38-75
Other Reserved: 76-100

Where do broadcast variables fall into the mix?

Regards,

Bryan Jeffrey


Re: how many topics spark streaming can handle

2017-06-19 Thread Bryan Jeffrey
Hello Ashok, 




We're consuming from more than 10 topics in some Spark streaming applications. 
Topic management is a concern (what is read from where, etc), but I have seen 
no issues from Spark itself. 




Regards, 




Bryan Jeffrey 




Get Outlook for Android







On Mon, Jun 19, 2017 at 3:24 PM -0400, "Ashok Kumar" 
<ashok34...@yahoo.com.invalid> wrote:










thank you
in the following example
   val topics = "test1,test2,test3"
    val brokers = "localhost:9092"
    val topicsSet = topics.split(",").toSet
    val sparkConf = new 
SparkConf().setAppName("KafkaDroneCalc").setMaster("local") 
//spark://localhost:7077
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(30))
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder] (ssc, kafkaParams, topicsSet)
  it is possible to have three topics or many topics?

 

On Monday, 19 June 2017, 20:10, Michael Armbrust <mich...@databricks.com> 
wrote:
  

 I don't think that there is really a Spark specific limit here.  It would be a 
function of the size of your spark / kafka clusters and the type of processing 
you are trying to do.
On Mon, Jun 19, 2017 at 12:00 PM, Ashok Kumar <ashok34...@yahoo.com.invalid> 
wrote:
  Hi Gurus,
Within one Spark streaming process how many topics can be handled? I have not 
tried more than one topic.
Thanks


 






Re: Scala, Python or Java for Spark programming

2017-06-07 Thread Bryan Jeffrey
Mich,

We use Scala for a large project.  On our team we've set a few standards to
ensure readability (we try to avoid excessive use of tuples, use named
functions, etc.)  Given these constraints, I find Scala to be very
readable, and far easier to use than Java.  The Lambda functionality of
Java provides a lot of similar features, but the amount of typing required
to set down a small function is excessive at best!

Regards,

Bryan Jeffrey

On Wed, Jun 7, 2017 at 12:51 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> I think this is a religious question ;-)
> Java is often underestimated, because people are not aware of its lambda
> functionality which makes the code very readable. Scala - it depends who
> programs it. People coming with the normal Java background write Java-like
> code in scala which might not be so good. People from a functional
> background write it more functional like - i.e. You have a lot of things in
> one line of code which can be a curse even for other functional
> programmers, especially if the application is distributed as in the case of
> Spark. Usually no comment is provided and you have - even as a functional
> programmer - to do a lot of drill down. Python is somehow similar, but
> since it has no connection with Java you do not have these extremes. There
> it depends more on the community (e.g. Medical, financials) and skills of
> people how the code look likes.
> However the difficulty comes with the distributed applications behind
> Spark which may have unforeseen side effects if the users do not know this,
> ie if they have never been used to parallel programming.
>
> On 7. Jun 2017, at 17:20, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>
> Hi,
>
> I am a fan of Scala and functional programming hence I prefer Scala.
>
> I had a discussion with a hardcore Java programmer and a data scientist
> who prefers Python.
>
> Their view is that in a collaborative work using Scala programming it is
> almost impossible to understand someone else's Scala code.
>
> Hence I was wondering how much truth is there in this statement. Given
> that Spark uses Scala as its core development language, what is the general
> view on the use of Scala, Python or Java?
>
> Thanks,
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>


Re: Is there a way to do conditional group by in spark 2.1.1?

2017-06-03 Thread Bryan Jeffrey
You should be able to project a new column that is your group column. Then you 
can group on the projected column. 
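
A hedged sketch of that idea, using the column names from the question (the
aggregation here is assumed to be a count):

import org.apache.spark.sql.functions.{col, when}

val keyed = df.withColumn("groupKey",
  when(col("field1") === "foo", col("field1"))
    .when(col("field2") === "bar", col("field2")))   // rows matching neither get a null key

keyed.groupBy(col("groupKey")).count().show()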




Get Outlook for Android







On Sat, Jun 3, 2017 at 6:26 PM -0400, "upendra 1991" 
 wrote:










Use a function

Sent from Yahoo Mail on Android 
   On Sat, Jun 3, 2017 at 5:01 PM, kant kodali wrote:
Hi All,
Is there a way to do a conditional group by in Spark 2.1.1? In other words, I want
to do something like this:

if (field1 == "foo") { 
       df.groupBy(field1)
} else if (field2 == "bar")      df.groupBy(field2)
Thanks
  






Re: Convert camelCase to snake_case when saving Dataframe/Dataset to parquet?

2017-05-22 Thread Bryan Jeffrey
Mike, 




I have code to do that. I'll share it tomorrow. 
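
In the meantime, a rough sketch of one way to do it (an editor's placeholder,
not the code referred to above):

def toSnakeCase(name: String): String =
  name.replaceAll("([A-Z])", "_$1").toLowerCase.stripPrefix("_")

val renamed = ds.toDF(ds.columns.map(toSnakeCase): _*)   // carType -> car_type, etc.
renamed.write.parquet("/path/to/output")                 // path is a placeholder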




Get Outlook for Android







On Mon, May 22, 2017 at 4:53 PM -0400, "Mike Wheeler" 
 wrote:










Hi Spark User,
For Scala case class, we usually use camelCase (carType) for member fields. 
However, many data system use snake_case (car_type) for column names. When 
saving a Dataset of case class to parquet, is there any way to automatically 
convert camelCase to snake_case (carType -> car_type)? 
Thanks,
Mike









Re: Crossvalidator after fit

2017-05-05 Thread Bryan Cutler
Looks like there might be a problem with the way you specified your parameter
values; you probably have an integer value where a floating-point is expected.
Double-check that, and if there is still a problem please share the rest of
your code so we can see how you defined "gridS".
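
A hedged illustration of the likely fix - double-typed params (minInfoGain,
subsamplingRate, etc.) need float values in the grid, not ints:

from pyspark.ml.tuning import ParamGridBuilder

gridS = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.minInfoGain, [0.0, 0.01]) \
    .addGrid(rf.subsamplingRate, [0.7, 1.0]) \
    .build()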

On Fri, May 5, 2017 at 7:40 AM, issues solution 
wrote:

> Hi, I get the following error after trying to perform
> grid search and cross-validation on a RandomForest estimator for classification:
>
> rf = RandomForestClassifier(labelCol="Labeld",featuresCol="features")
>
> evaluator =  BinaryClassificationEvaluator(metricName="F1 Score")
>
> rf_cv = CrossValidator(estimator=rf, 
> estimatorParamMaps=gridS,evaluator=evaluator,numFolds=5)
> (trainingData, testData) = transformed13.randomSplit([0.7, 0.3])
> rfmodel  =  rf_cv.fit(trainingData)
> ---Py4JJavaError
>  Traceback (most recent call 
> last) in ()> 1 rfmodel  =  
> rf_cv.fit(trainingData)
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/pipeline.py in 
> fit(self, dataset, params) 67 return 
> self.copy(params)._fit(dataset) 68 else:---> 69   
>   return self._fit(dataset) 70 else: 71 raise 
> ValueError("Params must be either a param map or a list/tuple of param maps, "
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/tuning.py in _fit(self, 
> dataset)237 train = df.filter(~condition)238 
> for j in range(numModels):--> 239 model = est.fit(train, 
> epm[j])240 # TODO: duplicate evaluator to take extra 
> params from input241 metric = 
> eva.evaluate(model.transform(validation, epm[j]))
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/pipeline.py in 
> fit(self, dataset, params) 65 elif isinstance(params, dict): 
> 66 if params:---> 67 return 
> self.copy(params)._fit(dataset) 68 else: 69   
>   return self._fit(dataset)
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/wrapper.py in 
> _fit(self, dataset)131 132 def _fit(self, dataset):--> 133
>  java_model = self._fit_java(dataset)134 return 
> self._create_model(java_model)135
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/wrapper.py in 
> _fit_java(self, dataset)127 :return: fitted Java model128 
> """--> 129 self._transfer_params_to_java()130 return 
> self._java_obj.fit(dataset._jdf)131
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/wrapper.py in 
> _transfer_params_to_java(self) 80 for param in self.params: 
> 81 if param in paramMap:---> 82 pair = 
> self._make_java_param_pair(param, paramMap[param]) 83 
> self._java_obj.set(pair) 84
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/ml/wrapper.py in 
> _make_java_param_pair(self, param, value) 71 java_param = 
> self._java_obj.getParam(param.name) 72 java_value = _py2java(sc, 
> value)---> 73 return java_param.w(java_value) 74  75 def 
> _transfer_params_to_java(self):
> /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py
>  in __call__(self, *args)811 answer = 
> self.gateway_client.send_command(command)812 return_value = 
> get_return_value(--> 813 answer, self.gateway_client, 
> self.target_id, self.name)814 815 for temp_arg in temp_args:
> /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/sql/utils.py in deco(*a, 
> **kw) 43 def deco(*a, **kw): 44 try:---> 45 
> return f(*a, **kw) 46 except py4j.protocol.Py4JJavaError as e:
>  47 s = e.java_exception.toString()
> /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py
>  in get_return_value(answer, gateway_client, target_id, name)306  
>raise Py4JJavaError(307 "An error occurred 
> while calling {0}{1}{2}.\n".--> 308 format(target_id, 
> ".", name), value)309 else:310 raise 
> Py4JError(
> Py4JJavaError: An error occurred while calling o91602.w.
> : java.lang.ClassCastException: java.lang.Integer cannot be cast to 
> java.lang.Double
>   at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
>   at org.apache.spark.ml.param.DoubleParam.w(params.scala:225)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at 

Re: pandas DF Dstream to Spark DF

2017-04-10 Thread Bryan Cutler
Hi Yogesh,

It would be easier to help if you included your code and the exact error
messages that occur.  If you are creating a Spark DataFrame from a Pandas
DataFrame, then Spark does not read the Pandas schema; it infers one from the
data.  This might be the cause of your issue if the schema is not
inferred correctly.  You can try to specify the schema manually, like this
for example

schema = StructType([
StructField("str_t", StringType(), True),
StructField("int_t", IntegerType(), True),
StructField("double_t", DoubleType(), True)])

pandas_df = pandas.DataFrame(data={...})
spark_df = spark.createDataFrame(pandas_df, schema=schema)

This step might be eliminated by using Apache Arrow, see SPARK-13534 for
related work.

On Sun, Apr 9, 2017 at 10:19 PM, Yogesh Vyas  wrote:

> Hi,
>
> I am writing a pyspark streaming job in which i am returning a pandas data
> frame as DStream. Now I wanted to save this DStream dataframe to parquet
> file. How to do that?
>
> I am trying to convert it to spark data frame but I am getting multiple
> errors. Please suggest me how to do that.
>
> Regards,
> Yogesh
>


Re: [RDDs and Dataframes] Equivalent expressions for RDD API

2017-03-04 Thread bryan . jeffrey


Rdd operation:


rdd.map(x => (word, count)).reduceByKey(_+_)
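
A hedged expansion of that one-liner against the DataFrame from the question
(word-ID is row(1) and count is row(2), both read in as strings):

val rdd = data.rdd
  .map(row => (row.getString(1), row.getString(2).toInt))  // (word-ID, count)
  .repartition(10)                                         // control the partition count here

val summed = rdd.reduceByKey(_ + _)   // RDD equivalent of groupBy("word-ID").agg(sum("count"))
summed.take(20).foreach(println)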






Get Outlook for Android









On Sat, Mar 4, 2017 at 8:59 AM -0500, "Old-School" 
 wrote:










Hi,

I want to perform some simple transformations and check the execution time,
under various configurations (e.g. number of cores being used, number of
partitions etc). Since it is not possible to set the partitions of a
dataframe , I guess that I should probably use RDDs. 

I've got a dataset with 3 columns as shown below:

val data = file.map(line => line.split(" "))
  .filter(lines => lines.length == 3) // ignore first line
  .map(row => (row(0), row(1), row(2)))
  .toDF("ID", "word-ID", "count")
results in:

+-----+---------+-------+
| ID  | word-ID | count |
+-----+---------+-------+
| 15  | 87      | 151   |
| 20  | 19      | 398   |
| 15  | 19      | 21    |
| 180 | 90      | 190   |
+-----+---------+-------+
So how can I turn the above into an RDD in order to use e.g.
sc.parallelize(data, 10) and set the number of partitions to say 10? 

Furthermore, I would also like to ask about the equivalent expression (using
RDD API) for the following simple transformation:

data.select("word-ID",
"count").groupBy("word-ID").agg(sum($"count").as("count")).show()



Thanks in advance











Re: streaming-kafka-0-8-integration (direct approach) and monitoring

2017-02-14 Thread bryan . jeffrey


Mohammad, 


We store our offsets in Cassandra,  and use that for tracking. This solved a 
few issues for us,  as it provides a good persistence mechanism even when 
you're reading from multiple clusters. 
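
The pattern looks roughly like this with the 0-8 direct stream (a hedged sketch;
the storage call itself is up to you):

import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    // persist (r.topic, r.partition, r.untilOffset) to your store of choice
    // (Cassandra for us; ZooKeeper if your monitoring tool needs to see it there)
  }
}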


Bryan Jeffrey 




Get Outlook for Android









On Tue, Feb 14, 2017 at 7:03 PM -0500, "Mohammad Kargar" <mkar...@phemi.com> 
wrote:










As explained here, direct approach of integration between spark streaming and 
kafka does not update offsets in Zookeeper, hence Zookeeper-based Kafka 
monitoring tools will not show progress (details).  We followed the recommended 
workaround to update the zookeeper with the latest offset after each batch, but 
no success. Wondering if there's any end-to-end example we can use?

Thanks,
Mohammad









Re: How to specify "verbose GC" in Spark submit?

2017-02-06 Thread Bryan Jeffrey
Hello.

When specifying GC options for Spark you must determine where you want the
GC options specified - on the executors or on the driver. When you submit
your job, for the driver, specify '--driver-java-options
"-XX:+PrintFlagsFinal  -verbose:gc", etc.  For the executor specify --conf
"spark.executor.extraJavaOptions=-XX:+PrintFlagsFinal  -verbose:gc", etc.

Bryan Jeffrey

On Mon, Feb 6, 2017 at 8:02 AM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Dear All,
>
> Is there any way to specify verbose GC -i.e. “-verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps” in Spark submit?
>
>
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> <http://139.59.184.114/index.html>
>


Re: Belief propagation algorithm is open sourced

2016-12-14 Thread Bryan Cutler
I'll check it out, thanks for sharing Alexander!

On Dec 13, 2016 4:58 PM, "Ulanov, Alexander" 
wrote:

> Dear Spark developers and users,
>
>
> HPE has open sourced the implementation of the belief propagation (BP)
> algorithm for Apache Spark, a popular message passing algorithm for
> performing inference in probabilistic graphical models. It provides exact
> inference for graphical models without loops. While inference for graphical
> models with loops is approximate, in practice it is shown to work well. The
> implementation is generic and operates on factor graph representation of
> graphical models. It handles factors of any order, and variable domains of
> any size. It is implemented with Apache Spark GraphX, and thus can scale to
> large scale models. Further, it supports computations in log scale for
> numerical stability. Large scale applications of BP include fraud detection
> in banking transactions and malicious site detection in computer networks.
>
>
> Source code: https://github.com/HewlettPackard/sandpiper
>
>
> Best regards, Alexander
>


Re: Streaming Batch Oddities

2016-12-13 Thread Bryan Jeffrey
All,

Any thoughts?  I can run another couple of experiments to try to narrow the
problem.  The total data volume in the repartition is around 60GB / batch.

Regards,

Bryan Jeffrey

On Tue, Dec 13, 2016 at 12:11 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> Hello.
>
> I have a current Spark 1.6.1 application that I am working to modify.  The
> flow of the application looks something like the following:
>
> (Kafka) --> (Direct Stream Receiver) --> (Repartition) -->
> (Extract/Schemitization Logic w/ RangePartitioner) --> Several Output
> Operations
>
> In the 'extract' segment above I have the following code:
>
> def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
> defaultPartitions: Int): DStream[SchematizedData] = {
> val messages: DStream[String] = kafkaStreamFactory.create(topics)
> val enriched = messages.map(Schematize(_))
> val result = enriched
> .map(event => { (event.targetEntity, event) })
> .transform(rdd => rdd.partitionBy(new RangePartitioner[String,
> SchematizedData](defaultPartitions, rdd)))
> .map(x => x._2)
>
>  result
> }
>
> This code executes well.  Each follow-on output operation (job) consumes
> from a transform 'partitionBy' ShuffledRDD.  The result is that my 2-minute
> batches are consumed in roughly 48 seconds (streaming batch interval), w/
> the 'RangePartitioner' stage not counted in my streaming job @ 30 seconds
> (so around 78 seconds total).  I have 'concurrent jobs' set to 4.  The
> 'read from kafka' and 'extract' for a given set of topics is a single job
> (two stages), and all of the various output operations follow on after
> completion.
>
> I am aiming to remove the 'RangePartitioner' for two reasons:
>   1. We do not need to re-partition here. We're already calling
> stream.repartition in the Kafka Stream Factory.
>   2. It appears to be causing issues w/ failure to recompute when nodes go
> down.
>
> When I remove the 'RangePartitioner code, I have the following:
>
> def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
> defaultPartitions: Int): DStream[SchematizedData] = {
> val messages: DStream[String] = kafkaStreamFactory.create(topics)
> val enriched = messages.map(Schematize(_))
> val result = enriched.repartition(defaultPartitions)
>
>  result
> }
>
> The resulting code, with no other changes takes around 9 minutes to
> complete a single batch. I am having trouble determining why these have
> such a significant performance difference.  Looking at the partitionBy
> code, it is creating a separate ShuffledRDD w/in the transform block.  The
> 'stream.repartition' call is equivalent to 
> stream.transform(_.repartition(partitions)),
> which calls coalesce (shuffle=true), which creates a new ShuffledRDD with a
> HashPartitioner.  These calls appear functionally equivalent - I am having
> trouble coming up with a justification for the significant performance
> differences between calls.
>
> Help?
>
> Regards,
>
> Bryan Jeffrey
>
>


Streaming Batch Oddities

2016-12-13 Thread Bryan Jeffrey
Hello.

I have a current Spark 1.6.1 application that I am working to modify.  The
flow of the application looks something like the following:

(Kafka) --> (Direct Stream Receiver) --> (Repartition) -->
(Extract/Schematization Logic w/ RangePartitioner) --> Several Output
Operations

In the 'extract' segment above I have the following code:

def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
defaultPartitions: Int): DStream[SchematizedData] = {
val messages: DStream[String] = kafkaStreamFactory.create(topics)
val enriched = messages.map(Schematize(_))
val result = enriched
.map(event => { (event.targetEntity, event) })
.transform(rdd => rdd.partitionBy(new RangePartitioner[String,
SchematizedData](defaultPartitions, rdd)))
.map(x => x._2)

 result
}

This code executes well.  Each follow-on output operation (job) consumes
from a transform 'partitionBy' ShuffledRDD.  The result is that my 2-minute
batches are consumed in roughly 48 seconds (streaming batch interval), w/
the 'RangePartitioner' stage not counted in my streaming job @ 30 seconds
(so around 78 seconds total).  I have 'concurrent jobs' set to 4.  The
'read from kafka' and 'extract' for a given set of topics is a single job
(two stages), and all of the various output operations follow on after
completion.

I am aiming to remove the 'RangePartitioner' for two reasons:
  1. We do not need to re-partition here. We're already calling
stream.repartition in the Kafka Stream Factory.
  2. It appears to be causing issues w/ failure to recompute when nodes go
down.

When I remove the 'RangePartitioner code, I have the following:

def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
defaultPartitions: Int): DStream[SchematizedData] = {
val messages: DStream[String] = kafkaStreamFactory.create(topics)
val enriched = messages.map(Schematize(_))
val result = enriched.repartition(defaultPartitions)

 result
}

The resulting code, with no other changes takes around 9 minutes to
complete a single batch. I am having trouble determining why these have
such a significant performance difference.  Looking at the partitionBy
code, it is creating a separate ShuffledRDD w/in the transform block.  The
'stream.repartition' call is equivalent to
stream.transform(_.repartition(partitions)), which calls coalesce
(shuffle=true), which creates a new ShuffledRDD with a HashPartitioner.
These calls appear functionally equivalent - I am having trouble coming up
with a justification for the significant performance differences between
calls.

Help?

Regards,

Bryan Jeffrey


Re: New to spark.

2016-09-28 Thread Bryan Cutler
Hi Anirudh,

All types of contributions are welcome, from code to documentation.  Please
check out the page at
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark for
some info, specifically keep a watch out for starter JIRAs here
https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20labels%20%3D%20Starter%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)
.

On Wed, Sep 28, 2016 at 9:11 AM, Anirudh Muhnot  wrote:

> Hello everyone, I'm Anirudh. I'm fairly new to spark as I've done an
> online specialisation from UC Berkeley. I know how to code in Python but
> have little to no idea about Scala. I want to contribute to Spark, Where do
> I start and how? I'm reading the pull requests on GitHub but I'm barely
> able to understand them. Can anyone help? Thank you.
> Sent from my iPhone
>


Re: Master OOM in "master-rebuild-ui-thread" while running stream app

2016-09-13 Thread Bryan Cutler
It looks like you have logging enabled and your application event log is
too large for the master to build a web UI from it.  In spark 1.6.2 and
earlier, when an application completes, the master rebuilds a web UI to
view events after the fact.  This functionality was removed in spark 2.0
and the history server should be used instead.  If you are unable to
upgrade could you try disabling logging?
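
For reference, the event log is controlled by a single property, so a hedged way
to disable it is "--conf spark.eventLog.enabled=false" on spark-submit (or
"spark.eventLog.enabled false" in spark-defaults.conf).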

On Sep 13, 2016 7:18 AM, "Mariano Semelman" 
wrote:

> Hello everybody,
>
> I am running a spark streaming app and I am planning to use it as a long
> running service. However while trying the app in a rc environment I got
> this exception in the master daemon after 1 hour of running:
>
> ​​Exception in thread "master-rebuild-ui-thread"
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at java.util.regex.Pattern.compile(Pattern.java:1667)
> at java.util.regex.Pattern.(Pattern.java:1351)
> at java.util.regex.Pattern.compile(Pattern.java:1054)
> at java.lang.String.replace(String.java:2239)
> at org.apache.spark.util.Utils$.getFormattedClassName(Utils.
> scala:1632)
> at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(
> JsonProtocol.scala:486)
> at org.apache.spark.scheduler.ReplayListenerBus.replay(
> ReplayListenerBus.scala:58)
> at org.apache.spark.deploy.master.Master$$anonfun$17.
> apply(Master.scala:972)
> at org.apache.spark.deploy.master.Master$$anonfun$17.
> apply(Master.scala:952)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
> Future.scala:24)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> As a palliative measure I've increased the master memory to 1.5gb.
> My job is running with a batch interval of 5 seconds.
> I'm using spark version 1.6.2.
>
> I think it might be related to this issues:
>
> https://issues.apache.org/jira/browse/SPARK-6270
> https://issues.apache.org/jira/browse/SPARK-12062
> https://issues.apache.org/jira/browse/SPARK-12299
>
> But I don't see a clear road to solve this apart from upgrading spark.
> What would you recommend?
>
>
> Thanks in advance
> Mariano
>
>


Re: Random Forest Classification

2016-08-31 Thread Bryan Cutler
I see.  You might try this: create a pipeline of just your feature
transformers, then call fit() on the complete dataset to get a model.
Finally, make a second pipeline and add this model and the decision tree as
stages.
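
A hedged sketch of that two-pipeline approach, reusing the stage and dataset
names from the thread below:

val featurePipeline = new Pipeline().setStages(Array(col1_indexer, col2_indexer,
  col3_indexer, col4_indexer, col5_indexer, vectorAssembler, featureVectorIndexer))
val featureModel = featurePipeline.fit(completeDataset)   // transformers fit on ALL the data

val fullPipeline = new Pipeline().setStages(Array(featureModel, decisionTree))
val model = fullPipeline.fit(trainingSet)                 // only the tree is actually fit here
val output = model.transform(cvSet)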

On Aug 30, 2016 8:19 PM, "Bahubali Jain" <bahub...@gmail.com> wrote:

> Hi Bryan,
> Thanks for the reply.
> I am indexing 5 columns ,then using these indexed columns to generate the
> "feature" column thru vector assembler.
> Which essentially means that I cannot use *fit()* directly on
> "completeDataset" dataframe since it will neither have the "feature" column
> and nor the 5 indexed columns.
> Of-course there is a dirty way of doing this, but I am wondering if there
> some optimized/intelligent approach for this.
>
> Thanks,
> Baahu
>
> On Wed, Aug 31, 2016 at 3:30 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> You need to first fit just the VectorIndexer which returns the model,
>> then add the model to the pipeline where it will only transform.
>>
>> val featureVectorIndexer = new VectorIndexer()
>> .setInputCol("feature")
>> .setOutputCol("indexedfeature")
>> .setMaxCategories(180)
>> .fit(completeDataset)
>>
>> On Tue, Aug 30, 2016 at 9:57 AM, Bahubali Jain <bahub...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I had run into similar exception " java.util.NoSuchElementException:
>>> key not found: " .
>>> After further investigation I realized it is happening due to
>>> vectorindexer being executed on training dataset and not on entire dataset.
>>>
>>> In the dataframe I have 5 categories , each of these have to go thru
>>> stringindexer and then these are put thru a vector indexer to generate
>>> feature vector.
>>> What is the right way to do this, so that vector indexer can be run on
>>> the entire data and not just on training data?
>>>
>>> Below is the current approach, as evident  VectorIndexer is being
>>> generated based on the training set.
>>>
>>> Please Note: fit() on Vectorindexer cannot be called on entireset
>>> dataframe since it doesn't have the required column(*feature *column is
>>> being generated dynamically in pipeline execution)
>>> How can the vectorindexer be *fit()* on the entireset?
>>>
>>>  val col1_indexer = new StringIndexer().setInputCol("c
>>> ol1").setOutputCol("indexed_col1")
>>> val col2_indexer = new StringIndexer().setInputCol("c
>>> ol2").setOutputCol("indexed_col2")
>>> val col3_indexer = new StringIndexer().setInputCol("c
>>> ol3").setOutputCol("indexed_col3")
>>> val col4_indexer = new StringIndexer().setInputCol("c
>>> ol4").setOutputCol("indexed_col4")
>>> val col5_indexer = new StringIndexer().setInputCol("c
>>> ol5").setOutputCol("indexed_col5")
>>>
>>> val featureArray =  Array("indexed_col1","indexed_
>>> col2","indexed_col3","indexed_col4","indexed_col5")
>>> val vectorAssembler = new VectorAssembler().setInputCols
>>> (featureArray).setOutputCol("*feature*")
>>> val featureVectorIndexer = new VectorIndexer()
>>> .setInputCol("feature")
>>> .setOutputCol("indexedfeature")
>>> .setMaxCategories(180)
>>>
>>> val decisionTree = new DecisionTreeClassifier().setMa
>>> xBins(300).setMaxDepth(1).setImpurity("entropy").setLabelCol
>>> ("indexed_user_action").setFeaturesCol("indexedfeature").
>>> setPredictionCol("prediction")
>>>
>>> val pipeline = new Pipeline().setStages(Array(col1_indexer,col2_indexer,
>>> col3_indexer,col4_indexer,col5_indexer,vectorAssembler,featureVecto
>>> rIndexer,decisionTree))
>>> val model = pipeline.*fit(trainingSet)*
>>> val output = model.transform(cvSet)
>>>
>>>
>>> Thanks,
>>> Baahu
>>>
>>> On Fri, Jul 8, 2016 at 11:24 PM, Bryan Cutler <cutl...@gmail.com> wrote:
>>>
>>>> Hi Rich,
>>>>
>>>> I looked at the notebook and it seems like you are fitting the
>>>> StringIndexer and VectorIndexer to only the training data, and it should
>>>> the the entire data set.  So if the training data does not include all of
>>>> the labels and an unknown label appears in the test data during evaluation,

Re: Random Forest Classification

2016-08-30 Thread Bryan Cutler
You need to first fit just the VectorIndexer which returns the model, then
add the model to the pipeline where it will only transform.

val featureVectorIndexer = new VectorIndexer()
.setInputCol("feature")
.setOutputCol("indexedfeature")
.setMaxCategories(180)
.fit(completeDataset)

On Tue, Aug 30, 2016 at 9:57 AM, Bahubali Jain <bahub...@gmail.com> wrote:

> Hi,
> I had run into similar exception " java.util.NoSuchElementException: key
> not found: " .
> After further investigation I realized it is happening due to
> vectorindexer being executed on training dataset and not on entire dataset.
>
> In the dataframe I have 5 categories , each of these have to go thru
> stringindexer and then these are put thru a vector indexer to generate
> feature vector.
> What is the right way to do this, so that vector indexer can be run on the
> entire data and not just on training data?
>
> Below is the current approach, as evident  VectorIndexer is being
> generated based on the training set.
>
> Please Note: fit() on Vectorindexer cannot be called on entireset
> dataframe since it doesn't have the required column(*feature *column is
> being generated dynamically in pipeline execution)
> How can the vectorindexer be *fit()* on the entireset?
>
>  val col1_indexer = new StringIndexer().setInputCol("col1").setOutputCol("indexed_col1")
> val col2_indexer = new StringIndexer().setInputCol("col2").setOutputCol("indexed_col2")
> val col3_indexer = new StringIndexer().setInputCol("col3").setOutputCol("indexed_col3")
> val col4_indexer = new StringIndexer().setInputCol("col4").setOutputCol("indexed_col4")
> val col5_indexer = new StringIndexer().setInputCol("col5").setOutputCol("indexed_col5")
>
> val featureArray = Array("indexed_col1","indexed_col2","indexed_col3","indexed_col4","indexed_col5")
> val vectorAssembler = new VectorAssembler().setInputCols(featureArray).setOutputCol("*feature*")
> val featureVectorIndexer = new VectorIndexer()
> .setInputCol("feature")
> .setOutputCol("indexedfeature")
> .setMaxCategories(180)
>
> val decisionTree = new DecisionTreeClassifier().setMaxBins(300).setMaxDepth(1).setImpurity("entropy").setLabelCol("indexed_user_action").setFeaturesCol("indexedfeature").setPredictionCol("prediction")
>
> val pipeline = new Pipeline().setStages(Array(col1_indexer,col2_indexer,col3_indexer,col4_indexer,col5_indexer,vectorAssembler,featureVectorIndexer,decisionTree))
> val model = pipeline.*fit(trainingSet)*
> val output = model.transform(cvSet)
>
>
> Thanks,
> Baahu
>
> On Fri, Jul 8, 2016 at 11:24 PM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> Hi Rich,
>>
>> I looked at the notebook and it seems like you are fitting the
>> StringIndexer and VectorIndexer to only the training data, and it should
>> be the entire data set.  So if the training data does not include all of
>> the labels and an unknown label appears in the test data during evaluation,
>> then it will not know how to index it.  So your code should be like this,
>> fit with 'digits' instead of 'training'
>>
>> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(digits)
>> // Automatically identify categorical features, and index them.
>> // Set maxCategories so features with > 4 distinct values are treated as continuous.
>> val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(digits)
>>
>> Hope that helps!
>>
>> On Fri, Jul 1, 2016 at 9:24 AM, Rich Tarro <richta...@gmail.com> wrote:
>>
>>> Hi Bryan.
>>>
>>> Thanks for your continued help.
>>>
>>> Here is the code shown in a Jupyter notebook. I figured this was easier
>>> that cutting and pasting the code into an email. If you  would like me to
>>> send you the code in a different format let, me know. The necessary data is
>>> all downloaded within the notebook itself.
>>>
>>> https://console.ng.bluemix.net/data/notebooks/fe7e578a-401f-4744-a318-b1b6bcf6f5f8/view?access_token=2f6df7b1dfcb3c1c2d94a794506bb282729dab8f05118fafe5f11886326e02fc
>>>
>>> A few additional pieces of information.
>>>
>>> 1. The training dataset is cached before training the model. 

Re: Grid Search using Spark MLLib Pipelines

2016-08-12 Thread Bryan Cutler
You will need to cast bestModel to include the MLWritable trait.  The class
Model does not mix it in by default.  For instance:

cvModel.bestModel.asInstanceOf[MLWritable].save("/my/path")

Alternatively, you could save the CV model directly, which takes care of
this

cvModel.save("/my/path")
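
On the second question (a metric for each parameter combination), a small
sketch that should work with the code quoted below; it assumes paramGrid and
cvModel are still in scope and that CrossValidatorModel.avgMetrics lines up
with the param maps in grid order (note the BinaryClassificationEvaluator
metric is areaUnderROC, not an error):

paramGrid.zip(cvModel.avgMetrics).foreach { case (params, metric) =>
  println(s"metric = $metric for:\n$params")
}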

On Fri, Aug 12, 2016 at 9:17 AM, Adamantios Corais <
adamantios.cor...@gmail.com> wrote:

> Hi,
>
> Assuming that I have run the following pipeline and have got the best
> logistic regression model. How can I then save that model for later use?
> The following command throws an error:
>
> cvModel.bestModel.save("/my/path")
>
> Also, is it possible to get the error (a collection of) for each
> combination of parameters?
>
> I am using spark 1.6.2
>
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.classification.LogisticRegression
> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
> import org.apache.spark.ml.tuning.{ParamGridBuilder , CrossValidator}
>
> val lr = new LogisticRegression()
>
> val pipeline = new Pipeline().
> setStages(Array(lr))
>
> val paramGrid = new ParamGridBuilder().
> addGrid(lr.elasticNetParam , Array(0.1)).
> addGrid(lr.maxIter , Array(10)).
> addGrid(lr.regParam , Array(0.1)).
> build()
>
> val cv = new CrossValidator().
> setEstimator(pipeline).
> setEvaluator(new BinaryClassificationEvaluator).
> setEstimatorParamMaps(paramGrid).
> setNumFolds(2)
>
> val cvModel = cv.
> fit(training)
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Why training data in Kmeans Spark streaming clustering

2016-08-11 Thread Bryan Cutler
The algorithm update is just broken into 2 steps: trainOn - to learn/update
the cluster centers, and predictOn - predicts cluster assignment on data

The StreamingKMeansExample you reference breaks up data into training and
test because you might want to score the predictions.  If you don't care
about that, you could just use a single stream for both steps.
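
A minimal sketch of the single-stream variant (names like ssc and lines, and
the parameter values, are assumptions):

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors

// One DStream[Vector] is used both to update the centers and to get assignments.
val vectors = lines.map(s => Vectors.dense(s.split(',').map(_.toDouble)))

val model = new StreamingKMeans()
  .setK(3)
  .setDecayFactor(1.0)
  .setRandomCenters(2, 0.0)   // dimension 2, zero initial weight

model.trainOn(vectors)            // unsupervised update of the cluster centers each batch
model.predictOn(vectors).print()  // cluster assignment for the same data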

On Thu, Aug 11, 2016 at 9:14 AM, Ahmed Sadek  wrote:

> Dear All,
>
> I was wondering why there is training data and testing data in kmeans ?
> Shouldn't it be unsupervised learning with just access to stream data ?
>
> I found similar question but couldn't understand the answer.
> http://stackoverflow.com/questions/30972057/is-the-
> streaming-k-means-clustering-predefined-in-mllib-library-of-spark-supervi
>
> Thanks!
> Ahmed
>


Re: Spark 2.0 - JavaAFTSurvivalRegressionExample doesn't work

2016-07-28 Thread Bryan Cutler
That's the correct fix.  I have this done along with a few other Java
examples that still use the old MLlib Vectors in this PR thats waiting for
review https://github.com/apache/spark/pull/14308

On Jul 28, 2016 5:14 AM, "Robert Goodman"  wrote:

> I changed import in the sample from
>
> import org.apache.spark.mllib.linalg.*;
>
> to
>
>import org.apache.spark.ml.linalg.*;
>
> and the sample now runs.
>
>Thanks
>  Bob
>
>
> On Wed, Jul 27, 2016 at 1:33 PM, Robert Goodman  wrote:
> > I tried to run the JavaAFTSurvivalRegressionExample on Spark 2.0 and the
> > example doesn't work. It looks like the problem is that the example is
> using
> > the MLLib Vector/VectorUDT to create the DataSet which needs to be
> converted
> > using MLUtils before using in the model. I haven't actually tried this
> yet.
> >
> > When I run the example (/bin/run-example
> > ml.JavaAFTSurvivalRegressionExample), I get the following stack trace
> >
> > Exception in thread "main" java.lang.IllegalArgumentException:
> requirement
> > failed: Column features must be of type
> > org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually
> > org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.
> > at scala.Predef$.require(Predef.scala:224)
> > at
> >
> org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:42)
> > at
> >
> org.apache.spark.ml.regression.AFTSurvivalRegressionParams$class.validateAndTransformSchema(AFTSurvivalRegression.scala:106)
> > at
> >
> org.apache.spark.ml.regression.AFTSurvivalRegression.validateAndTransformSchema(AFTSurvivalRegression.scala:126)
> > at
> >
> org.apache.spark.ml.regression.AFTSurvivalRegression.fit(AFTSurvivalRegression.scala:199)
> > at
> >
> org.apache.spark.examples.ml.JavaAFTSurvivalRegressionExample.main(JavaAFTSurvivalRegressionExample.java:67)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
> > at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
> > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
> > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
> > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> >
> >
> > Are you suppose to be able use the ML version of VectorUDT? The Spark 2.0
> > API docs for Java, don't show the class but I was able to import the
> class
> > into a java program.
> >
> >Thanks
> >  Bob
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Event Log Compression

2016-07-26 Thread Bryan Jeffrey
All,

I am running Spark 1.6.1. I enabled 'spark.eventLog.compress', and the data
is now being compressed using lz4.  I would like to move that back to
Snappy as I have some third party tools that require using Snappy.

Is there a variable used to control Spark eventLog compression algorithm?

Thank you,

Bryan Jeffrey
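
One possible knob, offered only as an assumption to verify: as far as I can
tell the event log writer reuses Spark's general I/O codec rather than having
a dedicated setting, so switching that codec back to Snappy should also change
the event log compression.

val conf = new org.apache.spark.SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.compress", "true")
  .set("spark.io.compression.codec", "snappy")  // assumed to also govern event log compression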


Spark 2.0

2016-07-25 Thread Bryan Jeffrey
All,

I had three questions:

(1) Is there a timeline for stable Spark 2.0 release?  I know the 'preview'
build is out there, but was curious what the timeline was for full
release. Jira seems to indicate that there should be a release 7/27.

(2)  For 'continuous' datasets there has been a lot of discussion. One item
that came up in tickets was the idea that 'count()' and other functions do
not apply to continuous datasets: https://github.com/apache/spark/pull/12080.
In this case what is the intended procedure to calculate a streaming
statistic based on an interval (e.g. count the number of records in a 2
minute window every 2 minutes)?

(3) In previous releases (1.6.1) the call to DStream / RDD repartition w/ a
number of partitions set to zero silently deletes data.  I have looked in
Jira for a similar issue, but I do not see one.  I would like to address
this (and would likely be willing to go fix it myself).  Should I just
create a ticket?

Thank you,

Bryan Jeffrey


Re: Programmatic use of UDFs from Java

2016-07-22 Thread Bryan Cutler
Everett, I had the same question today and came across this old thread.
Not sure if there has been any more recent work to support this.
http://apache-spark-developers-list.1001551.n3.nabble.com/Using-UDFs-in-Java-without-registration-td12497.html
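
For comparison, the Scala side of this needs no registration; this is only a
sketch (df and "old_col" are assumed names) and does not solve the Java
TypeTag question discussed below:

import org.apache.spark.sql.functions.udf

val testFn = udf((x: Long) => x + 1L)                      // unregistered UDF
val df2 = df.withColumn("new_col", testFn(df("old_col")))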


On Thu, Jul 21, 2016 at 10:10 AM, Everett Anderson  wrote:

> Hi,
>
> In the Java Spark DataFrames API, you can create a UDF, register it, and
> then access it by string name by using the convenience UDF classes in
> org.apache.spark.sql.api.java
> 
> .
>
> Example
>
> UDF1 testUdf1 = new UDF1<>() { ... }
>
> sqlContext.udf().register("testfn", testUdf1, DataTypes.LongType);
>
> DataFrame df2 = df.withColumn("new_col", *functions.callUDF("testfn"*,
> df.col("old_col")));
>
> However, I'd like to avoid registering these by name, if possible, since I
> have many of them and would need to deal with name conflicts.
>
> There are udf() methods like this that seem to be from the Scala API
> ,
> where you don't have to register everything by name first.
>
> However, using those methods from Java would require interacting with
> Scala's scala.reflect.api.TypeTags.TypeTag. I'm having a hard time
> figuring out how to create a TypeTag from Java.
>
> Does anyone have an example of using the udf() methods from Java?
>
> Thanks!
>
> - Everett
>
>


Re: MLlib, Java, and DataFrame

2016-07-21 Thread Bryan Cutler
ML has a DataFrame based API, while MLlib is RDDs and will be deprecated as
of Spark 2.0.
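
A minimal Scala sketch of the DataFrame-based API (column names and parameter
values are assumptions; 'training' is a DataFrame with "label" and "features"
columns):

import org.apache.spark.ml.regression.LinearRegression

val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
val lrModel = lr.fit(training)   // fit directly on the DataFrame
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")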

On Thu, Jul 21, 2016 at 10:41 PM, VG <vlin...@gmail.com> wrote:

> Why do we have these 2 packages ... ml and mlib?
> What is the difference in these
>
>
>
> On Fri, Jul 22, 2016 at 11:09 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> Hi JG,
>>
>> If you didn't know this, Spark MLlib has 2 APIs, one of which uses
>> DataFrames.  Take a look at this example
>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/ml/JavaLinearRegressionWithElasticNetExample.java
>>
>> This example uses a Dataset, which is type equivalent to a DataFrame.
>>
>>
>> On Thu, Jul 21, 2016 at 8:41 PM, Jean Georges Perrin <j...@jgp.net> wrote:
>>
>>> Hi,
>>>
>>> I am looking for some really super basic examples of MLlib (like a
>>> linear regression over a list of values) in Java. I have found a few, but I
>>> only saw them using JavaRDD... and not DataFrame.
>>>
>>> I was kind of hoping to take my current DataFrame and send them in
>>> MLlib. Am I too optimistic? Do you know/have any example like that?
>>>
>>> Thanks!
>>>
>>> jg
>>>
>>>
>>> Jean Georges Perrin
>>> j...@jgp.net / @jgperrin
>>>
>>>
>>>
>>>
>>>
>>
>


Re: MLlib, Java, and DataFrame

2016-07-21 Thread Bryan Cutler
Hi JG,

If you didn't know this, Spark MLlib has 2 APIs, one of which uses
DataFrames.  Take a look at this example
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/ml/JavaLinearRegressionWithElasticNetExample.java

This example uses a Dataset, which is type equivalent to a DataFrame.


On Thu, Jul 21, 2016 at 8:41 PM, Jean Georges Perrin  wrote:

> Hi,
>
> I am looking for some really super basic examples of MLlib (like a linear
> regression over a list of values) in Java. I have found a few, but I only
> saw them using JavaRDD... and not DataFrame.
>
> I was kind of hoping to take my current DataFrame and send them in MLlib.
> Am I too optimistic? Do you know/have any example like that?
>
> Thanks!
>
> jg
>
>
> Jean Georges Perrin
> j...@jgp.net / @jgperrin
>
>
>
>
>


Re: spark-submit local and Akka startup timeouts

2016-07-19 Thread Bryan Cutler
The patch I was referring to doesn't help on the ActorSystem startup
unfortunately.  As best I can tell the property
"akka.remote.startup-timeout" is what controls this timeout.  You can try
setting this to something greater in your Spark conf and hopefully that
would work.  Otherwise you might have luck trying a more recent version of
Spark, such as 1.6.2 or even 2.0.0 (soon to be released) which no longer
uses Akka and the ActorSystem.  Hope that helps!

On Tue, Jul 19, 2016 at 2:29 AM, Rory Waite <rwa...@sdl.com> wrote:

> Sorry Bryan, I should have mentioned that I'm running 1.6.0 for hadoop2.6.
> The binaries were downloaded from the Spark website.
>
>
> We're free to upgrade to Spark, create custom builds, etc. Please let me
> know how to display the config property.
>
>   <http://www.sdl.com/>
> www.sdl.com
>
>
> SDL PLC confidential, all rights reserved. If you are not the intended
> recipient of this mail SDL requests and requires that you delete it without
> acting upon or copying any of its contents, and we further request that you
> advise us.
>
> SDL PLC is a public limited company registered in England and Wales.
> Registered number: 02675207.
> Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6
> 7DY, UK.
> --
> *From:* Bryan Cutler <cutl...@gmail.com>
> *Sent:* 19 July 2016 02:20:38
> *To:* Rory Waite
> *Cc:* user
> *Subject:* Re: spark-submit local and Akka startup timeouts
>
>
> Hi Rory, for starters what version of Spark are you using?  I believe that
> in a 1.5.? release (I don't know which one off the top of my head) there
> was an addition that would also display the config property when a timeout
> happened.  That might help some if you are able to upgrade.
>
> On Jul 18, 2016 9:34 AM, "Rory Waite" <rwa...@sdl.com> wrote:
>
>> Hi All,
>>
>> We have created a regression test for a spark job that is executed during
>> our automated build. It executes a spark-submit with a local master,
>> processes some data, and the exits. We have an issue in that we get a
>> non-deterministic timeout error. It seems to be when the spark context
>> tries to initialise Akka (stack trace below). It doesn't happen often, but
>> when it does it causes the whole build to fail.
>>
>> The machines that run these tests get very heavily loaded, with many
>> regression tests running simultaneously. My theory is that the spark-submit
>> is sometimes unable to initialise Akka in time because the machines are so
>> heavily loaded with the other tests. My first thought was to try to tune
>> some parameter to extend the timeout, but I couldn't find anything in the
>> documentation. The timeout is short at 10s, whereas the default akka
>> timeout is set at 100s.
>>
>> Is there a way to adjust this timeout?
>>
>> 16/07/17 00:04:22 ERROR SparkContext: Error initializing SparkContext.
>> java.util.concurrent.TimeoutException: Futures timed out after [1
>> milliseconds]
>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>> at
>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>> at scala.concurrent.Await$.result(package.scala:107)
>> at akka.remote.Remoting.start(Remoting.scala:179)
>> at
>> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
>> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
>> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
>> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
>> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
>> at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
>> at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
>> at
>> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
>> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
>> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
>> at
>> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1964)
>> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>> at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1955)
>> at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
>> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
>> at org.apache.spark.SparkEnv$.cre

Re: spark-submit local and Akka startup timeouts

2016-07-18 Thread Bryan Cutler
Hi Rory, for starters what version of Spark are you using?  I believe that
in a 1.5.? release (I don't know which one off the top of my head) there
was an addition that would also display the config property when a timeout
happened.  That might help some if you are able to upgrade.

On Jul 18, 2016 9:34 AM, "Rory Waite"  wrote:

> Hi All,
>
> We have created a regression test for a spark job that is executed during
> our automated build. It executes a spark-submit with a local master,
> processes some data, and the exits. We have an issue in that we get a
> non-deterministic timeout error. It seems to be when the spark context
> tries to initialise Akka (stack trace below). It doesn't happen often, but
> when it does it causes the whole build to fail.
>
> The machines that run these tests get very heavily loaded, with many
> regression tests running simultaneously. My theory is that the spark-submit
> is sometimes unable to initialise Akka in time because the machines are so
> heavily loaded with the other tests. My first thought was to try to tune
> some parameter to extend the timeout, but I couldn't find anything in the
> documentation. The timeout is short at 10s, whereas the default akka
> timeout is set at 100s.
>
> Is there a way to adjust this timeout?
>
> 16/07/17 00:04:22 ERROR SparkContext: Error initializing SparkContext.
> java.util.concurrent.TimeoutException: Futures timed out after [1
> milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at akka.remote.Remoting.start(Remoting.scala:179)
> at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
> at
> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
> at
> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1964)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1955)
> at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
> at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
> at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
> at org.apache.spark.SparkContext.(SparkContext.scala:457)
> at com.sdl.nntrainer.NNTrainer$.main(NNTrainer.scala:418)
> at com.sdl.nntrainer.NNTrainer.main(NNTrainer.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 16/07/17 00:04:22 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
> down remote daemon.
> 16/07/17 00:04:22 INFO SparkContext: Successfully stopped SparkContext
> Exception in thread "main" java.util.concurrent.TimeoutException: Futures
> timed out after [1 milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at akka.remote.Remoting.start(Remoting.scala:179)
> at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
> at 

Re: Random Forest Classification

2016-07-08 Thread Bryan Cutler
Hi Rich,

I looked at the notebook and it seems like you are fitting the
StringIndexer and VectorIndexer to only the training data, and it should
be the entire data set.  So if the training data does not include all of
the labels and an unknown label appears in the test data during evaluation,
then it will not know how to index it.  So your code should be like this,
fit with 'digits' instead of 'training'

val labelIndexer = new
StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(digits)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as
continuous.
val featureIndexer = new
VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(digits)

Hope that helps!

On Fri, Jul 1, 2016 at 9:24 AM, Rich Tarro <richta...@gmail.com> wrote:

> Hi Bryan.
>
> Thanks for your continued help.
>
> Here is the code shown in a Jupyter notebook. I figured this was easier
> that cutting and pasting the code into an email. If you  would like me to
> send you the code in a different format let, me know. The necessary data is
> all downloaded within the notebook itself.
>
>
> https://console.ng.bluemix.net/data/notebooks/fe7e578a-401f-4744-a318-b1b6bcf6f5f8/view?access_token=2f6df7b1dfcb3c1c2d94a794506bb282729dab8f05118fafe5f11886326e02fc
>
> A few additional pieces of information.
>
> 1. The training dataset is cached before training the model. If you do not
> cache the training dataset, the model will not train. The code
> model.transform(test) fails with a similar error. No other changes besides
> caching or not caching. Again, with the training dataset cached, the model
> can be successfully trained as seen in the notebook.
>
> 2. I have another version of the notebook where I download the same data
> in libsvm format rather than csv. That notebook works fine. All the code is
> essentially the same accounting for the difference in file formats.
>
> 3. I tested this same code on another Spark cloud platform and it displays
> the same symptoms when run there.
>
> Thanks.
> Rich
>
>
> On Wed, Jun 29, 2016 at 12:59 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> Are you fitting the VectorIndexer to the entire data set and not just
>> training or test data?  If you are able to post your code and some data to
>> reproduce, that would help in troubleshooting.
>>
>> On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro <richta...@gmail.com> wrote:
>>
>>> Thanks for the response, but in my case I reversed the meaning of
>>> "prediction" and "predictedLabel". It seemed to make more sense to me that
>>> way, but in retrospect, it probably only causes confusion to anyone else
>>> looking at this. I reran the code with all the pipeline stage inputs and
>>> outputs named exactly as in the Random Forest Classifier example to make
>>> sure I hadn't messed anything up when I renamed things. Same error.
>>>
>>> I'm still at the point where I can train the model and make predictions,
>>> but not able to get the MulticlassClassificationEvaluator to work on
>>> the DataFrame of predictions.
>>>
>>> Any other suggestions? Thanks.
>>>
>>>
>>>
>>> On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro <richta...@gmail.com> wrote:
>>>
>>>> I created a ML pipeline using the Random Forest Classifier - similar to
>>>> what is described here except in my case the source data is in csv format
>>>> rather than libsvm.
>>>>
>>>>
>>>> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>>>>
>>>> I am able to successfully train the model and make predictions (on test
>>>> data not used to train the model) as shown here.
>>>>
>>>> ++--+-+--++
>>>> |indexedLabel|predictedLabel|label|prediction|features|
>>>> ++--+-+--++
>>>> | 4.0|   4.0|0| 0|(784,[124,125,126...|
>>>> | 2.0|   2.0|3| 3|(784,[119,120,121...|
>>>> | 8.0|   8.0|8| 8|(784,[180,181,182...|
>>>> | 0.0|   0.0|1| 1|(784,[154,155,156...|
>>>> | 3.0|   8.0|2| 8|(784,[148,149,150...|
>>>> ++--+-+--++
>>>> only showin

Re: ClassNotFoundException: org.apache.parquet.hadoop.ParquetOutputCommitter

2016-07-07 Thread Bryan Cutler
Can you try running the example like this

./bin/run-example sql.RDDRelation 

I know there are some jars in the example folders, and running them this
way adds them to the classpath
On Jul 7, 2016 3:47 AM, "kevin"  wrote:

> hi,all:
> I build spark use:
>
> ./make-distribution.sh --name "hadoop2.7.1" --tgz
> "-Pyarn,hadoop-2.6,parquet-provided,hive,hive-thriftserver" -DskipTests
> -Dhadoop.version=2.7.1
>
> I can run example :
> ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
> --master spark://master1:7077 \
> --driver-memory 1g \
> --executor-memory 512m \
> --executor-cores 1 \
> lib/spark-examples*.jar \
> 10
>
> but can't run example :
> org.apache.spark.examples.sql.RDDRelation
>
> *I got error:*
> 16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
> app-20160707182845-0003/2 is now RUNNING
> 16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
> app-20160707182845-0003/4 is now RUNNING
> 16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
> app-20160707182845-0003/3 is now RUNNING
> 16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
> app-20160707182845-0003/0 is now RUNNING
> 16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
> app-20160707182845-0003/1 is now RUNNING
> 16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
> app-20160707182845-0003/5 is now RUNNING
> 16/07/07 18:28:46 INFO cluster.SparkDeploySchedulerBackend:
> SchedulerBackend is ready for scheduling beginning after reached
> minRegisteredResourcesRatio: 0.0
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/parquet/hadoop/ParquetOutputCommitter
> at org.apache.spark.sql.SQLConf$.(SQLConf.scala:319)
> at org.apache.spark.sql.SQLConf$.(SQLConf.scala)
> at org.apache.spark.sql.SQLContext.(SQLContext.scala:85)
> at org.apache.spark.sql.SQLContext.(SQLContext.scala:77)
> at main.RDDRelation$.main(RDDRelation.scala:13)
> at main.RDDRelation.main(RDDRelation.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.parquet.hadoop.ParquetOutputCommitter
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 15 more
>
>


Re: Set the node the spark driver will be started

2016-06-29 Thread Bryan Cutler
Hi Felix,

I think the problem you are describing has been fixed in later versions,
check out this JIRA https://issues.apache.org/jira/browse/SPARK-13803


On Wed, Jun 29, 2016 at 9:27 AM, Mich Talebzadeh 
wrote:

> Fine. In standalone mode, Spark uses its own scheduling as opposed to Yarn
> or anything else.
>
> As a matter of interest can you start spark-submit from any node in the
> cluster? Are these all have the same or similar CPU and RAM?
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 29 June 2016 at 10:54, Felix Massem 
> wrote:
>
>> In addition we are not using Yarn we are using the standalone mode and
>> the driver will be started with the deploy-mode cluster
>>
>> Thx Felix
>> Felix Massem | IT-Consultant | Karlsruhe
>> mobil: +49 (0) 172.2919848
>>
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>> www.more4fi.de
>>
>> Sitz der Gesellschaft: Düsseldorf | HRB 63043 | Amtsgericht Düsseldorf
>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
>> Schütz
>>
>> This e-mail, including any attached files, contains confidential and/or
>> legally protected information. If you are not the intended recipient, or have
>> received this e-mail in error, please notify the sender immediately and
>> delete this e-mail and any attached files. Unauthorized copying, use or
>> opening of attached files, as well as unauthorized forwarding of this
>> e-mail, is not permitted.
>>
>> On 29.06.2016 at 11:13, Felix Massem wrote:
>>
>> Hey Mich,
>>
>> there is effectively no distribution. Right now I have 15 applications
>> and all 15 drivers are running on one node. This is just after giving all
>> machines a little more memory.
>> Before that I had around 15 applications and about 13 drivers were running on
>> one machine. While trying to submit a new job I got OOM exceptions which
>> took down my cassandra service, only to start the driver on the same node
>> where all the other 13 drivers were running.
>>
>> Thx and best regards
>> Felix
>>
>>
>> Felix Massem | IT-Consultant | Karlsruhe
>> mobil: +49 (0) 172.2919848
>>
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>> www.more4fi.de
>>
>> Sitz der Gesellschaft: Düsseldorf | HRB 63043 | Amtsgericht Düsseldorf
>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
>> Schütz
>>
>>
>> On 28.06.2016 at 17:55, Mich Talebzadeh wrote:
>>
>> Hi Felix,
>>
>> In Yarn-cluster mode the resource manager Yarn is expected to take care
>> of that.
>>
>> Are you getting some skewed distribution with drivers created through
>> spark-submit on different nodes?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 28 June 2016 at 16:06, Felix Massem 
>> wrote:
>>
>>> Hey Mich,
>>>
>>> thx for the fast reply.
>>>
>>> We are using it in cluster mode and spark version 1.5.2
>>>
>>> Greets Felix
>>>
>>>
>>> Felix Massem | IT-Consultant | Karlsruhe
>>> mobil: +49 (0) 172.2919848
>>>
>>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>>> www.more4fi.de
>>>

Re: Random Forest Classification

2016-06-28 Thread Bryan Cutler
Are you fitting the VectorIndexer to the entire data set and not just
training or test data?  If you are able to post your code and some data to
reproduce, that would help in troubleshooting.

On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro  wrote:

> Thanks for the response, but in my case I reversed the meaning of
> "prediction" and "predictedLabel". It seemed to make more sense to me that
> way, but in retrospect, it probably only causes confusion to anyone else
> looking at this. I reran the code with all the pipeline stage inputs and
> outputs named exactly as in the Random Forest Classifier example to make
> sure I hadn't messed anything up when I renamed things. Same error.
>
> I'm still at the point where I can train the model and make predictions,
> but not able to get the MulticlassClassificationEvaluator to work on the
> DataFrame of predictions.
>
> Any other suggestions? Thanks.
>
>
>
> On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro  wrote:
>
>> I created a ML pipeline using the Random Forest Classifier - similar to
>> what is described here except in my case the source data is in csv format
>> rather than libsvm.
>>
>>
>> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>>
>> I am able to successfully train the model and make predictions (on test
>> data not used to train the model) as shown here.
>>
>> ++--+-+--++
>> |indexedLabel|predictedLabel|label|prediction|features|
>> ++--+-+--++
>> | 4.0|   4.0|0| 0|(784,[124,125,126...|
>> | 2.0|   2.0|3| 3|(784,[119,120,121...|
>> | 8.0|   8.0|8| 8|(784,[180,181,182...|
>> | 0.0|   0.0|1| 1|(784,[154,155,156...|
>> | 3.0|   8.0|2| 8|(784,[148,149,150...|
>> ++--+-+--++
>> only showing top 5 rows
>>
>> However, when I attempt to calculate the error between the indexedLabel and 
>> the predictedLabel using the MulticlassClassificationEvaluator, I get the 
>> NoSuchElementException error attached below.
>>
>> val evaluator = new 
>> MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("predictedLabel").setMetricName("precision")
>> val accuracy = evaluator.evaluate(predictions)
>> println("Test Error = " + (1.0 - accuracy))
>>
>> What could be the issue?
>>
>>
>>
>> Name: org.apache.spark.SparkException
>> Message: Job aborted due to stage failure: Task 2 in stage 49.0 failed 10 
>> times, most recent failure: Lost task 2.9 in stage 49.0 (TID 162, 
>> yp-spark-dal09-env5-0024): java.util.NoSuchElementException: key not found: 
>> 132.0
>>  at scala.collection.MapLike$class.default(MapLike.scala:228)
>>  at scala.collection.AbstractMap.default(Map.scala:58)
>>  at scala.collection.MapLike$class.apply(MapLike.scala:141)
>>  at scala.collection.AbstractMap.apply(Map.scala:58)
>>  at 
>> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:331)
>>  at 
>> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:309)
>>  at 
>> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>  at 
>> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
>>  Source)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>  at 
>> org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
>>  at 
>> org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
>>  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at 
>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>>  at 
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>  at 

Re: Random Forest Classification

2016-06-28 Thread Bryan Cutler
The problem might be that you are evaluating with "predictedLabel" instead
of "prediction", where predictedLabel is the prediction index mapped back to
the original label strings - at least according to the
RandomForestClassifierExample, not sure if your code is exactly the same.
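
In other words, something along these lines (a sketch assuming the column
names from the example; 'predictions' is the DataFrame returned by
model.transform(test)):

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")   // the numeric index the model emits, not the re-mapped string label
  .setMetricName("precision")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))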

On Tue, Jun 28, 2016 at 1:21 PM, Rich Tarro  wrote:

> I created a ML pipeline using the Random Forest Classifier - similar to
> what is described here except in my case the source data is in csv format
> rather than libsvm.
>
>
> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>
> I am able to successfully train the model and make predictions (on test
> data not used to train the model) as shown here.
>
> ++--+-+--++
> |indexedLabel|predictedLabel|label|prediction|features|
> ++--+-+--++
> | 4.0|   4.0|0| 0|(784,[124,125,126...|
> | 2.0|   2.0|3| 3|(784,[119,120,121...|
> | 8.0|   8.0|8| 8|(784,[180,181,182...|
> | 0.0|   0.0|1| 1|(784,[154,155,156...|
> | 3.0|   8.0|2| 8|(784,[148,149,150...|
> ++--+-+--++
> only showing top 5 rows
>
> However, when I attempt to calculate the error between the indexedLabel and 
> the predictedLabel using the MulticlassClassificationEvaluator, I get the 
> NoSuchElementException error attached below.
>
> val evaluator = new 
> MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("predictedLabel").setMetricName("precision")
> val accuracy = evaluator.evaluate(predictions)
> println("Test Error = " + (1.0 - accuracy))
>
> What could be the issue?
>
>
>
> Name: org.apache.spark.SparkException
> Message: Job aborted due to stage failure: Task 2 in stage 49.0 failed 10 
> times, most recent failure: Lost task 2.9 in stage 49.0 (TID 162, 
> yp-spark-dal09-env5-0024): java.util.NoSuchElementException: key not found: 
> 132.0
>   at scala.collection.MapLike$class.default(MapLike.scala:228)
>   at scala.collection.AbstractMap.default(Map.scala:58)
>   at scala.collection.MapLike$class.apply(MapLike.scala:141)
>   at scala.collection.AbstractMap.apply(Map.scala:58)
>   at 
> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:331)
>   at 
> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:309)
>   at 
> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>   at 
> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>   at 
> org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
>   at 
> org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
>   at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>   at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.lang.Thread.run(Thread.java:785)
>
> Driver stacktrace:
> StackTrace: 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
> 

Re: LogisticRegression.scala ERROR, require(Predef.scala)

2016-06-23 Thread Bryan Cutler
The stack trace you provided seems to hint that you are calling "predict"
on an RDD with Vectors that are not the same size as the number of features
in your trained model, they should be equal.  If that's not the issue, it
would be easier to troubleshoot if you could share your code and possibly
some test data.
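
A quick sanity check along those lines (a sketch only; 'model' is the trained
LogisticRegressionModel and 'features' an assumed RDD[Vector] of the data
being scored):

val expected = model.numFeatures
val badCount = features.filter(_.size != expected).count()
println(s"vectors with unexpected dimension: $badCount (model expects $expected features)")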

On Thu, Jun 23, 2016 at 4:30 AM, Ascot Moss  wrote:

> Hi,
>
> My Spark is 1.5.2, when trying MLLib, I got the following error. Any idea
> to fix it?
>
> Regards
>
>
> ==
>
> 16/06/23 16:26:20 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID
> 5)
>
> java.lang.IllegalArgumentException: requirement failed
>
> at scala.Predef$.require(Predef.scala:221)
>
> at
> org.apache.spark.mllib.classification.LogisticRegressionModel.predictPoint(LogisticRegression.scala:118)
>
> at
> org.apache.spark.mllib.regression.GeneralizedLinearModel$$anonfun$predict$1$$anonfun$apply$1.apply(GeneralizedLinearAlgorithm.scala:65)
>
> at
> org.apache.spark.mllib.regression.GeneralizedLinearModel$$anonfun$predict$1$$anonfun$apply$1.apply(GeneralizedLinearAlgorithm.scala:65)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.next(RDD.scala:815)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.next(RDD.scala:808)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
>
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 16/06/23 16:26:20 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5,
> localhost): java.lang.IllegalArgumentException: requirement failed
>
> at scala.Predef$.require(Predef.scala:221)
>
> at
> org.apache.spark.mllib.classification.LogisticRegressionModel.predictPoint(LogisticRegression.scala:118)
>
> at
> org.apache.spark.mllib.regression.GeneralizedLinearModel$$anonfun$predict$1$$anonfun$apply$1.apply(GeneralizedLinearAlgorithm.scala:65)
>
> at
> org.apache.spark.mllib.regression.GeneralizedLinearModel$$anonfun$predict$1$$anonfun$apply$1.apply(GeneralizedLinearAlgorithm.scala:65)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.next(RDD.scala:815)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.next(RDD.scala:808)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
>
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
> 16/06/23 16:26:20 ERROR TaskSetManager: Task 0 in stage 5.0 failed 1
> times; aborting job
>
> 16/06/23 

Re: Kafka Exceptions

2016-06-13 Thread Bryan Jeffrey
Cody,

We already set the maxRetries.  We're still seeing issue - when leader is
shifted, for example, it does not appear that direct stream reader
correctly handles this.  We're running 1.6.1.

Bryan Jeffrey

On Mon, Jun 13, 2016 at 10:37 AM, Cody Koeninger <c...@koeninger.org> wrote:

> http://spark.apache.org/docs/latest/configuration.html
>
> spark.streaming.kafka.maxRetries
>
> spark.task.maxFailures
>
> On Mon, Jun 13, 2016 at 8:25 AM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
> > All,
> >
> > We're running a Spark job that is consuming data from a large Kafka
> cluster
> > using the Direct Stream receiver.  We're seeing intermittent
> > NotLeaderForPartitionExceptions when the leader is moved to another
> broker.
> > Currently even with retry enabled we're seeing the job fail at this
> > exception.  Is there a configuration setting I am missing?  How are these
> > issues typically handled?
> >
> > User class threw exception: org.apache.spark.SparkException:
> > ArrayBuffer(kafka.common.NotLeaderForPartitionException,
> > org.apache.spark.SparkException: Couldn't find leader offsets for
> > Set([MyTopic,43]))
> >
> > Thank you,
> >
> > Bryan Jeffrey
> >
>
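
For reference, the two settings quoted above would typically be supplied as
Spark configuration; the values here are illustrative assumptions only:

val conf = new org.apache.spark.SparkConf()
  .set("spark.streaming.kafka.maxRetries", "5")   // retries for the direct stream's offset/leader lookups
  .set("spark.task.maxFailures", "8")             // per-task failure budget before the job is failed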


Kafka Exceptions

2016-06-13 Thread Bryan Jeffrey
All,

We're running a Spark job that is consuming data from a large Kafka cluster
using the Direct Stream receiver.  We're seeing intermittent
NotLeaderForPartitionExceptions when the leader is moved to another
broker.  Currently even with retry enabled we're seeing the job fail at
this exception.  Is there a configuration setting I am missing?  How are
these issues typically handled?

User class threw exception: org.apache.spark.SparkException:
ArrayBuffer(kafka.common.NotLeaderForPartitionException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([MyTopic,43]))

Thank you,

Bryan Jeffrey


Re: Dataset - reduceByKey

2016-06-07 Thread Bryan Jeffrey
All,

Thank you for the replies.  It seems as though the Dataset API is still far
behind the RDD API.  This is unfortunate as the Dataset API potentially
provides a number of performance benefits.  I will move to using it in a
more limited set of cases for the moment.

Thank you!

Bryan Jeffrey

On Tue, Jun 7, 2016 at 2:50 PM, Richard Marscher <rmarsc...@localytics.com>
wrote:

> There certainly are some gaps between the richness of the RDD API and the
> Dataset API. I'm also migrating from RDD to Dataset and ran into
> reduceByKey and join scenarios.
>
> In the spark-dev list, one person was discussing reduceByKey being
> sub-optimal at the moment and it spawned this JIRA
> https://issues.apache.org/jira/browse/SPARK-15598. But you might be able
> to get by with groupBy().reduce() for now, check performance though.
>
> As for join, the approach would be using the joinWith function on Dataset.
> Although the API isn't as sugary as it was for RDD IMO, something which
> I've been discussing in a separate thread as well. I can't find a weblink
> for it but the thread subject is "Dataset Outer Join vs RDD Outer Join".
>
> On Tue, Jun 7, 2016 at 2:40 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>
>> It would also be nice if there was a better example of joining two
>> Datasets. I am looking at the documentation here:
>> http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems
>> a little bit sparse - is there a better documentation source?
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>> On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
>> wrote:
>>
>>> Hello.
>>>
>>> I am looking at the option of moving RDD based operations to Dataset
>>> based operations.  We are calling 'reduceByKey' on some pair RDDs we have.
>>> What would the equivalent be in the Dataset interface - I do not see a
>>> simple reduceByKey replacement.
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>>
>>
>
>
> --
> *Richard Marscher*
> Senior Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>
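
To make the groupBy/reduce workaround mentioned above concrete, a sketch
against the Spark 2.0 Dataset API, where the calls are groupByKey and
reduceGroups (the case class, field names, and 'spark' session are
assumptions):

import spark.implicits._   // 'spark' is an assumed SparkSession

case class Event(key: String, count: Long)

// events: Dataset[Event]; groupByKey + reduceGroups plays the role of RDD.reduceByKey
val reduced = events.groupByKey(_.key)
  .reduceGroups((a, b) => Event(a.key, a.count + b.count))   // Dataset[(String, Event)]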


Re: Dataset - reduceByKey

2016-06-07 Thread Bryan Jeffrey
It would also be nice if there was a better example of joining two
Datasets. I am looking at the documentation here:
http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems a
little bit sparse - is there a better documentation source?

Regards,

Bryan Jeffrey

On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> Hello.
>
> I am looking at the option of moving RDD based operations to Dataset based
> operations.  We are calling 'reduceByKey' on some pair RDDs we have.  What
> would the equivalent be in the Dataset interface - I do not see a simple
> reduceByKey replacement.
>
> Regards,
>
> Bryan Jeffrey
>
>


Dataset - reduceByKey

2016-06-07 Thread Bryan Jeffrey
Hello.

I am looking at the option of moving RDD based operations to Dataset based
operations.  We are calling 'reduceByKey' on some pair RDDs we have.  What
would the equivalent be in the Dataset interface - I do not see a simple
reduceByKey replacement.

Regards,

Bryan Jeffrey


Re: Specify node where driver should run

2016-06-06 Thread Bryan Cutler
I'm not an expert on YARN so anyone please correct me if I'm wrong, but I
believe the Resource Manager will schedule the Application Master to run on
any node that has a Node Manager, depending on available resources.
So you would normally query the RM via the REST API to determine that.  You
can restrict which nodes get scheduled using this property
spark.yarn.am.nodeLabelExpression.
See here for details
http://spark.apache.org/docs/latest/running-on-yarn.html
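
For illustration (the label value is an assumption; this only has effect on
YARN clusters with node labels configured):

val conf = new org.apache.spark.SparkConf()
  .set("spark.yarn.am.nodeLabelExpression", "gpu")   // restrict where the AM may be scheduled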

On Mon, Jun 6, 2016 at 9:04 AM, Saiph Kappa <saiph.ka...@gmail.com> wrote:

> How can I specify the node where application master should run in the yarn
> conf? I haven't found any useful information regarding that.
>
> Thanks.
>
> On Mon, Jun 6, 2016 at 4:52 PM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> In that mode, it will run on the application master, whichever node that
>> is as specified in your yarn conf.
>> On Jun 5, 2016 4:54 PM, "Saiph Kappa" <saiph.ka...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> In yarn-cluster mode, is there any way to specify on which node I want
>>> the driver to run?
>>>
>>> Thanks.
>>>
>>
>


Re: Specify node where driver should run

2016-06-06 Thread Bryan Cutler
In that mode, it will run on the application master, whichever node that is
as specified in your yarn conf.
On Jun 5, 2016 4:54 PM, "Saiph Kappa"  wrote:

> Hi,
>
> In yarn-cluster mode, is there any way to specify on which node I want the
> driver to run?
>
> Thanks.
>


Re: Multinomial regression with spark.ml version of LogisticRegression

2016-05-29 Thread Bryan Cutler
This is currently being worked on, planned for 2.1 I believe
https://issues.apache.org/jira/browse/SPARK-7159
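
Until then, the one-vs-rest wrapper mentioned further down the thread is one
workaround; a sketch ('training' is an assumed DataFrame with "label" and
"features" columns, and the parameter values are assumptions):

import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}

val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
val ovr = new OneVsRest().setClassifier(lr)   // trains one binary classifier per class
val ovrModel = ovr.fit(training)
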
On May 28, 2016 9:31 PM, "Stephen Boesch"  wrote:

> Thanks Phuong. But the point of my post is how to achieve this without using
>  the deprecated mllib package. The mllib package already has
>  multinomial regression built in
>
> 2016-05-28 21:19 GMT-07:00 Phuong LE-HONG :
>
>> Dear Stephen,
>>
>> Yes, you're right, LogisticGradient is in the mllib package, not ml
>> package. I just want to say that we can build a multinomial logistic
>> regression model from the current version of Spark.
>>
>> Regards,
>>
>> Phuong
>>
>>
>>
>> On Sun, May 29, 2016 at 12:04 AM, Stephen Boesch 
>> wrote:
>> > Hi Phuong,
>> >The LogisticGradient exists in the mllib but not ml package. The
>> > LogisticRegression chooses either the breeze LBFGS - if L2 only (not
>> elastic
>> > net) and no regularization or the Orthant Wise Quasi Newton (OWLQN)
>> > otherwise: it does not appear to choose GD in either scenario.
>> >
>> > If I have misunderstood your response please do clarify.
>> >
>> > thanks stephenb
>> >
>> > 2016-05-28 20:55 GMT-07:00 Phuong LE-HONG :
>> >>
>> >> Dear Stephen,
>> >>
>> >> The Logistic Regression currently supports only binary regression.
>> >> However, the LogisticGradient does support computing gradient and loss
>> >> for a multinomial logistic regression. That is, you can train a
>> >> multinomial logistic regression model with LogisticGradient and an
>> >> optimization solver such as LBFGS to get a weight vector of size
>> >> (numClasses-1)*numFeatures.
>> >>
>> >>
>> >> Phuong
>> >>
>> >>
>> >> On Sat, May 28, 2016 at 12:25 PM, Stephen Boesch 
>> >> wrote:
>> >> > Followup: just encountered the "OneVsRest" classifier in
>> >> > ml.classsification: I will look into using it with the binary
>> >> > LogisticRegression as the provided classifier.
>> >> >
>> >> > 2016-05-28 9:06 GMT-07:00 Stephen Boesch :
>> >> >>
>> >> >>
>> >> >> Presently only the mllib version has the one-vs-all approach for
>> >> >> multinomial support.  The ml version with ElasticNet support only
>> >> >> allows
>> >> >> binary regression.
>> >> >>
>> >> >> With feature parity of ml vs mllib having been stated as an
>> objective
>> >> >> for
>> >> >> 2.0.0 -  is there a projected availability of the  multinomial
>> >> >> regression in
>> >> >> the ml package?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >
>> >> >
>> >
>> >
>>
>
>
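For readers who land on this thread: a minimal sketch of the OneVsRest workaround
mentioned above. The parameter values are illustrative, and `training`/`test` are
assumed to be DataFrames with the usual "label" and "features" columns.

import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
import org.apache.spark.sql.DataFrame

// `training` and `test` are assumed inputs produced by the spark.ml feature tooling.
def multiclassViaOneVsRest(training: DataFrame, test: DataFrame): DataFrame = {
  val binaryLr = new LogisticRegression()
    .setMaxIter(100)
    .setRegParam(0.01)

  // OneVsRest fits one binary classifier per class and, at prediction
  // time, picks the class whose classifier is most confident.
  val ovr = new OneVsRest().setClassifier(binaryLr)
  val model = ovr.fit(training)
  model.transform(test)
}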


Streaming application slows over time

2016-05-09 Thread Bryan Jeffrey
All,

I am seeing an odd issue with my streaming application.  I am running Spark
1.4.1, Scala 2.10.  Our streaming application has a batch time of two
minutes.  The application runs well for a reasonable period of time (4-8
hours).  It processes the same data in approximately the same amount of
time.  Because we're consuming large amounts of data, I tweaked some GC
settings to collect more frequently and enabled G1GC.  This effectively
corrected any memory pressure, so memory is looking good.
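
(For anyone reproducing this setup: a minimal sketch of how GC flags like these are
typically passed to the executors. The specific flag values below are assumptions,
not the exact settings used here.)

import org.apache.spark.SparkConf

// Executor JVM flags go through spark.executor.extraJavaOptions; the driver
// takes its own copy via spark.driver.extraJavaOptions if needed.
val conf = new SparkConf()
  .setAppName("streaming-job")
  .set("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails")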

After some number of batches I am seeing the time to process a batch
increase by a large margin (from 40 seconds / batch to 130 seconds /
batch).  Because our batch time is 2 minutes, this has the effect of
(eventually) running behind.  This in turn causes memory issues and
eventually leads to OOM.  However, OOM seems to be a symptom of the
slowdown, not the cause.  I have looked in the driver/executor logs, but do not
see any reason that we're seeing slowness.  The data volume is the same,
none of the executors seem to be crashing, nothing is going to disk, memory
is well within bounds, and nothing else is running on these boxes.

Are there any suggested debugging techniques, things to look for or known
bugs in similar instances?

Regards,

Bryan Jeffrey


Issues with Long Running Streaming Application

2016-04-25 Thread Bryan Jeffrey
Hello.

I have a long-running streaming application. It is consuming a large amount
of data from Kafka (on the order of 25K messages / second) in two-minute
batches.  The job reads the data, makes some decisions on what to save, and
writes the selected data into Cassandra.  The job is very stable - each
batch takes around 25 seconds to process, and every 30 minutes new training
data is read from Cassandra, which increases batch time to around 1 minute.

The issue I'm seeing is that the job is stable for around 5-7 hours, after
which it takes an increasingly long time to compute each batch.  The
executor memory used (cached RDDs) remains around the same level; no
operation takes more than a single batch into account; the write time to
Cassandra does not vary significantly - everything just suddenly seems to
take longer to compute.

Initially I was seeing issues with instability in a shorter time horizon.
To address these issues I took the following steps:

1. Explicitly expired RDDs via 'unpersist' once they were no longer
required.
2. Turned on G1 (Garbage-First) GC via -XX:+UseG1GC
3. Enabled Kryo serialization (a minimal config sketch follows this list)
4. Removed several long-running aggregate operations (reduceByKeyAndWindow,
updateStateByKey) from this job.
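
A minimal sketch of what step 3 typically looks like; the registered classes below
are placeholders for whatever types the job actually serializes, not our real types:

import org.apache.spark.SparkConf

// Placeholder message types - substitute the classes your job serializes.
case class SensorEvent(id: String, timestamp: Long, value: Double)
case class SensorAggregate(id: String, count: Long)

val conf = new SparkConf()
  .setAppName("long-running-streaming-job")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes keeps Kryo from writing the full class name with
  // every record it serializes.
  .registerKryoClasses(Array(classOf[SensorEvent], classOf[SensorAggregate]))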

The result is a job that appears completely stable for hours at a time.
The OS does not appear to be running any odd tasks, and Cassandra
compaction/cleanup/repair is not responsible for the delay.  Has anyone
seen similar behavior?  Any thoughts?

Regards,

Bryan Jeffrey


Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Bryan Jeffrey
Here is what we're doing:


import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import net.liftweb.json.Extraction._
import net.liftweb.json._
import org.apache.spark.streaming.dstream.DStream

class KafkaWriter(brokers: Array[String], topic: String, numPartitions: Int) {
  def write[T](data: DStream[T]): Unit = {
    KafkaWriter.write(data, topic, brokers, numPartitions)
  }
}

object KafkaWriter {
  def write[T](data: DStream[T], topic: String, brokers: Array[String], numPartitions: Int): Unit = {
    // Optionally repartition so the number of producers (one per partition,
    // see below) matches the desired write parallelism.
    val dataToWrite =
      if (numPartitions > 0) {
        data.repartition(numPartitions)
      } else {
        data
      }

    dataToWrite
      .map(x => new KeyedMessage[String, String](topic, KafkaWriter.toJson(x)))
      .foreachRDD(rdd => {
        rdd.foreachPartition(part => {
          // The producer is created on the executor, per partition, so it is
          // never serialized and shipped from the driver.
          val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
          part.foreach(item => producer.send(item))
          producer.close()
        })
      })
  }

  def apply(brokers: Option[Array[String]], topic: String, numPartitions: Int): KafkaWriter = {
    val brokersToUse =
      brokers match {
        case Some(x) => x
        case None => throw new IllegalArgumentException("Must specify brokers!")
      }

    new KafkaWriter(brokersToUse, topic, numPartitions)
  }

  def toJson[T](data: T): String = {
    implicit val formats = DefaultFormats ++ net.liftweb.json.ext.JodaTimeSerializers.all
    compactRender(decompose(data))
  }

  def createProducer(brokers: Array[String]): Producer[String, String] = {
    val properties = new Properties()
    properties.put("metadata.broker.list", brokers.mkString(","))
    properties.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(properties)
    new Producer[String, String](kafkaConfig)
  }
}


Then just call:

val kafkaWriter: KafkaWriter =
  KafkaWriter(KafkaStreamFactory.getBrokersFromConfig(config),
    config.getString(Parameters.topicName), numPartitions = kafkaWritePartitions)
kafkaWriter.write(dataToWriteToKafka)


Hope that helps!

Bryan Jeffrey

On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego <agall...@concord.io>
wrote:

> Thanks Ted.
>
>  KafkaWordCount (producer) does not operate on a DStream[T]
>
> ```scala
>
>
> object KafkaWordCountProducer {
>
>   def main(args: Array[String]) {
> if (args.length < 4) {
>   System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
>     "<messagesPerSec> <wordsPerMessage>")
>   System.exit(1)
> }
>
> val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>
> // Zookeeper connection properties
> val props = new HashMap[String, Object]()
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
>
> val producer = new KafkaProducer[String, String](props)
>
> // Send some messages
> while(true) {
>   (1 to messagesPerSec.toInt).foreach { messageNum =>
> val str = (1 to wordsPerMessage.toInt).map(x =>
> scala.util.Random.nextInt(10).toString)
>   .mkString(" ")
>
> val message = new ProducerRecord[String, String](topic, null, str)
> producer.send(message)
>   }
>
>   Thread.sleep(1000)
> }
>   }
>
> }
>
> ```
>
>
> Also, doing:
>
>
> ```
> object KafkaSink {
>   def send(brokers: String, sc: SparkContext, topic: String, key: String, value: String) =
>     getInstance(brokers, sc).value.send(new ProducerRecord(topic, key, value))
> }
>
> KafkaSink.send(brokers, sparkContext)(outputTopic, record._1, record._2)
>
> ```
>
>
> Doesn't work either, the result is:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not
> serializable
>
>
> Thanks!
>
>
>
>
> On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >
> > In KafkaWordCount, the String is sent back and producer.send() is called.
> >
> > I guess if you don't find a viable solution in your current design, you can
> > consider the above.
> >
> > On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego <agall...@concord.io>
> wrote:
> >>
> >> Hello,
> >>
> >> I understand that you cannot serialize Kafka Producer.
> >>
> >> So I've tried:
> >>
> >> (as suggested here
> https://forums.databricks.co

Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException

2016-03-19 Thread Bryan Jeffrey
Cody et. al,

I am seeing a similar error.  I've increased the number of retries.  Once
I've got a job up and running I'm seeing it retry correctly. However, I am
having trouble getting the job started - number of retries does not seem to
help with startup behavior.
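
(For reference, bumping the retry count is a one-line configuration change - a
minimal sketch, with an illustrative value:)

import org.apache.spark.SparkConf

// spark.streaming.kafka.maxRetries is read by the direct Kafka stream when it
// asks the brokers for the latest offsets; the value "10" is illustrative.
val conf = new SparkConf()
  .setAppName("kafka-direct-job")
  .set("spark.streaming.kafka.maxRetries", "10")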

Thoughts?

Regards,

Bryan Jeffrey

On Fri, Mar 18, 2016 at 10:44 AM, Cody Koeninger <c...@koeninger.org> wrote:

> That's a networking error when the driver is attempting to contact
> leaders to get the latest available offsets.
>
> If it's a transient error, you can look at increasing the value of
> spark.streaming.kafka.maxRetries, see
>
> http://spark.apache.org/docs/latest/configuration.html
>
> If it's not a transient error, you need to look at your brokers + your
> network environment.
>
> On Thu, Mar 17, 2016 at 10:59 PM, Surendra , Manchikanti
> <surendra.manchika...@gmail.com> wrote:
> > Hi,
> >
> > Can you check Kafka topic replication ? And leader information?
> >
> > Regards,
> > Surendra M
> >
> >
> >
> > -- Surendra Manchikanti
> >
> > On Thu, Mar 17, 2016 at 7:28 PM, Ascot Moss <ascot.m...@gmail.com>
> wrote:
> >>
> >> Hi,
> >>
> >> I have a Spark Streaming (with Kafka) job; after running for several days, it
> >> failed with the following errors:
> >> ERROR DirectKafkaInputDStream:
> >> ArrayBuffer(java.nio.channels.ClosedChannelException)
> >>
> >> Any idea what would be wrong? Could it be a Spark Streaming buffer overflow
> >> issue?
> >>
> >>
> >>
> >> Regards
> >>
> >>
> >>
> >>
> >>
> >>
> >> *** from the log ***
> >>
> >> 16/03/18 09:15:18 INFO VerifiableProperties: Property zookeeper.connect is overridden to
> >>
> >> 16/03/17 12:13:51 ERROR DirectKafkaInputDStream: ArrayBuffer(java.nio.channels.ClosedChannelException)
> >>
> >> 16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
> >>
> >> 16/03/17 12:13:52 ERROR JobScheduler: Error generating jobs for time 1458188031800 ms
> >>
> >> org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException)
> >>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
> >>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
> >>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> >>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> >>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> >>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> >>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> >>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> >>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> >>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> >>   at scala.Option.orElse(Option.scala:257)
> >>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> >>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> >>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> >>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> >>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
