Re: Query around Spark Checkpoints

2020-09-29 Thread Bryan Jeffrey
Jungtaek,

How would you contrast stateful streaming with checkpoint vs. the idea of
writing updates to a Delta Lake table, and then using the Delta Lake table
as a streaming source for our state stream?

Thank you,

Bryan

On Mon, Sep 28, 2020 at 9:50 AM Debabrata Ghosh 
wrote:

> Thank You, Jungtaek and Amit! This is very helpful indeed!
>
> Cheers,
>
> Debu
>
> On Mon, Sep 28, 2020 at 5:33 AM Jungtaek Lim 
> wrote:
>
>>
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
>>
>> You would need to implement CheckpointFileManager yourself, and the
>> interface is tightly integrated with HDFS (the parameters and return types
>> of its methods are mostly HDFS types). That doesn't mean it's impossible to
>> implement CheckpointFileManager against a non-filesystem store, but it
>> would be non-trivial to override all of the functionality and make it work
>> seamlessly.
>>
>> The required consistency guarantees are documented in the javadoc of
>> CheckpointFileManager - please read through it, and evaluate whether your
>> target storage can fulfill the requirements.
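[Editor's note: for readers following the link above, the surface area to implement looks roughly like the sketch below. Method names and signatures are paraphrased from the Spark 2.4-era trait and should be verified against your Spark version, since this is an internal API; implementations are typically selected via the internal `spark.sql.streaming.checkpointFileManagerClass` setting, where available.]

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileStatus, Path, PathFilter}
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream

// Sketch only: signatures paraphrased from the linked source file.
// Implementations are discovered reflectively and are expected to
// provide a (Path, Configuration) constructor.
class MyStoreCheckpointFileManager(root: Path, hadoopConf: Configuration)
    extends CheckpointFileManager {

  // The critical consistency requirement: write to a temporary location
  // and publish atomically when the stream is closed (rename-based).
  override def createAtomic(
      path: Path,
      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = ???

  override def open(path: Path): FSDataInputStream = ???
  override def list(path: Path, filter: PathFilter): Array[FileStatus] = ???
  override def mkdirs(path: Path): Unit = ???
  override def exists(path: Path): Boolean = ???
  override def delete(path: Path): Unit = ???
  override def isLocal: Boolean = false
}
```

Mapping these file-oriented semantics (atomic publish, listing, deletion) onto a NoSQL store or Kafka is precisely where the non-trivial work lies.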
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Mon, Sep 28, 2020 at 3:04 AM Amit Joshi 
>> wrote:
>>
>>> Hi,
>>>
>>> As far as I know, it depends on whether you are using Spark Streaming or
>>> Structured Streaming.
>>> In Spark Streaming you can write your own code to checkpoint.
>>> But in the case of Structured Streaming it must be a file location.
>>> The main question, though, is why you want to checkpoint to
>>> NoSQL, given that it's eventually consistent.
>>>
>>>
>>> Regards
>>> Amit
>>>
>>> On Sunday, September 27, 2020, Debabrata Ghosh 
>>> wrote:
>>>
 Hi,
 I had a query around Spark checkpoints - can I store the
 checkpoints in NoSQL or Kafka instead of the filesystem?

 Regards,

 Debu

>>>


Re: Metrics Problem

2020-07-10 Thread Bryan Jeffrey
On Thu, Jul 2, 2020 at 2:33 PM Bryan Jeffrey 
wrote:

> Srinivas,
>
> I finally broke a bit of time free to look at this issue.  I
> reduced the scope of my ambitions and simply cloned the ConsoleSink and
> ConsoleReporter classes.  After doing so I can see that the original version
> works, but the 'modified' version does not.  The only difference is
> the name & location of the associated JAR.
>
> Looking here:
> http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Metric-Sink-on-Executor-Always-ClassNotFound-td34205.html#a34206
> "For executors, the jar file of the sink needs to be in the system classpath;
> the application jar is not in the system classpath, so that does not
> work. There are different ways for you to get it there, most of them
> manual (YARN is, I think, the only RM supported in Spark where the
> application itself can do it)."
>
> Looking here:
> https://forums.databricks.com/questions/706/how-can-i-attach-a-jar-library-to-the-cluster-that.html
> "Everything in spark.executor.extraClassPath is on the System classpath.
> These are listed in the Classpath Entries section and marked with Source =
> System Classpath. Everything else is on the application classpath."
>
> From the Databricks forum above, it appears that one could pass in a jar
> via '--jars' and then set '--conf spark.executor.extraClassPath=./myJar'.
> However, this does not appear to work; the file is there, but it is not
> added to the classpath.
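[Editor's note: a concrete invocation of the idea discussed above, for reference. This is a sketch only: jar and class names are the placeholders used in this thread, and the pattern relies on YARN localizing `--jars` files into each container's working directory, which is why the bare file name is used on the executor classpath. Whether it resolves the ClassNotFoundException depends on the cluster manager; per the quoted advice, YARN is the only resource manager where the application itself can get the jar onto the system classpath.]

```shell
# Sketch: distribute the sink jar via --jars, then reference it by bare
# file name on the executor system classpath (YARN copies --jars files
# into each container's working directory). Names are placeholders.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --jars hdfs:///custommetricsprovider.jar \
  --conf spark.executor.extraClassPath=custommetricsprovider.jar \
  --conf spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink \
  application.jar
```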
>
> Regards,
>
> Bryan Jeffrey
>
>
> On Tue, Jun 30, 2020 at 12:55 PM Srinivas V  wrote:
>
>> Then it should be a permission issue. What kind of cluster is it, and which
>> user is running it? Does that user have HDFS permissions to access the
>> folder where the jar file is?
>>
>> On Mon, Jun 29, 2020 at 1:17 AM Bryan Jeffrey 
>> wrote:
>>
>>> Srinivas,
>>>
>>> Interestingly, I did have the metrics jar packaged as part of my main
>>> jar. It worked well both on the driver and locally, but not on executors.
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>> Get Outlook for Android <https://aka.ms/ghei36>
>>>
>>> ----------
>>> *From:* Srinivas V 
>>> *Sent:* Saturday, June 27, 2020 1:23:24 AM
>>>
>>> *To:* Bryan Jeffrey 
>>> *Cc:* user 
>>> *Subject:* Re: Metrics Problem
>>>
>>> One option is to build your main jar with the metrics jar included, like
>>> a fat jar.
>>>
>>> On Sat, Jun 27, 2020 at 8:04 AM Bryan Jeffrey 
>>> wrote:
>>>
>>> Srinivas,
>>>
>>> Thanks for the insight. I had not considered a dependency issue as the
>>> metrics jar works well applied on the driver. Perhaps my main jar
>>> includes the Hadoop dependencies but the metrics jar does not?
>>>
>>> I am confused, as the same Hadoop dependency also exists for the built-in
>>> metrics providers, which appear to work.
>>>
>>> Regards,
>>>
>>> Bryan
>>>
>>>
>>> --
>>> *From:* Srinivas V 
>>> *Sent:* Friday, June 26, 2020 9:47:52 PM
>>> *To:* Bryan Jeffrey 
>>> *Cc:* user 
>>> *Subject:* Re: Metrics Problem
>>>
>>> It should work when you give an HDFS path, as long as your jar exists at
>>> that path.
>>> I think your error is more of a security issue (Kerberos) or missing
>>> Hadoop dependencies; your error says:
>>> org.apache.hadoop.security.UserGroupInformation.doAs(
>>> UserGroupInformation
>>>
>>> On Fri, Jun 26, 2020 at 8:44 PM Bryan Jeffrey 
>>> wrote:
>>>
>>> It may be helpful to note that I'm running in Yarn cluster mode.  My
>>> goal is to avoid having to manually distribute the JAR to all of the
>>> various nodes as this makes versioning deployments difficult.
>>>
>>> On Thu, Jun 25, 2020 at 5:32 PM Bryan Jeffrey 
>>> wrote:
>>>
>>> Hello.
>>>
>>> I am running Spark 2.4.4. I have implemented a custom metrics producer.
>>> It works well when I run locally, or specify the metrics producer only for
>>> the driver.  When I ask for executor metrics I run into
>>> ClassNotFoundExceptions
>>>
>>> *Is it possible to pass a metrics JAR via --jars?  If so what am I
>>> missing?*
>>>
>>> Deploy driver stats via:
>>> --jar

Re: Metrics Problem

2020-06-28 Thread Bryan Jeffrey
Srinivas,

Interestingly, I did have the metrics jar packaged as part of my main jar. It
worked well both on the driver and locally, but not on executors.

Regards,

Bryan Jeffrey



From: Srinivas V 
Sent: Saturday, June 27, 2020 1:23:24 AM
To: Bryan Jeffrey 
Cc: user 
Subject: Re: Metrics Problem

One option is to build your main jar with the metrics jar included, like a fat jar.
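[Editor's note: for anyone trying the fat-jar route, a minimal sbt-assembly sketch follows. The plugin version and project layout are assumptions, not from the thread, and note that the thread itself reports a fat jar fixing the driver sink but not executor sinks, since executor sinks load from the system classpath.]

```scala
// project/plugins.sbt (assumed plugin version):
// addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

// build.sbt -- bundle the custom metrics sink with the application,
// keeping Spark itself "provided" so it is not baked into the fat jar.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.4" % "provided"
)

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", _ @ _*) => MergeStrategy.discard // drop jar signatures
  case _                            => MergeStrategy.first   // first entry wins elsewhere
}
```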

On Sat, Jun 27, 2020 at 8:04 AM Bryan Jeffrey
<bryan.jeff...@gmail.com> wrote:
Srinivas,

Thanks for the insight. I had not considered a dependency issue as the metrics 
jar works well applied on the driver. Perhaps my main jar includes the Hadoop 
dependencies but the metrics jar does not?

I am confused, as the same Hadoop dependency also exists for the built-in
metrics providers, which appear to work.

Regards,

Bryan



From: Srinivas V <srini@gmail.com>
Sent: Friday, June 26, 2020 9:47:52 PM
To: Bryan Jeffrey <bryan.jeff...@gmail.com>
Cc: user <user@spark.apache.org>
Subject: Re: Metrics Problem

It should work when you give an HDFS path, as long as your jar exists at
that path.
I think your error is more of a security issue (Kerberos) or missing Hadoop
dependencies; your error says:
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation

On Fri, Jun 26, 2020 at 8:44 PM Bryan Jeffrey
<bryan.jeff...@gmail.com> wrote:
It may be helpful to note that I'm running in Yarn cluster mode.  My goal is to 
avoid having to manually distribute the JAR to all of the various nodes as this 
makes versioning deployments difficult.

On Thu, Jun 25, 2020 at 5:32 PM Bryan Jeffrey
<bryan.jeff...@gmail.com> wrote:
Hello.

I am running Spark 2.4.4. I have implemented a custom metrics producer. It 
works well when I run locally, or specify the metrics producer only for the 
driver.  When I ask for executor metrics I run into ClassNotFoundExceptions

Is it possible to pass a metrics JAR via --jars?  If so what am I missing?

Deploy driver stats via:
--jars hdfs:///custommetricsprovider.jar
--conf 
spark.metrics.conf.driver.sink.metrics.class=org.apache.spark.mycustommetricssink

However, when I pass the JAR with the metrics provider to executors via:
--jars hdfs:///custommetricsprovider.jar
--conf 
spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink

I get ClassNotFoundException:

20/06/25 21:19:35 ERROR MetricsSystem: Sink class 
org.apache.spark.custommetricssink cannot be instantiated
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.custommetricssink
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
at 
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
at 
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:365)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:221)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hado

Re: Metrics Problem

2020-06-26 Thread Bryan Jeffrey
Srinivas,

Thanks for the insight. I had not considered a dependency issue as the metrics 
jar works well applied on the driver. Perhaps my main jar includes the Hadoop 
dependencies but the metrics jar does not?

I am confused, as the same Hadoop dependency also exists for the built-in
metrics providers, which appear to work.

Regards,

Bryan



From: Srinivas V 
Sent: Friday, June 26, 2020 9:47:52 PM
To: Bryan Jeffrey 
Cc: user 
Subject: Re: Metrics Problem

It should work when you give an HDFS path, as long as your jar exists at
that path.
I think your error is more of a security issue (Kerberos) or missing Hadoop
dependencies; your error says:
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation

On Fri, Jun 26, 2020 at 8:44 PM Bryan Jeffrey
<bryan.jeff...@gmail.com> wrote:
It may be helpful to note that I'm running in Yarn cluster mode.  My goal is to 
avoid having to manually distribute the JAR to all of the various nodes as this 
makes versioning deployments difficult.

On Thu, Jun 25, 2020 at 5:32 PM Bryan Jeffrey
<bryan.jeff...@gmail.com> wrote:
Hello.

I am running Spark 2.4.4. I have implemented a custom metrics producer. It 
works well when I run locally, or specify the metrics producer only for the 
driver.  When I ask for executor metrics I run into ClassNotFoundExceptions

Is it possible to pass a metrics JAR via --jars?  If so what am I missing?

Deploy driver stats via:
--jars hdfs:///custommetricsprovider.jar
--conf 
spark.metrics.conf.driver.sink.metrics.class=org.apache.spark.mycustommetricssink

However, when I pass the JAR with the metrics provider to executors via:
--jars hdfs:///custommetricsprovider.jar
--conf 
spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink

I get ClassNotFoundException:

20/06/25 21:19:35 ERROR MetricsSystem: Sink class 
org.apache.spark.custommetricssink cannot be instantiated
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.custommetricssink
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
at 
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
at 
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:365)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:221)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
... 4 more

Is it possible to pass a metrics JAR via --jars?  If so what am I missing?

Thank you,

Bryan


Re: Metrics Problem

2020-06-26 Thread Bryan Jeffrey
It may be helpful to note that I'm running in Yarn cluster mode.  My goal
is to avoid having to manually distribute the JAR to all of the various
nodes as this makes versioning deployments difficult.

On Thu, Jun 25, 2020 at 5:32 PM Bryan Jeffrey 
wrote:

> Hello.
>
> I am running Spark 2.4.4. I have implemented a custom metrics producer. It
> works well when I run locally, or specify the metrics producer only for the
> driver.  When I ask for executor metrics I run into ClassNotFoundExceptions
>
> *Is it possible to pass a metrics JAR via --jars?  If so what am I
> missing?*
>
> Deploy driver stats via:
> --jars hdfs:///custommetricsprovider.jar
> --conf
> spark.metrics.conf.driver.sink.metrics.class=org.apache.spark.mycustommetricssink
>
> However, when I pass the JAR with the metrics provider to executors via:
> --jars hdfs:///custommetricsprovider.jar
> --conf
> spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink
>
> I get ClassNotFoundException:
>
> 20/06/25 21:19:35 ERROR MetricsSystem: Sink class
> org.apache.spark.custommetricssink cannot be instantiated
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.custommetricssink
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
> at
> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
> at
> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
> at
> org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
> at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:365)
> at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:221)
> at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
> at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> ... 4 more
>
> Is it possible to pass a metrics JAR via --jars?  If so what am I missing?
>
> Thank you,
>
> Bryan
>


Metrics Problem

2020-06-25 Thread Bryan Jeffrey
Hello.

I am running Spark 2.4.4. I have implemented a custom metrics producer. It
works well when I run locally, or specify the metrics producer only for the
driver.  When I ask for executor metrics I run into ClassNotFoundExceptions

*Is it possible to pass a metrics JAR via --jars?  If so what am I missing?*

Deploy driver stats via:
--jars hdfs:///custommetricsprovider.jar
--conf
spark.metrics.conf.driver.sink.metrics.class=org.apache.spark.mycustommetricssink

However, when I pass the JAR with the metrics provider to executors via:
--jars hdfs:///custommetricsprovider.jar
--conf
spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink

I get ClassNotFoundException:

20/06/25 21:19:35 ERROR MetricsSystem: Sink class
org.apache.spark.custommetricssink cannot be instantiated
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.custommetricssink
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
at
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
at
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at
org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:365)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:221)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
... 4 more

Is it possible to pass a metrics JAR via --jars?  If so what am I missing?

Thank you,

Bryan


Custom Metrics

2020-06-18 Thread Bryan Jeffrey
Hello.

We're using Spark 2.4.4.  We have a custom metrics sink consuming the
Spark-produced metrics (e.g. heap free, etc.).  I am trying to determine a
good mechanism to pass the Spark application name into the metrics sink.
Currently the application ID is included, but not the application name. Is
there a suggested mechanism?
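[Editor's note: one possibility worth checking. Per the Spark monitoring documentation, the metrics system exposes a `spark.metrics.namespace` property that defaults to the application ID and can instead be pointed at the application name; behavior is described here from memory of the docs, so verify against your Spark version.]

```shell
# Use the application name, rather than the default app ID, as the
# root namespace for all metric keys delivered to sinks.
spark-submit \
  --conf spark.metrics.namespace='${spark.app.name}' \
  application.jar   # placeholder application jar
```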

Thank you,

Bryan Jeffrey


Data Source - State (SPARK-28190)

2020-03-30 Thread Bryan Jeffrey
Hi, Jungtaek.

We've been investigating the use of Spark Structured Streaming to replace
our Spark Streaming operations.  We have several cases where we're using
mapWithState to maintain state across batches, often with high volumes of
data.  We took a look at the Structured Streaming stateful processing.
Structured Streaming state processing looks great, but has some
shortcomings:
1. State can only be hydrated from checkpoint, which means that
modification of the state is not possible.
2. You cannot clean up or normalize state data after it has been processed.

These shortcomings appear to be potentially addressed by your
ticket SPARK-28190 - "Data Source - State".  I see little activity on this
ticket. Can you help me to understand where this feature currently stands?

Thank you,

Bryan Jeffrey


Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-29 Thread Bryan Jeffrey
Jungtaek,

Thank you for taking a look at this issue. I would be happy to test your
patch - but I am unsure how to do so.  Can you help me to understand how I
can:
1. Obtain an updated version of the JAR in question either via a nightly
build or by building myself?
2. Substitute this JAR, ideally via changing a dependency in our code to
point to a local dependency (via Maven?)
3. Determine how I can substitute the JAR on the Spark cluster via changes
to our Maven dependency selection or less ideally by replacing the JAR on a
Spark cluster?

I would appreciate any pointers in this area as I'd like to come up to
speed on the right way to validate changes and perhaps contribute myself.

Regards,

Bryan Jeffrey

On Sat, Feb 29, 2020 at 9:52 AM Jungtaek Lim 
wrote:

> Forgot to mention - it only occurs when the SQL type of the UDT has a fixed
> length. If the UDT is used to represent a complex type like array, struct, or
> even string, it doesn't trigger the issue. So it's an edge case, and the
> chance of encountering this issue may not be that huge; that's why this
> issue only pops up now, even though the relevant code has existed for a very
> long time.
>
> On Sat, Feb 29, 2020 at 11:44 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> I've investigated a bit, and it looks like it's not an issue with
>> mapGroupsWithState, but an issue with how UDTs are handled in UnsafeRow. It
>> seems to miss handling UDTs, and that missing spot makes Spark's internal
>> code corrupt the value. (So if I'm not mistaken, it's a correctness issue.)
>>
>> I've filed an issue (sorry I missed you've already filed an issue) and
>> submitted a patch. https://issues.apache.org/jira/browse/SPARK-30993
>>
>> It would be nice if you can try out my patch and see whether it fixes
>> your issue (I've already copied your code and made it pass, but would like
>> to double check). Thanks for reporting!
>>
>> On Sat, Feb 29, 2020 at 11:26 AM Bryan Jeffrey 
>> wrote:
>>
>>>
>>> Hi Tathagata.
>>>
>>> I tried making changes as you suggested:
>>>
>>> @SQLUserDefinedType(udt = classOf[JodaTimeUDT])
>>> class JodaTimeUDT extends UserDefinedType[DateTime] {
>>>   override def sqlType: DataType = TimestampType
>>>
>>>   override def serialize(obj: DateTime): Long = {
>>>     obj.getMillis
>>>   }
>>>
>>>   def deserialize(datum: Any): DateTime = {
>>>     datum match {
>>>       case value: Long => new DateTime(value, DateTimeZone.UTC)
>>>     }
>>>   }
>>>
>>>   override def userClass: Class[DateTime] = classOf[DateTime]
>>>
>>>   private[spark] override def asNullable: JodaTimeUDT = this
>>> }
>>>
>>> object JodaTimeUDTRegister {
>>>   def register: Unit = {
>>>     UDTRegistration.register(classOf[DateTime].getName,
>>>       classOf[JodaTimeUDT].getName)
>>>   }
>>> }
>>>
>>>
>>> This did not resolve the problem.  The results remain the same:
>>>
>>>
>>> org.scalatest.exceptions.TestFailedException: 
>>> Array(FooWithDate(*2021-02-02T19:26:23.374Z*,Foo,1), 
>>> FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same 
>>> elements as Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1), 
>>> FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))
>>>
>>>
>>> I included a couple of other test cases to validate that the UDT works fine:
>>>
>>>
>>> "the joda time serializer" should "serialize and deserialize as expected" in {
>>>   val input = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
>>>   val serializer = new JodaTimeUDT()
>>>   val serialized = serializer.serialize(input)
>>>   val deserialized = serializer.deserialize(serialized)
>>>
>>>   deserialized should be(input)
>>> }
>>>
>>> it should "correctly implement dataframe serialization & deserialization in
>>> data frames" in {
>>>   val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
>>>   val datePlusOne = new DateTime(2020, 1, 2, 3, 5, 5, 6, DateTimeZone.UTC)
>>>   val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3))
>>>   val sqlContext = session.sqlContext
>>>   import sqlContext.implicits._
>>>   val ds = input.toDF().as[FooWithDate]
>>>   val result = ds.map(x => FooWithDate(DateUtils.addInterval(x.date,
>>>     Minutes(1)), x.s, x.i +

Fwd: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Bryan Jeffrey
Hi Tathagata.

I tried making changes as you suggested:

@SQLUserDefinedType(udt = classOf[JodaTimeUDT])
class JodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType = TimestampType

  override def serialize(obj: DateTime): Long = {
    obj.getMillis
  }

  def deserialize(datum: Any): DateTime = {
    datum match {
      case value: Long => new DateTime(value, DateTimeZone.UTC)
    }
  }

  override def userClass: Class[DateTime] = classOf[DateTime]

  private[spark] override def asNullable: JodaTimeUDT = this
}

object JodaTimeUDTRegister {
  def register: Unit = {
    UDTRegistration.register(classOf[DateTime].getName,
      classOf[JodaTimeUDT].getName)
  }
}


This did not resolve the problem.  The results remain the same:


org.scalatest.exceptions.TestFailedException:
Array(FooWithDate(*2021-02-02T19:26:23.374Z*,Foo,1),
FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the
same elements as Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1),
FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))


I included a couple of other test cases to validate that the UDT works fine:


"the joda time serializer" should "serialize and deserialize as expected" in {
  val input = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
  val serializer = new JodaTimeUDT()
  val serialized = serializer.serialize(input)
  val deserialized = serializer.deserialize(serialized)

  deserialized should be(input)
}

it should "correctly implement dataframe serialization & deserialization in data frames" in {
  val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
  val datePlusOne = new DateTime(2020, 1, 2, 3, 5, 5, 6, DateTimeZone.UTC)
  val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3))
  val sqlContext = session.sqlContext
  import sqlContext.implicits._
  val ds = input.toDF().as[FooWithDate]
  val result = ds.map(x => FooWithDate(DateUtils.addInterval(x.date,
    Minutes(1)), x.s, x.i + 1)).collect()
  val expected = List(FooWithDate(datePlusOne, "Foo", 2),
    FooWithDate(datePlusOne, "Foo", 4))

  result should contain theSameElementsAs expected
}


Any other thoughts?


On Fri, Feb 28, 2020 at 6:23 PM Tathagata Das 
wrote:

> Sounds like something to do with the serialization/deserialization, and
> not related to mapGroupsWithState.
>
>
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
>
> The docs say that
> 1. this is deprecated and therefore should not be used
> 2. you have to use the annotation `SQLUserDefinedType
> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java>`
> on the class definition. You don't seem to have done that; maybe that's the
> reason?
>
> I would debug by printing the values in the serialize/deserialize methods,
> and then passing it through the groupBy that is known to fail.
>
> TD
>
> On Fri, Feb 28, 2020 at 2:45 PM Bryan Jeffrey 
> wrote:
>
>> Tathagata,
>>
>> The difference is more than hours off. In this instance it's different by
>> 4 years. In other instances it's different by tens of years (and other
>> smaller durations).
>>
>> We've considered moving to storage as longs, but this makes code much
>> less readable and harder to maintain. The UDT serialization bug also causes
>> issues outside of stateful streaming, such as when executing a simple group
>> by.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>>
>> --
>> *From:* Tathagata Das 
>> *Sent:* Friday, February 28, 2020 4:56:07 PM
>> *To:* Bryan Jeffrey 
>> *Cc:* user 
>> *Subject:* Re: Structured Streaming: mapGroupsWithState UDT
>> serialization does not work
>>
>> You are deserializing by explicitly specifying the UTC timezone, but when
>> serializing you are not specifying it. Maybe that is the reason?
>>
>> Also, if you can encode it using just long, then I recommend just saving
>> the value as long and eliminating some of the serialization overheads.
>> Spark will probably better optimize stuff if it sees it as a long rather
>> than an opaque UDT.
>>
>> TD
>>
>> On Fri, Feb 28, 2020 at 6:39 AM Bryan Jeffrey 
>> wrote:
>>
>> Hello.
>>
>> I'm running Scala 2.11 w/ Spark 2.3.0.  I've encountered a problem with
>> mapGroupsWithState, and was wondering if anyone had insight.  We use Joda
>> time in a number of data structures, and so we've generated a custom
>> serializer for Joda.  This works well in most dataset/dataframe structured
>> streaming 

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Bryan Jeffrey
Perfect. I'll give this a shot and report back.

Get Outlook for Android<https://aka.ms/ghei36>


From: Tathagata Das 
Sent: Friday, February 28, 2020 6:23:07 PM
To: Bryan Jeffrey 
Cc: user 
Subject: Re: Structured Streaming: mapGroupsWithState UDT serialization does 
not work


Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Bryan Jeffrey
Tathagata,

The difference is more than hours off. In this instance it's different by 4 
years. In other instances it's different by tens of years (and other smaller 
durations).

We've considered moving to storage as longs, but this makes code much less 
readable and harder to maintain. The udt serialization bug also causes issues 
outside of stateful streaming, as when executing a simple group by.

Regards,

Bryan Jeffrey

Get Outlook for Android<https://aka.ms/ghei36>


From: Tathagata Das 
Sent: Friday, February 28, 2020 4:56:07 PM
To: Bryan Jeffrey 
Cc: user 
Subject: Re: Structured Streaming: mapGroupsWithState UDT serialization does 
not work


Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Bryan Jeffrey
Hello.

I'm running Scala 2.11 w/ Spark 2.3.0.  I've encountered a problem with
mapGroupsWithState, and was wondering if anyone had insight.  We use Joda
time in a number of data structures, and so we've generated a custom
serializer for Joda.  This works well in most dataset/dataframe structured
streaming operations. However, when running mapGroupsWithState we observed
that incorrect dates were being returned from a state.

I created a bug here: https://issues.apache.org/jira/browse/SPARK-30986 in
an effort to assist tracking of related information.

Simple example:
1. Input A has a date D
2. Input A updates state in mapGroupsWithState. Date present in state is D
3. Input A is added again.  Input A has the correct date D, but the existing
state now has an invalid date

Here is a simple repro:

Joda Time UDT:

private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType  = LongType
  override def serialize(obj: DateTime): Long = obj.getMillis
  def deserialize(datum: Any): DateTime = datum match { case value:
Long => new DateTime(value, DateTimeZone.UTC) }
  override def userClass: Class[DateTime] = classOf[DateTime]
  private[spark] override def asNullable: JodaTimeUDT = this
}

object JodaTimeUDTRegister {
  def register : Unit = {
UDTRegistration.register(classOf[DateTime].getName,
classOf[JodaTimeUDT].getName)  }
}


Test Leveraging Joda UDT:

case class FooWithDate(date: DateTime, s: String, i: Int)

@RunWith(classOf[JUnitRunner])
class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory
with BeforeAndAfterAll {
  val application = this.getClass.getName
  var session: SparkSession = _

  override def beforeAll(): Unit = {
System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
val sparkConf = new SparkConf()
  .set("spark.driver.allowMultipleContexts", "true")
  .set("spark.testing", "true")
  .set("spark.memory.fraction", "1")
  .set("spark.ui.enabled", "false")
  .set("spark.streaming.gracefulStopTimeout", "1000")
  .setAppName(application).setMaster("local[*]")


session = SparkSession.builder().config(sparkConf).getOrCreate()
session.sparkContext.setCheckpointDir("/")
JodaTimeUDTRegister.register
  }

  override def afterAll(): Unit = {
session.stop()
  }

  it should "work correctly for a streaming input with stateful
transformation" in {
val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
val sqlContext = session.sqlContext
import sqlContext.implicits._

val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date,
"Foo", 3), FooWithDate(date, "Foo", 3))
val streamInput: MemoryStream[FooWithDate] = new
MemoryStream[FooWithDate](42, session.sqlContext)
streamInput.addData(input)
val ds: Dataset[FooWithDate] = streamInput.toDS()

val mapGroupsWithStateFunction: (Int, Iterator[FooWithDate],
GroupState[FooWithDate]) => FooWithDate =
TestJodaTimeUdt.updateFooState
val result: Dataset[FooWithDate] = ds
  .groupByKey(x => x.i)
  
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mapGroupsWithStateFunction)
val writeTo = s"random_table_name"


result.writeStream.outputMode(OutputMode.Update).format("memory").queryName(writeTo).trigger(Trigger.Once()).start().awaitTermination()
val combinedResults: Array[FooWithDate] = session.sql(sqlText =
s"select * from $writeTo").as[FooWithDate].collect()
val expected = Array(FooWithDate(date, "Foo", 1),
FooWithDate(date, "FooFoo", 6))
combinedResults should contain theSameElementsAs(expected)
  }
}

object TestJodaTimeUdt {
  def updateFooState(id: Int, inputs: Iterator[FooWithDate], state:
GroupState[FooWithDate]): FooWithDate = {
if (state.hasTimedOut) {
  state.remove()
  state.getOption.get
} else {
  val inputsSeq: Seq[FooWithDate] = inputs.toSeq
  val startingState = state.getOption.getOrElse(inputsSeq.head)
  val toProcess = if (state.getOption.isDefined) inputsSeq else
inputsSeq.tail
  val updatedFoo = toProcess.foldLeft(startingState)(concatFoo)

  state.update(updatedFoo)
  state.setTimeoutDuration("1 minute")
  updatedFoo
}
  }

  def concatFoo(a: FooWithDate, b: FooWithDate): FooWithDate =
FooWithDate(b.date, a.s + b.s, a.i + b.i)
}
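The state-update fold above can be exercised without Spark; a minimal plain-Scala sketch (epoch millis standing in for the Joda DateTime, class and names are ours):

```scala
// Plain-Scala model of the fold performed inside updateFooState when no
// prior state exists: seed with the first input, fold the remainder.
case class SimpleFoo(millis: Long, s: String, i: Int)

def concatSimple(a: SimpleFoo, b: SimpleFoo): SimpleFoo =
  SimpleFoo(b.millis, a.s + b.s, a.i + b.i)

def foldInputs(inputs: Seq[SimpleFoo]): SimpleFoo =
  inputs.tail.foldLeft(inputs.head)(concatSimple)

// Key 3 in the repro receives (Foo, 3) twice:
val merged = foldInputs(Seq(SimpleFoo(0L, "Foo", 3), SimpleFoo(0L, "Foo", 3)))
// merged == SimpleFoo(0, "FooFoo", 6)
```

This reproduces the expected "FooFoo", 6 row, so the aggregation logic itself appears sound; the date corruption comes in only once Spark's state encoding is involved.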


The test output shows the invalid date:

org.scalatest.exceptions.TestFailedException:
Array(FooWithDate(2021-02-02T19:26:23.374Z,Foo,1),
FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same
elements as
Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1),
FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))
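As a sanity check, the millis round trip the UDT performs is value-preserving on its own; a Spark-free sketch (java.time standing in for Joda here, since both serialize to epoch millis):

```scala
import java.time.Instant

// Mirror of the UDT's round trip: serialize to epoch millis, then
// deserialize by reinterpreting those millis as a UTC instant.
def serializeMillis(t: Instant): Long = t.toEpochMilli
def deserializeMillis(millis: Long): Instant = Instant.ofEpochMilli(millis)

val original = Instant.parse("2020-01-02T03:04:05.006Z")
val roundTripped = deserializeMillis(serializeMillis(original))
// roundTripped == original: epoch millis carry no zone, so the round
// trip is timezone-independent.
```

That the round trip is clean suggests the corruption arises in how state rows are re-encoded between triggers rather than in the serializer logic itself - though that is conjecture.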

Is this something folks have encountered before?

Thank you,

Bryan Jeffrey


Accumulator v2

2020-01-21 Thread Bryan Jeffrey
Hello.

We're currently using Spark streaming (Spark 2.3) for a number of
applications. One pattern we've used successfully is to generate an
accumulator inside a DStream transform statement.  We then accumulate
values associated with the RDD as we process the data.  A stage completion
listener that listens for stage complete events, retrieves the
AccumulableInfo for our custom classes and exhausts the statistics to our
back-end.

We're trying to move more of our applications to using Structured
Streaming.  However, the accumulator pattern does not seem to obviously fit
Structured Streaming.  In many cases we're able to see basic statistics
(e.g. # input and # output events) from the built-in statistics.  We need
to determine a pattern for more complex statistics (# errors, # of internal
records, etc).  Defining an accumulator on startup and adding statistics,
we're able to see the statistics - but only updates - so if we read 10
records in the first trigger, and 15 in the second trigger we see
accumulated values of 10, 25.

There are several options that might allow us to move ahead:
1. We could have the AccumulableInfo contain previous counts and current
counts
2. We could maintain current and previous counts separately
3. We could maintain a list of ID to AccumulatorV2 and then call
accumulator.reset() once we've read data

All of these options seem a little bit like a hacky workaround.  Has anyone
encountered this use-case?  Is there a good pattern to follow?
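Option 2 (maintaining current and previous counts) can be sketched outside Spark; the class and names below are ours:

```scala
import scala.collection.mutable

// Tracks the last total reported per accumulator id and emits only the
// per-trigger delta, turning cumulative values (10, 25, ...) back into
// per-batch counts (10, 15, ...).
class DeltaTracker {
  private val lastTotals = mutable.Map.empty[String, Long]

  def delta(id: String, currentTotal: Long): Long = {
    val previous = lastTotals.getOrElse(id, 0L)
    lastTotals.update(id, currentTotal)
    currentTotal - previous
  }
}

val tracker = new DeltaTracker
tracker.delta("inputRecords", 10L) // first trigger: delta 10
tracker.delta("inputRecords", 25L) // second trigger: delta 15
```

A query-progress or stage-completion listener could feed cumulative accumulator values through delta(...) before exhausting them to the back-end, avoiding the reset() approach.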

Regards,

Bryan Jeffrey


Structured Streaming & Enrichment Broadcasts

2019-11-18 Thread Bryan Jeffrey
Hello.

We're running applications using Spark Streaming.  We're going to begin
work to move to using Structured Streaming.  One of our key scenarios is to
lookup values from an external data source for each record in an incoming
stream.  In Spark Streaming we currently read the external data, broadcast
it and then lookup the value from the broadcast.  The broadcast value is
refreshed on a periodic basis - with the need to refresh evaluated on each
batch (in a foreachRDD).  The broadcasts are somewhat large (~1M records).
Each stream we're doing the lookup(s) for is ~6M records / second.

While we could conceivably continue this pattern in Structured Streaming
with Spark 2.4.x and the 'foreachBatch', based on my read of documentation
this seems like a bit of an anti-pattern in Structured Streaming.

So I am looking for advice: What mechanism would you suggest to on a
periodic basis read an external data source and do a fast lookup for a
streaming input.  One option appears to be to do a broadcast left outer
join?  In the past this mechanism has been less easy to performance tune
than doing an explicit broadcast and lookup.
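For the explicit-broadcast style, the periodic refresh itself reduces to a TTL cache on the driver; a sketch (clock injected for testability; the class and names are ours, not a Spark API):

```scala
// Reloads the lookup map once `ttlMillis` has elapsed since the last
// load; between reloads, lookups hit the cached snapshot.
class RefreshingLookup[K, V](ttlMillis: Long,
                             load: () => Map[K, V],
                             now: () => Long = () => System.currentTimeMillis()) {
  private var snapshot: Map[K, V] = load()
  private var loadedAt: Long = now()

  def get(key: K): Option[V] = {
    if (now() - loadedAt >= ttlMillis) {
      snapshot = load()
      loadedAt = now()
    }
    snapshot.get(key)
  }
}

// Hypothetical usage: refresh the enrichment map at most once a minute.
val enrichmentLookup = new RefreshingLookup[String, String](
  ttlMillis = 60000L,
  load = () => Map("device-1" -> "enrichment-1")) // stand-in for the real read
```

Inside foreachBatch one would consult the cache once per micro-batch and re-broadcast only when the snapshot actually changed; whether that beats a broadcast left outer join is workload-dependent.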

Regards,

Bryan Jeffrey


Driver - Stops Scheduling Streaming Jobs

2019-08-27 Thread Bryan Jeffrey
Hello!

We're running Spark 2.3.0 on Scala 2.11.  We have a number of Spark
Streaming jobs that are using MapWithState.  We've observed that these jobs
will complete some set of stages, and then not schedule the next set of
stages.  It looks like the DAG Scheduler correctly identifies required
stages:

19/08/27 15:29:48 INFO YarnClusterScheduler: Removed TaskSet 79.0, whose
tasks have all completed, from pool
19/08/27 15:29:48 INFO DAGScheduler: ShuffleMapStage 79 (map at
SomeCode.scala:121) finished in 142.985 s
19/08/27 15:29:48 INFO DAGScheduler: looking for newly runnable stages
19/08/27 15:29:48 INFO DAGScheduler: running: Set()
19/08/27 15:29:48 INFO DAGScheduler: waiting: Set(ShuffleMapStage 81,
ResultStage 82, ResultStage 83, ShuffleMapStage 54, ResultStage 61,
ResultStage 55, ShuffleMapStage 48, ShuffleMapStage 84, ResultStage 49,
ShuffleMapStage 85, ShuffleMapStage 56, ResultStage 86,
ShuffleMapStage 57, ResultStage 58, ResultStage 80)
19/08/27 15:29:48 INFO DAGScheduler: failed: Set()

However, we see no stages that begin execution.  This happens semi-rarely
(every couple of days), which makes repro difficult.  I checked known bugs
fixed in 2.3.x and did not see anything pop out.  Has anyone else seen this
behavior? Any thoughts on debugging?

Regards,

Bryan Jeffrey


Driver OOM does not shut down Spark Context

2019-01-31 Thread Bryan Jeffrey
Hello.

I am running Spark 2.3.0 via Yarn.  I have a Spark Streaming application
where the driver threw an uncaught out of memory exception:

19/01/31 13:00:59 ERROR Utils: Uncaught exception in thread
element-tracking-store-worker
java.lang.OutOfMemoryError: GC overhead limit exceeded
at
org.apache.spark.util.kvstore.KVTypeInfo$MethodAccessor.get(KVTypeInfo.java:154)
at
org.apache.spark.util.kvstore.InMemoryStore$InMemoryView.compare(InMemoryStore.java:248)
at
org.apache.spark.util.kvstore.InMemoryStore$InMemoryView.lambda$iterator$0(InMemoryStore.java:203)
at
org.apache.spark.util.kvstore.InMemoryStore$InMemoryView$$Lambda$27/1691147907.compare(Unknown
Source)
at java.util.TimSort.binarySort(TimSort.java:296)
at java.util.TimSort.sort(TimSort.java:239)
at java.util.Arrays.sort(Arrays.java:1512)
at java.util.ArrayList.sort(ArrayList.java:1462)
at java.util.Collections.sort(Collections.java:175)
at
org.apache.spark.util.kvstore.InMemoryStore$InMemoryView.iterator(InMemoryStore.java:203)
at
scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:54)
at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at
org.apache.spark.status.AppStatusListener$$anonfun$org$apache$spark$status$AppStatusListener$$cleanupStages$1.apply(AppStatusListener.scala:894)
at
org.apache.spark.status.AppStatusListener$$anonfun$org$apache$spark$status$AppStatusListener$$cleanupStages$1.apply(AppStatusListener.scala:874)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.status.AppStatusListener.org
$apache$spark$status$AppStatusListener$$cleanupStages(AppStatusListener.scala:874)
at
org.apache.spark.status.AppStatusListener$$anonfun$3.apply$mcVJ$sp(AppStatusListener.scala:84)
at
org.apache.spark.status.ElementTrackingStore$$anonfun$write$1$$anonfun$apply$1$$anonfun$apply$mcV$sp$1.apply(ElementTrackingStore.scala:109)
at
org.apache.spark.status.ElementTrackingStore$$anonfun$write$1$$anonfun$apply$1$$anonfun$apply$mcV$sp$1.apply(ElementTrackingStore.scala:107)
at scala.collection.immutable.List.foreach(List.scala:381)
at
org.apache.spark.status.ElementTrackingStore$$anonfun$write$1$$anonfun$apply$1.apply$mcV$sp(ElementTrackingStore.scala:107)
at
org.apache.spark.status.ElementTrackingStore$$anonfun$write$1$$anonfun$apply$1.apply(ElementTrackingStore.scala:105)
at
org.apache.spark.status.ElementTrackingStore$$anonfun$write$1$$anonfun$apply$1.apply(ElementTrackingStore.scala:105)
at org.apache.spark.util.Utils$.tryLog(Utils.scala:2001)
at
org.apache.spark.status.ElementTrackingStore$$anon$1.run(ElementTrackingStore.scala:91)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Despite the uncaught exception the Streaming application never terminated.
No new batches were started.  As a result my job did not process data for
some period of time (until our ancillary monitoring noticed the issue).

*Ask: What can we do to ensure that the driver is shut down when this type
of exception occurs?*
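One mitigation (assuming the Error actually escapes its thread - in the log above Spark's Utils caught and logged it, so a JVM-default handler would not fire there) is an uncaught-exception handler that forces shutdown; a sketch, with the shutdown action injected so it can be exercised without exiting:

```scala
// Builds an UncaughtExceptionHandler that invokes `shutdown` whenever a
// thread dies with a fatal Error (e.g. OutOfMemoryError).
def fatalErrorHandler(shutdown: Throwable => Unit): Thread.UncaughtExceptionHandler =
  new Thread.UncaughtExceptionHandler {
    override def uncaughtException(t: Thread, e: Throwable): Unit = e match {
      case err: Error => shutdown(err)
      case _          => () // non-fatal: leave to ordinary logging
    }
  }

// In a driver one might install it early in main(), e.g.:
// Thread.setDefaultUncaughtExceptionHandler(fatalErrorHandler { err =>
//   ssc.stop(stopSparkContext = true); sys.exit(1)
// })
```

Note the caveat that under a real OOM the shutdown path may itself fail to allocate, so pairing this with external liveness monitoring still seems prudent.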

Regards,

Bryan Jeffrey


Re: ConcurrentModificationExceptions with CachedKafkaConsumers

2018-08-31 Thread Bryan Jeffrey
Cody,

Yes - I was able to verify that I am not seeing duplicate calls to
createDirectStream.  If the spark-streaming-kafka-0-10 will work on a 2.3
cluster I can go ahead and give that a shot.

Regards,

Bryan Jeffrey

On Fri, Aug 31, 2018 at 11:56 AM Cody Koeninger  wrote:

> Just to be 100% sure, when you're logging the group id in
> createDirectStream, you no longer see any duplicates?
>
> Regarding testing master, is the blocker that your spark cluster is on
> 2.3?  There's at least a reasonable chance that building an
> application assembly jar that uses the master version just for the
> spark-streaming-kafka-0-10 artifact will still work on a 2.3 cluster
>
> On Fri, Aug 31, 2018 at 8:55 AM, Bryan Jeffrey 
> wrote:
> > Cody,
> >
> > We are connecting to multiple clusters for each topic.  I did experiment
> > this morning with both adding a cluster identifier to the group id, as
> well
> > as simply moving to use only a single one of our clusters.  Neither of
> these
> > were successful.  I am not able to run a test against master now.
> >
> > Regards,
> >
> > Bryan Jeffrey
> >
> >
> >
> >
> > On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger 
> wrote:
> >>
> >> I doubt that fix will get backported to 2.3.x
> >>
> >> Are you able to test against master?  2.4 with the fix you linked to
> >> is likely to hit code freeze soon.
> >>
> >> From a quick look at your code, I'm not sure why you're mapping over
> >> an array of brokers.  It seems like that would result in different
> >> streams with the same group id, because broker isn't part of your
> >> group id string.
> >>
> >> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey <
> bryan.jeff...@gmail.com>
> >> wrote:
> >> > Hello, Spark Users.
> >> >
> >> > We have an application using Spark 2.3.0 and the 0.8 Kafka client.  We
> >> > have a Spark streaming job, and we're reading a reasonable amount of
> >> > data from Kafka (40 GB / minute or so).  We would like to move to using
> >> > the Kafka 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers
> >> > to modify formats.
> >> >
> >> > We've run into https://issues.apache.org/jira/browse/SPARK-19185,
> >> > 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've
> >> > tried to
> >> > work around it as follows:
> >> >
> >> > 1. Disabled consumer caching.  This increased the total job time from
> ~1
> >> > minute per batch to ~1.8 minutes per batch.  This performance penalty
> is
> >> > unacceptable for our use-case. We also saw some partitions stop
> >> > receiving
> >> > for an extended period of time - I was unable to get a simple repro
> for
> >> > this
> >> > effect though.
> >> > 2. Disabled speculation and multiple-job concurrency and added caching
> >> > for
> >> > the stream directly after reading from Kafka & caching offsets.  This
> >> > approach seems to work well for simple examples (read from a Kafka
> >> > topic,
> >> > write to another topic). However, when we move through more complex
> >> > logic we
> >> > continue to see this type of error - despite only creating the stream
> >> > for a
> >> > given topic a single time.  We validated that we're creating the
> stream
> >> > from
> >> > a given topic / partition a single time by logging on stream creation,
> >> > caching the stream and (eventually) calling 'runJob' to actually go
> and
> >> > fetch the data. Nonetheless with multiple outputs we see the
> >> > ConcurrentModificationException.
> >> >
> >> > I've included some code down below.  I would be happy if anyone had
> >> > debugging tips for the workaround.  However, my main concern is to
> >> > ensure
> >> > that the 2.4 version will have a bug fix that will work for Spark
> >> > Streaming
> >> > in which multiple input topics map data to multiple outputs. I would
> >> > also
> >> > like to understand if the fix
> >> > (https://github.com/apache/spark/pull/20997)
> >> > will be backported to Spark 2.3.x
> >> >
> >> > In our code, read looks like the following:
>

Re: ConcurrentModificationExceptions with CachedKafkaConsumers

2018-08-31 Thread Bryan Jeffrey
Cody,

We are connecting to multiple clusters for each topic.  I did experiment
this morning with both adding a cluster identifier to the group id, as well
as simply moving to use only a single one of our clusters.  Neither of
these were successful.  I am not able to run a test against master now.
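For reference, the cluster-identifier variant built the group id roughly as follows (the helpers and naming here are ours):

```scala
// Derives a stable cluster identifier from the broker list so that the
// same application reading the same topics from two clusters does not
// share a Kafka consumer group id.
def clusterId(brokers: String): String =
  brokers.split(',').map(_.trim).sorted.mkString("|")

def groupId(applicationName: String, brokers: String, topics: Seq[String]): String =
  s"$applicationName~${clusterId(brokers)}~${topics.sorted.mkString("~")}"

val id = groupId("myApp", "broker2:9092, broker1:9092", Seq("topicB", "topicA"))
// "myApp~broker1:9092|broker2:9092~topicA~topicB"
```

Sorting the brokers and topics keeps the id stable across restarts even if the configured ordering changes.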

Regards,

Bryan Jeffrey




On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger  wrote:

> I doubt that fix will get backported to 2.3.x
>
> Are you able to test against master?  2.4 with the fix you linked to
> is likely to hit code freeze soon.
>
> From a quick look at your code, I'm not sure why you're mapping over
> an array of brokers.  It seems like that would result in different
> streams with the same group id, because broker isn't part of your
> group id string.
>
> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey 
> wrote:
> > Hello, Spark Users.
> >
> > We have an application using Spark 2.3.0 and the 0.8 Kafka client.  We
> > have a Spark streaming job, and we're reading a reasonable amount of data
> > from Kafka (40 GB / minute or so).  We would like to move to using the
> > Kafka 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers
> > to modify formats.
> >
> > We've run into https://issues.apache.org/jira/browse/SPARK-19185,
> > 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've
> tried to
> > work around it as follows:
> >
> > 1. Disabled consumer caching.  This increased the total job time from ~1
> > minute per batch to ~1.8 minutes per batch.  This performance penalty is
> > unacceptable for our use-case. We also saw some partitions stop receiving
> > for an extended period of time - I was unable to get a simple repro for
> this
> > effect though.
> > 2. Disabled speculation and multiple-job concurrency and added caching
> for
> > the stream directly after reading from Kafka & caching offsets.  This
> > approach seems to work well for simple examples (read from a Kafka topic,
> > write to another topic). However, when we move through more complex
> logic we
> > continue to see this type of error - despite only creating the stream
> for a
> > given topic a single time.  We validated that we're creating the stream
> from
> > a given topic / partition a single time by logging on stream creation,
> > caching the stream and (eventually) calling 'runJob' to actually go and
> > fetch the data. Nonetheless with multiple outputs we see the
> > ConcurrentModificationException.
> >
> > I've included some code down below.  I would be happy if anyone had
> > debugging tips for the workaround.  However, my main concern is to ensure
> > that the 2.4 version will have a bug fix that will work for Spark
> Streaming
> > in which multiple input topics map data to multiple outputs. I would also
> > like to understand if the fix (
> https://github.com/apache/spark/pull/20997)
> > will be backported to Spark 2.3.x
> >
> > In our code, read looks like the following:
> >
> > case class StreamLookupKey(topic: Set[String], brokers: String)
> >
> > private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()
> >
> > // Given inputs return a direct stream.
> > def createDirectStream(ssc: StreamingContext,
> >additionalKafkaParameters: Map[String, String],
> >brokersToUse: Array[String], //
> > broker1,broker2|broker3,broker4
> >topicsToUse: Array[String],
> >applicationName: String,
> >persist: Option[PersistenceManager],
> >useOldestOffsets: Boolean,
> >maxRatePerPartition: Long,
> >batchSeconds: Int
> >   ): DStream[DecodedData] = {
> >   val streams: Array[DStream[DecodedData]] =
> > brokersToUse.map(brokers => {
> >   val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
> >   val kafkaParameters: Map[String, String] =
> getKafkaParameters(brokers,
> > useOldestOffsets, groupId) ++ additionalKafkaParameters
> >   logger.info(s"Kafka Params: ${kafkaParameters}")
> >   val topics = topicsToUse.toSet
> >   logger.info(s"Creating Kafka direct connection -
> > ${kafkaParameters.mkString(GeneralConstants.comma)} " +
> > s"topics: ${topics.mkString(GeneralConstants.comma)} w/
> > applicationGroup: ${groupId}")
> >
> >   streamMap.getOrEls

ConcurrentModificationExceptions with CachedKafkaConsumers

2018-08-30 Thread Bryan Jeffrey
)
  }
}

def getKafkaParameters(brokers: String, useOldestOffsets: Boolean,
applicationName: String): Map[String, String] =
  Map[String, String](
"auto.offset.reset" -> (if (useOldestOffsets) "earliest" else "latest"),
"enable.auto.commit" -> false.toString, // we'll commit these manually
"key.deserializer" ->
classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
"value.deserializer" -> classOf[Decoders.MixedDecoder].getCanonicalName,
"partition.assignment.strategy" ->
classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
"bootstrap.servers" -> brokers,
"group.id" -> applicationName,
"session.timeout.ms" -> 24.toString,
"request.timeout.ms"-> 30.toString
  )

Write code looks like the following:

def write[T, A](rdd: RDD[T], topic: String, brokers: Array[String],
conv: (T) => Array[Byte], numPartitions: Int): Unit = {
  val rddToWrite =
if (numPartitions > 0) {
  rdd.repartition(numPartitions)
} else {
  rdd
}

  // Get session from current threads session
  val session = SparkSession.builder().getOrCreate()
  val df = session.createDataFrame(rddToWrite.map(x => Row(conv(x))),
StructType(Array(StructField("value", BinaryType
  df.selectExpr("CAST('' AS STRING)", "value")
.write
.format("kafka")
.option("kafka.bootstrap.servers", getBrokersToUse(brokers))
.option("compression.type", "gzip")
.option("retries", "3")
.option("topic", topic)
.save()
}
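
A hypothetical usage sketch of the write helper above; the topic name, broker list, and converter function are examples, not values from the original message (and the helper's internal getBrokersToUse is assumed to exist in the caller's codebase):

```scala
import org.apache.spark.rdd.RDD

// Example-only payloads; 'sc' is an existing SparkContext.
val events: RDD[String] = sc.parallelize(Seq("a", "b", "c"))

write[String, Nothing](
  rdd = events,
  topic = "events",                           // assumed topic name
  brokers = Array("broker1:9092"),            // assumed broker list
  conv = (s: String) => s.getBytes("UTF-8"),  // converter from record to bytes
  numPartitions = 4)
```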

Regards,

Bryan Jeffrey


Heap Memory in Spark 2.3.0

2018-07-16 Thread Bryan Jeffrey
6.0K->0.0B Heap:
8650.3M(20.0G)->7931.0M(20.0G)], [Metaspace: 52542K->52542K(1095680K)]
 [Times: user=20.80 sys=1.53, real=17.48 secs]
812.433: [Full GC (Allocation Failure) 817.544: [SoftReference, 112
refs, 0.858 secs]817.544: [WeakReference, 1606 refs, 0.0002380
secs]817.545: [FinalReference, 1175 refs, 0.0003358 secs]817.545:
[PhantomReference, 0 refs, 12 refs, 0.135 secs]817.545: [JNI Weak
Reference, 0.305 secs] 7931M->7930M(20G), 15.9899953 secs]
   [Eden: 0.0B(9588.0M)->0.0B(9588.0M) Survivors: 0.0B->0.0B Heap:
7931.0M(20.0G)->7930.8M(20.0G)], [Metaspace: 52542K->52477K(1095680K)]
 [Times: user=20.88 sys=0.87, real=15.99 secs]
828.425: [GC concurrent-mark-abort]
Heap
 garbage-first heap   total 20971520K, used 8465150K
[0x0002c000, 0x0002c040a000, 0x0007c000)
  region size 4096K, 2 young (8192K), 0 survivors (0K)
 Metaspace   used 52477K, capacity 52818K, committed 54400K,
reserved 1095680K
  class spaceused 7173K, capacity 7280K, committed 7552K, reserved 1048576K
 Concurrent marking:
  0   init marks: total time = 0.00 s (avg = 0.00 ms).
 13  remarks: total time = 0.49 s (avg =37.77 ms).
   [std. dev =21.54 ms, max =83.71 ms]
13  final marks: total time = 0.03 s (avg = 2.00 ms).
  [std. dev = 2.89 ms, max =11.95 ms]
13weak refs: total time = 0.46 s (avg =35.77 ms).
  [std. dev =22.02 ms, max =82.16 ms]
 13 cleanups: total time = 0.14 s (avg =10.63 ms).
   [std. dev =11.13 ms, max =42.87 ms]
Final counting total time = 0.04 s (avg = 2.96 ms).
RS scrub total time = 0.05 s (avg = 3.91 ms).
  Total stop_world time = 0.63 s.
  Total concurrent time =   195.51 s (  194.00 s marking).


This appears to show that we're seeing allocated heap of 7930 MB out
of 20 GB (so under half).  However, Spark is throwing a Java OOM (out of
heap space) error.  I validated that we're not using legacy memory
management mode.  I've tried this against a few applications (some batch,
some streaming).  Has something changed with memory allocation in Spark
2.3.0 that would cause these issues?

Thank you for any help you can provide.

Regards,

Bryan Jeffrey


Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Bryan Jeffrey
Cody,

Thank you. Let me see if I can reproduce this. We're not seeing offsets load 
correctly on startup - but perhaps there is an error on my side.

Bryan

Get Outlook for Android<https://aka.ms/ghei36>


From: Cody Koeninger 
Sent: Thursday, June 14, 2018 5:01:01 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Kafka Offset Storage: Fetching Offsets

Offsets are loaded when you instantiate an
org.apache.kafka.clients.consumer.KafkaConsumer, subscribe, and poll.
There's not an explicit api for it.  Have you looked at the output of
kafka-consumer-groups.sh and tried the example code I linked to?


bash-3.2$ ./bin/kafka-consumer-groups.sh --bootstrap-server
localhost:9092 --group commitexample --describe
Note: This will only show information about consumers that use the
Java consumer API (non-ZooKeeper-based consumers).
Consumer group 'commitexample' has no active members.
TOPIC   PARTITION   CURRENT-OFFSET   LOG-END-OFFSET   LAG   CONSUMER-ID   HOST   CLIENT-ID
test    0           1056             1656             600   -             -      -


scala> val c = new KafkaConsumer[String, String](kafkaParams.asJava)
c: org.apache.kafka.clients.consumer.KafkaConsumer[String,String] =
org.apache.kafka.clients.consumer.KafkaConsumer@780cbdf8
scala> c.subscribe(java.util.Arrays.asList("test"))
scala> c.poll(0)
scala> c.position(new TopicPartition("test", 0))
res4: Long = 1056






On Thu, Jun 14, 2018 at 3:33 PM, Bryan Jeffrey  wrote:
> Cody,
>
> Where is that called in the driver? The only call I see from Subscribe is to
> load the offset from checkpoint.
>
> Get Outlook for Android
>
> ____
> From: Cody Koeninger 
> Sent: Thursday, June 14, 2018 4:24:58 PM
>
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The code that loads offsets from kafka is in e.g.
> org.apache.kafka.clients.consumer, it's not in spark.
>
> On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey 
> wrote:
>> Cody,
>>
>> Can you point me to the code that loads offsets? As far as I can see with
>> Spark 2.1, the only offset load is from checkpoint.
>>
>> Thank you!
>>
>> Bryan
>>
>> Get Outlook for Android
>>
>> 
>> From: Cody Koeninger 
>> Sent: Thursday, June 14, 2018 4:00:31 PM
>> To: Bryan Jeffrey
>> Cc: user
>> Subject: Re: Kafka Offset Storage: Fetching Offsets
>>
>> The expectation is that you shouldn't have to manually load offsets
>> from kafka, because the underlying kafka consumer on the driver will
>> start at the offsets associated with the given group id.
>>
>> That's the behavior I see with this example:
>>
>>
>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
>>
>> What does bin/kafka-consumer-groups.sh show for your group id?
>>
>> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey 
>> wrote:
>>> Hello.
>>>
>>> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based
>>> on
>>> the documentation
>>>
>>>
>>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>>> it appears that you can now use Kafka itself to store offsets.
>>>
>>> I've set up a simple Kafka DStream:
>>> val kafkaParameters = Map[String, String](
>>>   "metadata.broker.list" -> brokers,
>>>   "auto.offset.reset" -> "latest",
>>>   "enable.auto.commit" -> false.toString,
>>>   "key.deserializer" ->
>>>
>>>
>>> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>>   "partition.assignment.strategy" ->
>>>
>>>
>>> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>>   "group.id" -> applicationName
>>> )
>>>
>>> val consumerStrategy = ConsumerStrategies.Subscribe[String,
>>> DecodedData](topics.toSeq, kafkaParameters)
>>> KafkaUtils.createDirectStream(ssc, locationStrategy =
>>> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>>>
>>>
>>> I then commit the offsets:
>>>
>>&

Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Bryan Jeffrey
Cody,

Where is that called in the driver? The only call I see from Subscribe is to 
load the offset from checkpoint.

Get Outlook for Android<https://aka.ms/ghei36>


From: Cody Koeninger 
Sent: Thursday, June 14, 2018 4:24:58 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Kafka Offset Storage: Fetching Offsets

The code that loads offsets from kafka is in e.g.
org.apache.kafka.clients.consumer, it's not in spark.

On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey  wrote:
> Cody,
>
> Can you point me to the code that loads offsets? As far as I can see with
> Spark 2.1, the only offset load is from checkpoint.
>
> Thank you!
>
> Bryan
>
> Get Outlook for Android
>
> 
> From: Cody Koeninger 
> Sent: Thursday, June 14, 2018 4:00:31 PM
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The expectation is that you shouldn't have to manually load offsets
> from kafka, because the underlying kafka consumer on the driver will
> start at the offsets associated with the given group id.
>
> That's the behavior I see with this example:
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
>
> What does bin/kafka-consumer-groups.sh show for your group id?
>
> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey 
> wrote:
>> Hello.
>>
>> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based
>> on
>> the documentation
>>
>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>> it appears that you can now use Kafka itself to store offsets.
>>
>> I've set up a simple Kafka DStream:
>> val kafkaParameters = Map[String, String](
>>   "metadata.broker.list" -> brokers,
>>   "auto.offset.reset" -> "latest",
>>   "enable.auto.commit" -> false.toString,
>>   "key.deserializer" ->
>>
>> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>   "partition.assignment.strategy" ->
>>
>> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>   "group.id" -> applicationName
>> )
>>
>> val consumerStrategy = ConsumerStrategies.Subscribe[String,
>> DecodedData](topics.toSeq, kafkaParameters)
>> KafkaUtils.createDirectStream(ssc, locationStrategy =
>> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>>
>>
>> I then commit the offsets:
>>
>> var offsets: Array[OffsetRange] = Array()
>> stream.foreachRDD(rdd => {
>>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   logger.info(s"Offsets: ${offsets.mkString("|")}")
>> })
>>
>> // Future: Move this after we've done processing.
>> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>>
>> The offsets appear to commit successfully. However, on restart the
>> streaming
>> application consistently starts from latest whenever the Spark checkpoint
>> is
>> changed.  Drilling into the code it does not appear that re-loading offset
>> data is supported in the Spark Streaming Kafka library.  How is this
>> expected to work?  Is there an example of saving the offsets to Kafka and
>> then loading them from Kafka?
>>
>> Regards,
>>
>> Bryan Jeffrey


Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Bryan Jeffrey
Cody,

Can you point me to the code that loads offsets? As far as I can see with Spark 
2.1, the only offset load is from checkpoint.

Thank you!

Bryan

Get Outlook for Android<https://aka.ms/ghei36>


From: Cody Koeninger 
Sent: Thursday, June 14, 2018 4:00:31 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Kafka Offset Storage: Fetching Offsets

The expectation is that you shouldn't have to manually load offsets
from kafka, because the underlying kafka consumer on the driver will
start at the offsets associated with the given group id.

That's the behavior I see with this example:

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala

What does bin/kafka-consumer-groups.sh show for your group id?

On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey  wrote:
> Hello.
>
> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based on
> the documentation
> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
> it appears that you can now use Kafka itself to store offsets.
>
> I've set up a simple Kafka DStream:
> val kafkaParameters = Map[String, String](
>   "metadata.broker.list" -> brokers,
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> false.toString,
>   "key.deserializer" ->
> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>   "partition.assignment.strategy" ->
> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>   "bootstrap.servers" -> brokersToUse.mkString(","),
>   "group.id" -> applicationName
> )
>
> val consumerStrategy = ConsumerStrategies.Subscribe[String,
> DecodedData](topics.toSeq, kafkaParameters)
> KafkaUtils.createDirectStream(ssc, locationStrategy =
> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>
>
> I then commit the offsets:
>
> var offsets: Array[OffsetRange] = Array()
> stream.foreachRDD(rdd => {
>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   logger.info(s"Offsets: ${offsets.mkString("|")}")
> })
>
> // Future: Move this after we've done processing.
> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>
> The offsets appear to commit successfully. However, on restart the streaming
> application consistently starts from latest whenever the Spark checkpoint is
> changed.  Drilling into the code it does not appear that re-loading offset
> data is supported in the Spark Streaming Kafka library.  How is this
> expected to work?  Is there an example of saving the offsets to Kafka and
> then loading them from Kafka?
>
> Regards,
>
> Bryan Jeffrey


Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Bryan Jeffrey
Hello.

I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based
on the documentation (
https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
it appears that you can now use Kafka itself to store offsets.

I've set up a simple Kafka DStream:
val kafkaParameters = Map[String, String](
  "metadata.broker.list" -> brokers,
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> false.toString,
  "key.deserializer" ->
classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
  "partition.assignment.strategy" ->
classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
  "bootstrap.servers" -> brokersToUse.mkString(","),
  "group.id" -> applicationName
)

val consumerStrategy = ConsumerStrategies.Subscribe[String,
DecodedData](topics.toSeq, kafkaParameters)
KafkaUtils.createDirectStream(ssc, locationStrategy =
LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)


I then commit the offsets:

var offsets: Array[OffsetRange] = Array()
stream.foreachRDD(rdd => {
  offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  logger.info(s"Offsets: ${offsets.mkString("|")}")
})

// Future: Move this after we've done processing.
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)

The offsets appear to commit successfully. However, on restart the
streaming application consistently starts from latest whenever the Spark
checkpoint is changed.  Drilling into the code it does not appear that
re-loading offset data is supported in the Spark Streaming Kafka library.
How is this expected to work?  Is there an example of saving the offsets to
Kafka and then loading them from Kafka?

Regards,

Bryan Jeffrey
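
A hedged sketch of moving the commit after processing, as the "Future: Move this after we've done processing" comment in the message above suggests (stream is the DStream created earlier; this illustrates ordering only, not a confirmed fix for the restart issue):

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Commit each batch's offsets only after that batch's work has completed,
// so a crash mid-batch replays the batch instead of skipping it.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```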


Re: [Spark 2.x Core] Adding to ArrayList inside rdd.foreach()

2018-04-07 Thread Bryan Jeffrey
You can just call rdd.flatMap(_._2).collect
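A minimal sketch of the idea (assuming a plain pair RDD; `values` or `map(_._2)` covers scalar values, while `flatMap(_._2)` fits collection-typed values as the one-liner above implies):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("pairValues").setMaster("local[*]"))

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))

// Pull the values back to the driver...
val values: Array[Int] = pairs.values.collect()  // equivalently pairs.map(_._2).collect()

// ...and re-parallelize them for later reuse.
val reused = sc.parallelize(values)
```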


Get Outlook for Android


From: klrmowse 
Sent: Saturday, April 7, 2018 1:29:34 PM
To: user@spark.apache.org
Subject: Re: [Spark 2.x Core] Adding to ArrayList inside rdd.foreach()

okie, well...

i'm working with a pair rdd 

i need to extract the values and store them somehow (maybe a simple
Array??), which i later parallelize and reuse

since adding to a list is a no-no, what, if any, are the other options?
(Java Spark, btw)



thanks



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Return statements aren't allowed in Spark closures

2018-02-21 Thread Bryan Jeffrey
Lian,

You're writing Scala. Just remove the 'return'. No need for it in Scala.
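
A minimal sketch of the change, based on the method in the quoted message below:

```scala
import org.apache.spark.sql.Row

// In Scala the last expression of a method body is its result, so no 'return'
// keyword is needed. Removing it also avoids ReturnStatementInClosureException
// when the method is referenced from inside a Spark closure.
def queryYahoo(row: Row): Int = 10
```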

Get Outlook for Android


From: Lian Jiang 
Sent: Wednesday, February 21, 2018 4:16:08 PM
To: user
Subject: Return statements aren't allowed in Spark closures

I can run below code in spark-shell using yarn client mode.


val csv = spark.read.option("header", "true").csv("my.csv")


def queryYahoo(row: Row) : Int = { return 10; }


csv.repartition(5).rdd.foreachPartition{ p => p.foreach(r => { queryYahoo(r) })}

However, the same code failed when run using spark-submit in yarn client or 
cluster mode due to error:


18/02/21 21:00:12 ERROR ApplicationMaster: User class threw exception: 
org.apache.spark.util.ReturnStatementInClosureException: Return statements 
aren't allowed in Spark closures

org.apache.spark.util.ReturnStatementInClosureException: Return statements 
aren't allowed in Spark closures

at 
org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:371)

at org.apache.xbean.asm5.ClassReader.a(Unknown Source)

at org.apache.xbean.asm5.ClassReader.b(Unknown Source)

at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)

at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)

at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:243)

at 
org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:306)

at 
org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:292)

at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)

at scala.collection.immutable.List.foreach(List.scala:381)

at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)

at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:292)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)

at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)

at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)


Any idea? Thanks.


Stopping a Spark Streaming Context gracefully

2017-11-07 Thread Bryan Jeffrey
Hello.

I am running Spark 2.1, Scala 2.11.  We're running several Spark streaming
jobs.  In some cases we restart these jobs on an occasional basis.  We have
code that looks like the following:

logger.info("Starting the streaming context!")
ssc.start()
logger.info("Waiting for termination!")
Option(config.getInt(Parameters.RestartMinutes)).getOrElse(0) match {
  case restartMinutes: Int if restartMinutes > 0 =>
logger.info(s"Waiting for ${restartMinutes} before terminating job")
ssc.awaitTerminationOrTimeout(restartMinutes *
DateUtils.millisecondsPerMinute)
  case _ => ssc.awaitTermination()
}
logger.info("Calling 'stop'")
ssc.stop(stopSparkContext = true, stopGracefully = true)


In several cases we've observed jobs where we've called 'stop' not
stopping.  I went and wrote a simple job that reads from Kafka and does
nothing (prints a count of data).  After several minutes it simply calls
'ssc.stop(true, true)'.  In some cases this will stop the context.  In
others it will not stop the context.  If we call 'stop' several times over
an interval one of them eventually succeeds.

It looks like this is a bug.  I looked in Jira and did not see an open
issue.  Is this a  known problem?  If not I'll open a bug.

Regards,

Bryan Jeffrey


Re: about broadcast join of base table in spark sql

2017-06-30 Thread Bryan Jeffrey
Hello. 




If you want to allow broadcast join with larger broadcasts you can set 
spark.sql.autoBroadcastJoinThreshold to a higher value. This will cause the 
plan to allow join despite 'A' being larger than the default threshold. 
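
A hedged sketch of both options (the threshold value is an example; `broadcast` is the explicit planner hint from org.apache.spark.sql.functions; lowercase a/b stand for the DataFrames from the question):

```scala
import org.apache.spark.sql.functions.broadcast

// Option 1: raise the size threshold (in bytes) under which Spark
// auto-broadcasts a relation; here 100 MB as an example value.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (100L * 1024 * 1024).toString)

// Option 2: hint the planner explicitly. Note that in an outer join only the
// non-preserved side can be broadcast, so for a left outer join Spark can
// broadcast b (the right side) but not a (the preserved left side).
val joined = a.join(broadcast(b), a("key1") === b("key2"), "left")
```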




Get Outlook for Android

From: paleyl
Sent: Wednesday, June 28, 10:42 PM
Subject: about broadcast join of base table in spark sql
To: d...@spark.org, user@spark.apache.org

Hi All,

Recently I met a problem with broadcast join: I want to left join tables A and
B, A is the smaller one and the left table, so I wrote

A = A.join(B, A("key1") === B("key2"), "left")

but I found that A is not broadcast out, as the shuffle size is still very
large.

I guess this is a designed mechanism in spark, so could anyone please tell me
why it is designed like this? I am just very curious.

Best,

Paley










Re: Question about Parallel Stages in Spark

2017-06-27 Thread Bryan Jeffrey
Satish, 




Is this two separate applications submitted to the Yarn scheduler? If so then 
you would expect that you would see the original case run in parallel. 




However, if this is one application your submission to Yarn guarantees that 
this application will fairly  contend with resources requested by other 
applications. However, the internal output operations within your application 
(jobs) will be scheduled by the driver (running on a single AM). This means 
that whatever driver options and code you've set will impact the application, 
but the Yarn scheduler will not impact (beyond allocating cores, memory, etc. 
between applications.)








Get Outlook for Android







On Tue, Jun 27, 2017 at 2:33 AM -0400, "satish lalam"  
wrote:










Thanks All. To reiterate - stages inside a job can run in parallel as long as
(a) there is no sequential dependency and (b) the job has sufficient resources.
However, my code was launching 2 jobs and they are sequential, as you rightly
pointed out. The issue which I was trying to highlight with that piece of
pseudocode, however, was that I am observing a job with 2 stages which don't
depend on each other (they both are reading data from 2 separate tables in db);
they both are scheduled and both stages get resources - but the 2nd stage
really does not pick up until the 1st stage is complete. It might be due to the
db driver - I will post it to the right forum. Thanks.
On Mon, Jun 26, 2017 at 9:12 PM, Pralabh Kumar  wrote:
I think my words were also misunderstood. My point is they will not submit
together, since they are part of one thread.

val spark = SparkSession.builder()
  .appName("practice")
  .config("spark.scheduler.mode", "FAIR")
  .enableHiveSupport().getOrCreate()
val sc = spark.sparkContext
sc.parallelize(List(1.to(1000))).map(s => Thread.sleep(1)).collect()
sc.parallelize(List(1.to(1000))).map(s => Thread.sleep(1)).collect()
Thread.sleep(1000)

I ran this and the submit times are different for the two jobs.
Please let me know if I am wrong.
On Tue, Jun 27, 2017 at 9:17 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
My words caused misunderstanding.
Step 1: A is submitted to spark.
Step 2: B is submitted to spark.
Spark gets two independent jobs. The FAIR scheduler is used to schedule A and B.
Jeffrey's code did not cause two submits.


---Original---
From: "Pralabh Kumar"
Date: 2017/6/27 12:09:27
To: "萝卜丝炒饭" <1427357...@qq.com>
Cc: "user"; "satishl"; "Bryan Jeffrey"
Subject: Re: Question about Parallel Stages in Spark
Hi

I don't think spark submit will receive two submits. It will execute one
submit and then the next one. If the application is multithreaded, and two
threads are calling spark submit at one time, then they will run in parallel
provided the scheduler is FAIR and task slots are available.
But in one thread, one submit will complete and then the other one will start.
If there are independent stages in one job, then those will run in parallel.
I agree with Bryan Jeffrey.

Regards
Pralabh Kumar
On Tue, Jun 27, 2017 at 9:03 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
I think the spark cluster receives two submits, A and B. The FAIR scheduler is
used to schedule A and B. I am not sure about this.

---Original---
From: "Bryan Jeffrey"
Date: 2017/6/27 08:55:42
To: "satishl"
Cc: "user"
Subject: Re: Question about Parallel Stages in Spark
Hello.
The driver is running the individual operations in series, but each operation 
is parallelized internally.  If you want them run in parallel you need to 
provide the driver a mechanism to thread the job scheduling out:
val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(1 to 20)

var thingsToDo: ParArray[(RDD[Int], Int)] = Array(rdd1, rdd2).zipWithIndex.par

thingsToDo.foreach { case(rdd, index) =>
  for(i <- (1 to 1))
logger.info(s"Index ${index} - ${rdd.sum()}")
}
This will run both operations in parallel.

On Mon, Jun 26, 2017 at 8:10 PM, satishl  wrote:
For the below code, since rdd1 and rdd2 don't depend on each other - i was
expecting that both first and second printlns would be interwoven. However -
the spark job runs all "first" statements first and then all "second"
statements next in serial fashion. I have set spark.scheduler.mode = FAIR.
obviously my understanding of parallel stages is wrong. What am I missing?

    val rdd1 = sc.parallelize(1 to 100)
    val rdd2 = sc.parallelize(1 to 100)

    for (i <- (1 to 100))
      println("first: " + rdd1.sum())
    for (i <- (1 to 100))
      println("second: " + rdd2.sum())

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Question-about-Parallel-Stages-in-Spark-tp28793.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




















Re: Question about Parallel Stages in Spark

2017-06-26 Thread Bryan Jeffrey
Hello.

The driver is running the individual operations in series, but each
operation is parallelized internally.  If you want them run in parallel you
need to provide the driver a mechanism to thread the job scheduling out:

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(1 to 20)

var thingsToDo: ParArray[(RDD[Int], Int)] = Array(rdd1, rdd2).zipWithIndex.par

thingsToDo.foreach { case(rdd, index) =>
  for(i <- (1 to 1))
logger.info(s"Index ${index} - ${rdd.sum()}")
}


This will run both operations in parallel.
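
An alternative sketch of the same idea using Scala Futures to submit the independent jobs from separate driver threads (assumes rdd1/rdd2 from the code above and the global ExecutionContext):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Each Future body triggers its own Spark job; because they run on separate
// driver threads, the scheduler can work on both jobs concurrently.
val sums = Future.sequence(Seq(
  Future { rdd1.sum() },
  Future { rdd2.sum() }
))
val Seq(sum1, sum2) = Await.result(sums, Duration.Inf)
```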


On Mon, Jun 26, 2017 at 8:10 PM, satishl  wrote:

> For the below code, since rdd1 and rdd2 dont depend on each other - i was
> expecting that both first and second printlns would be interwoven. However
> -
> the spark job runs all "first" statements first and then all "second"
> statements next in serial fashion. I have set spark.scheduler.mode = FAIR.
> obviously my understanding of parallel stages is wrong. What am I missing?
>
> val rdd1 = sc.parallelize(1 to 100)
> val rdd2 = sc.parallelize(1 to 100)
>
> for (i <- (1 to 100))
>   println("first: " + rdd1.sum())
> for (i <- (1 to 100))
>   println("second" + rdd2.sum())
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Question-about-Parallel-Stages-in-Spark-tp28793.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Broadcasts & Storage Memory

2017-06-21 Thread Bryan Jeffrey
Satish,

I agree - that was my impression too. However I am seeing a smaller set of
storage memory used on a given executor than the amount of memory required for
my broadcast variables. I am wondering if the statistics in the ui are
incorrect or if the broadcasts are simply not a part of that storage memory
fraction.

Bryan Jeffrey

Get Outlook for Android

On Wed, Jun 21, 2017 at 6:48 PM -0400, "satish lalam"  wrote:

My understanding is - it comes from storageFraction. Here cached blocks are immune to
eviction - so both persisted RDDs and broadcast variables sit here. Ref 
On Wed, Jun 21, 2017 at 1:43 PM, Bryan Jeffrey  wrote:
Hello.
Question: Do broadcast variables stored on executors count as part of 'storage 
memory' or other memory?
A little bit more detail:
I understand that we have two knobs to control memory allocation:
- spark.memory.fraction
- spark.memory.storageFraction

My understanding is that spark.memory.storageFraction controls the amount of
memory allocated for cached RDDs.  spark.memory.fraction controls how much
memory is allocated to Spark operations (task serialization, operations, etc.),
w/ the remainder reserved for user data structures, Spark internal metadata,
etc.  This includes the storage memory for cached RDDs.

You end up with executor memory that looks like the following:
All memory: 0-100
Spark memory: 0-75
RDD Storage: 0-37
Other Spark: 38-75
Other Reserved: 76-100
Where do broadcast variables fall into the mix?
Regards,
Bryan Jeffrey









Broadcasts & Storage Memory

2017-06-21 Thread Bryan Jeffrey
Hello.

Question: Do broadcast variables stored on executors count as part of
'storage memory' or other memory?

A little bit more detail:

I understand that we have two knobs to control memory allocation:
- spark.memory.fraction
- spark.memory.storageFraction

My understanding is that spark.memory.storageFraction controls the amount
of memory allocated for cached RDDs.  spark.memory.fraction controls how
much memory is allocated to Spark operations (task serialization,
operations, etc.), w/ the remainder reserved for user data structures,
Spark internal metadata, etc.  This includes the storage memory for cached
RDDs.

You end up with executor memory that looks like the following:
All memory: 0-100
Spark memory: 0-75
RDD Storage: 0-37
Other Spark: 38-75
Other Reserved: 76-100

Where do broadcast variables fall into the mix?

Regards,

Bryan Jeffrey
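
A small arithmetic sketch of the unified memory model described above. The heap size and fraction values are examples only (0.75/0.5 mirror the 0-100 split in the message); the fixed 300 MB reserve is from Spark's unified memory manager. Broadcast blocks, like cached RDD blocks, are stored in the storage region:

```scala
// Example-only values; only the 300 MB reserve is a fixed Spark constant.
val heapMb          = 10240   // executor heap (example)
val reservedMb      = 300     // fixed reserved memory in the unified model
val memoryFraction  = 0.75    // spark.memory.fraction (example value)
val storageFraction = 0.5     // spark.memory.storageFraction (example value)

val sparkMemoryMb   = (heapMb - reservedMb) * memoryFraction
val storageMemoryMb = sparkMemoryMb * storageFraction  // cached RDD blocks and
                                                       // broadcast blocks both land here
println(f"Spark memory: $sparkMemoryMb%.1f MB, storage region: $storageMemoryMb%.1f MB")
```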


Re: how many topics spark streaming can handle

2017-06-19 Thread Bryan Jeffrey
Hello Ashok,

We're consuming from more than 10 topics in some Spark streaming applications.
Topic management is a concern (what is read from where, etc), but I have seen
no issues from Spark itself.

Regards,

Bryan Jeffrey

Get Outlook for Android

On Mon, Jun 19, 2017 at 3:24 PM -0400, "Ashok Kumar"  wrote:

thank you

in the following example:

val topics = "test1,test2,test3"
val brokers = "localhost:9092"
val topicsSet = topics.split(",").toSet
val sparkConf = new SparkConf().setAppName("KafkaDroneCalc").setMaster("local") // spark://localhost:7077
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(30))
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](ssc, kafkaParams, topicsSet)

is it possible to have three topics or many topics?

 

On Monday, 19 June 2017, 20:10, Michael Armbrust  wrote:

I don't think that there is really a Spark specific limit here.  It would be a
function of the size of your spark / kafka clusters and the type of processing
you are trying to do.

On Mon, Jun 19, 2017 at 12:00 PM, Ashok Kumar  wrote:

Hi Gurus,

Within one Spark streaming process how many topics can be handled? I have not
tried more than one topic.

Thanks


 






Re: Scala, Python or Java for Spark programming

2017-06-07 Thread Bryan Jeffrey
Mich,

We use Scala for a large project.  On our team we've set a few standards to
ensure readability (we try to avoid excessive use of tuples, use named
functions, etc.)  Given these constraints, I find Scala to be very
readable, and far easier to use than Java.  The Lambda functionality of
Java provides a lot of similar features, but the amount of typing required
to set down a small function is excessive at best!

Regards,

Bryan Jeffrey

On Wed, Jun 7, 2017 at 12:51 PM, Jörn Franke  wrote:

> I think this is a religious question ;-)
> Java is often underestimated, because people are not aware of its lambda
> functionality which makes the code very readable. Scala - it depends who
> programs it. People coming with the normal Java background write Java-like
> code in scala which might not be so good. People from a functional
> background write it more functional like - i.e. You have a lot of things in
> one line of code which can be a curse even for other functional
> programmers, especially if the application is distributed as in the case of
> Spark. Usually no comment is provided and you have - even as a functional
> programmer - to do a lot of drill down. Python is somehow similar, but
> since it has no connection with Java you do not have these extremes. There
> it depends more on the community (e.g. Medical, financials) and skills of
> people how the code looks. Python is somehow similar, but
> However the difficulty comes with the distributed applications behind
> Spark which may have unforeseen side effects if the users do not know this,
> i.e. if they are not used to parallel programming.
>
> On 7. Jun 2017, at 17:20, Mich Talebzadeh 
> wrote:
>
>
> Hi,
>
> I am a fan of Scala and functional programming hence I prefer Scala.
>
> I had a discussion with a hardcore Java programmer and a data scientist
> who prefers Python.
>
> Their view is that in a collaborative work using Scala programming it is
> almost impossible to understand someone else's Scala code.
>
> Hence I was wondering how much truth is there in this statement. Given
> that Spark uses Scala as its core development language, what is the general
> view on the use of Scala, Python or Java?
>
> Thanks,
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>


Re: Is there a way to do conditional group by in spark 2.1.1?

2017-06-03 Thread Bryan Jeffrey
You should be able to project a new column that is your group column. Then you 
can group on the projected column. 




Get Outlook for Android







On Sat, Jun 3, 2017 at 6:26 PM -0400, "upendra 1991" 
 wrote:










Use a function

Sent from Yahoo Mail on Android 
On Sat, Jun 3, 2017 at 5:01 PM, kant kodali wrote:

Hi All,
Is there a way to do conditional group by in Spark 2.1.1? In other words, I want
to do something like this:

if (field1 == "foo") {
  df.groupBy(field1)
} else if (field2 == "bar") {
  df.groupBy(field2)
}
Thanks
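The reply above suggests projecting the group column first and then grouping on it. A plain-Scala sketch of that idea (the Record type and values here are hypothetical; in a Spark DataFrame the projection would be a when/otherwise column expression):

```scala
// Hypothetical records standing in for DataFrame rows.
case class Record(field1: String, field2: String, amount: Int)

val rows = Seq(
  Record("foo", "x", 1),
  Record("baz", "bar", 2),
  Record("foo", "y", 3))

// Project the group key as its own value, then group on it -- the analog of
// df.withColumn("key", when($"field1" === "foo", $"field1").otherwise($"field2"))
//   .groupBy("key")
def groupKey(r: Record): String =
  if (r.field1 == "foo") r.field1 else r.field2

val totals = rows.groupBy(groupKey).map { case (k, rs) => (k, rs.map(_.amount).sum) }
// totals("foo") == 4, totals("bar") == 2
```

The conditional lives in the projected key, so a single groupBy covers both branches.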
  






Re: Convert camelCase to snake_case when saving Dataframe/Dataset to parquet?

2017-05-22 Thread Bryan Jeffrey
Mike, 




I have code to do that. I'll share it tomorrow. 




Get Outlook for Android







On Mon, May 22, 2017 at 4:53 PM -0400, "Mike Wheeler" 
 wrote:










Hi Spark User,
For Scala case class, we usually use camelCase (carType) for member fields. 
However, many data systems use snake_case (car_type) for column names. When
saving a Dataset of case class to parquet, is there any way to automatically 
convert camelCase to snake_case (carType -> car_type)? 
Thanks,
Mike
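Bryan's code was not posted in this thread; a minimal sketch of such a conversion (the toSnakeCase helper is an assumption, not his actual implementation) is a regex rename applied to every column before writing:

```scala
// Convert a camelCase identifier to snake_case, e.g. "carType" -> "car_type".
def toSnakeCase(name: String): String =
  name.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase

// Applied to a hypothetical DataFrame df before saving:
//   df.toDF(df.columns.map(toSnakeCase): _*).write.parquet(path)
println(toSnakeCase("carType"))
```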









Re: [RDDs and Dataframes] Equivalent expressions for RDD API

2017-03-04 Thread bryan . jeffrey


RDD operation:


rdd.map(x => (word, count)).reduceByKey(_+_)






Get Outlook for Android









On Sat, Mar 4, 2017 at 8:59 AM -0500, "Old-School" 
 wrote:










Hi,

I want to perform some simple transformations and check the execution time,
under various configurations (e.g. number of cores being used, number of
partitions, etc.). Since it is not possible to set the number of partitions of a
DataFrame, I guess that I should probably use RDDs.

I've got a dataset with 3 columns as shown below:

val data = file.map(line => line.split(" "))
  .filter(lines => lines.length == 3) // ignore first line
  .map(row => (row(0), row(1), row(2)))
  .toDF("ID", "word-ID", "count")
results in:

+------+----------+--------+
|  ID  | word-ID  | count  |
+------+----------+--------+
|  15  |    87    |  151   |
|  20  |    19    |  398   |
|  15  |    19    |   21   |
|  180 |    90    |  190   |
+------+----------+--------+
So how can I turn the above into an RDD in order to use e.g.
sc.parallelize(data, 10) and set the number of partitions to say 10? 

Furthermore, I would also like to ask about the equivalent expression (using
RDD API) for the following simple transformation:

data.select("word-ID",
"count").groupBy("word-ID").agg(sum($"count").as("count")).show()



Thanks in advance
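The groupBy/agg above corresponds to reduceByKey on a pair RDD, i.e. rdd.map(row => (wordId, count)).reduceByKey(_ + _) (field names hypothetical). Its semantics can be checked on plain Scala collections, with a Seq standing in for the RDD and values taken from the sample table:

```scala
// (word-ID, count) pairs from the sample data above.
val pairs = Seq(("87", 151), ("19", 398), ("19", 21), ("90", 190))

// Same aggregation as groupBy("word-ID").agg(sum($"count"));
// on an RDD this is pairsRdd.reduceByKey(_ + _).
val counts = pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
// counts("19") == 398 + 21 == 419
```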



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-and-Dataframes-Equivalent-expressions-for-RDD-API-tp28455.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org








Re: streaming-kafka-0-8-integration (direct approach) and monitoring

2017-02-14 Thread bryan . jeffrey


Mohammad, 


We store our offsets in Cassandra,  and use that for tracking. This solved a 
few issues for us,  as it provides a good persistence mechanism even when 
you're reading from multiple clusters. 


Bryan Jeffrey 




Get Outlook for Android









On Tue, Feb 14, 2017 at 7:03 PM -0500, "Mohammad Kargar"  
wrote:










As explained here, the direct approach to integrating Spark Streaming with
Kafka does not update offsets in Zookeeper, hence Zookeeper-based Kafka
monitoring tools will not show progress (details).  We followed the recommended
workaround to update the zookeeper with the latest offset after each batch, but 
no success. Wondering if there's any end-to-end example we can use?

Thanks,
Mohammad









Re: How to specify "verbose GC" in Spark submit?

2017-02-06 Thread Bryan Jeffrey
Hello.

When specifying GC options for Spark you must determine where you want the
GC options specified - on the executors or on the driver. When you submit
your job, for the driver, specify '--driver-java-options
"-XX:+PrintFlagsFinal  -verbose:gc", etc.  For the executor specify --conf
"spark.executor.extraJavaOptions=-XX:+PrintFlagsFinal  -verbose:gc", etc.

Bryan Jeffrey

On Mon, Feb 6, 2017 at 8:02 AM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Dear All,
>
> Is there any way to specify verbose GC -i.e. “-verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps” in Spark submit?
>
>
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> <http://139.59.184.114/index.html>
>


Re: Streaming Batch Oddities

2016-12-13 Thread Bryan Jeffrey
All,

Any thoughts?  I can run another couple of experiments to try to narrow the
problem.  The total data volume in the repartition is around 60GB / batch.

Regards,

Bryan Jeffrey

On Tue, Dec 13, 2016 at 12:11 PM, Bryan Jeffrey 
wrote:

> Hello.
>
> I have a current Spark 1.6.1 application that I am working to modify.  The
> flow of the application looks something like the following:
>
> (Kafka) --> (Direct Stream Receiver) --> (Repartition) -->
> (Extract/Schematization Logic w/ RangePartitioner) --> Several Output
> Operations
>
> In the 'extract' segment above I have the following code:
>
> def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
> defaultPartitions: Int): DStream[SchematizedData] = {
> val messages: DStream[String] = kafkaStreamFactory.create(topics)
> val enriched = messages.map(Schematize(_))
> val result = enriched
> .map(event => { (event.targetEntity, event) })
> .transform(rdd => rdd.partitionBy(new RangePartitioner[String,
> SchematizedData](defaultPartitions, rdd)))
> .map(x => x._2)
>
>  result
> }
>
> This code executes well.  Each follow-on output operation (job) consumes
> from a transform 'partitionBy' ShuffledRDD.  The result is that my 2-minute
> batches are consumed in roughly 48 seconds (streaming batch interval), w/
> the 'RangePartitioner' stage not counted in my streaming job @ 30 seconds
> (so around 78 seconds total).  I have 'concurrent jobs' set to 4.  The
> 'read from kafka' and 'extract' for a given set of topics is a single job
> (two stages), and all of the various output operations follow on after
> completion.
>
> I am aiming to remove the 'RangePartitioner' for two reasons:
>   1. We do not need to re-partition here. We're already calling
> stream.repartition in the Kafka Stream Factory.
>   2. It appears to be causing issues w/ failure to recompute when nodes go
> down.
>
> When I remove the 'RangePartitioner code, I have the following:
>
> def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
> defaultPartitions: Int): DStream[SchematizedData] = {
> val messages: DStream[String] = kafkaStreamFactory.create(topics)
> val enriched = messages.map(Schematize(_))
> val result = enriched.repartition(defaultPartitions)
>
>  result
> }
>
> The resulting code, with no other changes takes around 9 minutes to
> complete a single batch. I am having trouble determining why these have
> such a significant performance difference.  Looking at the partitionBy
> code, it is creating a separate ShuffledRDD w/in the transform block.  The
> 'stream.repartition' call is equivalent to 
> stream.transform(_.repartition(partitions)),
> which calls coalesce (shuffle=true), which creates a new ShuffledRDD with a
> HashPartitioner.  These calls appear functionally equivalent - I am having
> trouble coming up with a justification for the significant performance
> differences between calls.
>
> Help?
>
> Regards,
>
> Bryan Jeffrey
>
>


Streaming Batch Oddities

2016-12-13 Thread Bryan Jeffrey
Hello.

I have a current Spark 1.6.1 application that I am working to modify.  The
flow of the application looks something like the following:

(Kafka) --> (Direct Stream Receiver) --> (Repartition) -->
(Extract/Schematization Logic w/ RangePartitioner) --> Several Output
Operations

In the 'extract' segment above I have the following code:

def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
defaultPartitions: Int): DStream[SchematizedData] = {
val messages: DStream[String] = kafkaStreamFactory.create(topics)
val enriched = messages.map(Schematize(_))
val result = enriched
.map(event => { (event.targetEntity, event) })
.transform(rdd => rdd.partitionBy(new RangePartitioner[String,
SchematizedData](defaultPartitions, rdd)))
.map(x => x._2)

 result
}

This code executes well.  Each follow-on output operation (job) consumes
from a transform 'partitionBy' ShuffledRDD.  The result is that my 2-minute
batches are consumed in roughly 48 seconds (streaming batch interval), w/
the 'RangePartitioner' stage not counted in my streaming job @ 30 seconds
(so around 78 seconds total).  I have 'concurrent jobs' set to 4.  The
'read from kafka' and 'extract' for a given set of topics is a single job
(two stages), and all of the various output operations follow on after
completion.

I am aiming to remove the 'RangePartitioner' for two reasons:
  1. We do not need to re-partition here. We're already calling
stream.repartition in the Kafka Stream Factory.
  2. It appears to be causing issues w/ failure to recompute when nodes go
down.

When I remove the 'RangePartitioner code, I have the following:

def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
defaultPartitions: Int): DStream[SchematizedData] = {
val messages: DStream[String] = kafkaStreamFactory.create(topics)
val enriched = messages.map(Schematize(_))
val result = enriched.repartition(defaultPartitions)

 result
}

The resulting code, with no other changes takes around 9 minutes to
complete a single batch. I am having trouble determining why these have
such a significant performance difference.  Looking at the partitionBy
code, it is creating a separate ShuffledRDD w/in the transform block.  The
'stream.repartition' call is equivalent to
stream.transform(_.repartition(partitions)), which calls coalesce
(shuffle=true), which creates a new ShuffledRDD with a HashPartitioner.
These calls appear functionally equivalent - I am having trouble coming up
with a justification for the significant performance differences between
calls.

Help?

Regards,

Bryan Jeffrey


Event Log Compression

2016-07-26 Thread Bryan Jeffrey
All,

I am running Spark 1.6.1. I enabled 'spark.eventLog.compress', and the data
is now being compressed using lz4.  I would like to move that back to
Snappy as I have some third party tools that require using Snappy.

Is there a variable used to control Spark eventLog compression algorithm?

Thank you,

Bryan Jeffrey


Spark 2.0

2016-07-25 Thread Bryan Jeffrey
All,

I had three questions:

(1) Is there a timeline for a stable Spark 2.0 release?  I know the 'preview'
build is out there, but was curious what the timeline was for the full
release. Jira seems to indicate that there should be a release 7/27.

(2)  For 'continuous' datasets there has been a lot of discussion. One item
that came up in tickets was the idea that 'count()' and other functions do
not apply to continuous datasets: https://github.com/apache/spark/pull/12080.
In this case what is the intended procedure to calculate a streaming
statistic based on an interval (e.g. count the number of records in a 2
minute window every 2 minutes)?

(3) In previous releases (1.6.1) the call to DStream / RDD repartition w/ a
number of partitions set to zero silently deletes data.  I have looked in
Jira for a similar issue, but I do not see one.  I would like to address
this (and would likely be willing to go fix it myself).  Should I just
create a ticket?

Thank you,

Bryan Jeffrey


Re: Kafka Exceptions

2016-06-13 Thread Bryan Jeffrey
Cody,

We already set the maxRetries.  We're still seeing issue - when leader is
shifted, for example, it does not appear that direct stream reader
correctly handles this.  We're running 1.6.1.

Bryan Jeffrey

On Mon, Jun 13, 2016 at 10:37 AM, Cody Koeninger  wrote:

> http://spark.apache.org/docs/latest/configuration.html
>
> spark.streaming.kafka.maxRetries
>
> spark.task.maxFailures
>
> On Mon, Jun 13, 2016 at 8:25 AM, Bryan Jeffrey 
> wrote:
> > All,
> >
> > We're running a Spark job that is consuming data from a large Kafka
> cluster
> > using the Direct Stream receiver.  We're seeing intermittent
> > NotLeaderForPartitionExceptions when the leader is moved to another
> broker.
> > Currently even with retry enabled we're seeing the job fail at this
> > exception.  Is there a configuration setting I am missing?  How are these
> > issues typically handled?
> >
> > User class threw exception: org.apache.spark.SparkException:
> > ArrayBuffer(kafka.common.NotLeaderForPartitionException,
> > org.apache.spark.SparkException: Couldn't find leader offsets for
> > Set([MyTopic,43]))
> >
> > Thank you,
> >
> > Bryan Jeffrey
> >
>


Kafka Exceptions

2016-06-13 Thread Bryan Jeffrey
All,

We're running a Spark job that is consuming data from a large Kafka cluster
using the Direct Stream receiver.  We're seeing intermittent
NotLeaderForPartitionExceptions when the leader is moved to another
broker.  Currently even with retry enabled we're seeing the job fail at
this exception.  Is there a configuration setting I am missing?  How are
these issues typically handled?

User class threw exception: org.apache.spark.SparkException:
ArrayBuffer(kafka.common.NotLeaderForPartitionException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([MyTopic,43]))

Thank you,

Bryan Jeffrey


Re: Dataset - reduceByKey

2016-06-07 Thread Bryan Jeffrey
All,

Thank you for the replies.  It seems as though the Dataset API is still far
behind the RDD API.  This is unfortunate as the Dataset API potentially
provides a number of performance benefits.  I will move to using it in a
more limited set of cases for the moment.

Thank you!

Bryan Jeffrey

On Tue, Jun 7, 2016 at 2:50 PM, Richard Marscher 
wrote:

> There certainly are some gaps between the richness of the RDD API and the
> Dataset API. I'm also migrating from RDD to Dataset and ran into
> reduceByKey and join scenarios.
>
> In the spark-dev list, one person was discussing reduceByKey being
> sub-optimal at the moment and it spawned this JIRA
> https://issues.apache.org/jira/browse/SPARK-15598. But you might be able
> to get by with groupBy().reduce() for now, check performance though.
>
> As for join, the approach would be using the joinWith function on Dataset.
> Although the API isn't as sugary as it was for RDD IMO, something which
> I've been discussing in a separate thread as well. I can't find a weblink
> for it but the thread subject is "Dataset Outer Join vs RDD Outer Join".
>
> On Tue, Jun 7, 2016 at 2:40 PM, Bryan Jeffrey 
> wrote:
>
>> It would also be nice if there was a better example of joining two
>> Datasets. I am looking at the documentation here:
>> http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems
>> a little bit sparse - is there a better documentation source?
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>> On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey 
>> wrote:
>>
>>> Hello.
>>>
>>> I am looking at the option of moving RDD based operations to Dataset
>>> based operations.  We are calling 'reduceByKey' on some pair RDDs we have.
>>> What would the equivalent be in the Dataset interface - I do not see a
>>> simple reduceByKey replacement.
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>>
>>
>
>
> --
> *Richard Marscher*
> Senior Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>
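The groupBy-then-reduce workaround mentioned in this thread maps, in the 2.x Dataset API, to ds.groupByKey(key).reduceGroups(merge). Its semantics are sketched below on plain Scala pairs, with a Seq standing in for the Dataset:

```scala
// Plain-Scala stand-in for ds.groupByKey(_._1).reduceGroups((a, b) => (a._1, a._2 + b._2)).
val events = Seq(("a", 1), ("b", 5), ("a", 3))

val reduced = events
  .groupBy(_._1)
  .map { case (_, vs) => vs.reduce((x, y) => (x._1, x._2 + y._2)) }
  .toList
  .sortBy(_._1)
// reduced == List(("a", 4), ("b", 5))
```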


Re: Dataset - reduceByKey

2016-06-07 Thread Bryan Jeffrey
It would also be nice if there was a better example of joining two
Datasets. I am looking at the documentation here:
http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems a
little bit sparse - is there a better documentation source?

Regards,

Bryan Jeffrey

On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey 
wrote:

> Hello.
>
> I am looking at the option of moving RDD based operations to Dataset based
> operations.  We are calling 'reduceByKey' on some pair RDDs we have.  What
> would the equivalent be in the Dataset interface - I do not see a simple
> reduceByKey replacement.
>
> Regards,
>
> Bryan Jeffrey
>
>


Dataset - reduceByKey

2016-06-07 Thread Bryan Jeffrey
Hello.

I am looking at the option of moving RDD based operations to Dataset based
operations.  We are calling 'reduceByKey' on some pair RDDs we have.  What
would the equivalent be in the Dataset interface - I do not see a simple
reduceByKey replacement.

Regards,

Bryan Jeffrey


Streaming application slows over time

2016-05-09 Thread Bryan Jeffrey
All,

I am seeing an odd issue with my streaming application.  I am running Spark
1.4.1, Scala 2.10.  Our streaming application has a batch time of two
minutes.  The application runs well for a reasonable period of time (4-8
hours).  It processes the same data in approximately the same amount of
time.  Because we're consuming large amounts of data, I tweaked some GC
settings to collect more frequently and enabled G1GC.  This effectively
corrected any memory pressure, so memory is looking good.

After some number of batches I am seeing the time to process a batch
increase by a large margin (from 40 seconds / batch to 130 seconds /
batch).  Because our batch time is 2 minutes this has the effect of
(eventually) running behind. This in turn causes memory issues and
eventually leads to OOM.  However, OOM seems to be a symptom of the
slowdown not a cause.  I have looked in driver/executor logs, but do not
see any reason that we're seeing slowness.  The data volume is the same,
none of the executors seem to be crashing, nothing is going to disk, memory
is well within bounds, and nothing else is running on these boxes.

Are there any suggested debugging techniques, things to look for or known
bugs in similar instances?

Regards,

Bryan Jeffrey


Issues with Long Running Streaming Application

2016-04-25 Thread Bryan Jeffrey
Hello.

I have a long running streaming application. It is consuming a large amount
of data from Kafka (on the order of 25K messages / second in two minute
batches.  The job reads the data, makes some decisions on what to save, and
writes the selected data into Cassandra.  The job is very stable - each
batch takes around 25 seconds to process, and every 30 minutes new training
data is read from Cassandra which increases batch time to around 1 minute.

The issue I'm seeing is that the job is stable for around 5-7 hours, after
which it takes an increasingly long time to compute each batch.  The
executor memory used (cached RDDs) remains around the same level, no
operation takes more than a single batch into account, the write time to
Cassandra does not vary significantly - everything just suddenly seems to
take a longer period of time to compute.

Initially I was seeing issues with instability in a shorter time horizon.
To address these issues I took the following steps:

1. Explicitly expired RDDs via 'unpersist' once they were no longer
required.
2. Turned on G1 GC via -XX:+UseG1GC
3. Enabled Kryo serialization
4. Removed several long-running aggregate operations (reduceByKeyAndWindow,
updateStateByKey) from this job.

The result is a job that appears completely stable for hours at a time.
The OS does not appear to have any odd tasks run, Cassandra
compaction/cleanup/repair is not responsible for the delay.   Has anyone
seen similar behavior?  Any thoughts?

Regards,

Bryan Jeffrey


Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Bryan Jeffrey
Here is what we're doing:


import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import net.liftweb.json.Extraction._
import net.liftweb.json._
import org.apache.spark.streaming.dstream.DStream

class KafkaWriter(brokers: Array[String], topic: String, numPartitions:
Int) {
  def write[T](data: DStream[T]): Unit = {
KafkaWriter.write(data, topic, brokers, numPartitions)
  }
}

object KafkaWriter {
  def write[T](data: DStream[T], topic: String, brokers: Array[String],
numPartitions: Int): Unit = {
val dataToWrite =
  if (numPartitions > 0) {
data.repartition(numPartitions)
  } else {
data
  }

dataToWrite
  .map(x => new KeyedMessage[String, String](topic,
KafkaWriter.toJson(x)))
  .foreachRDD(rdd => {
  rdd.foreachPartition(part => {
val producer: Producer[String, String] =
KafkaWriter.createProducer(brokers)
part.foreach(item => producer.send(item))
producer.close()
  })
})
  }

  def apply(brokers: Option[Array[String]], topic: String, numPartitions:
Int): KafkaWriter = {
val brokersToUse =
  brokers match {
case Some(x) => x
case None => throw new IllegalArgumentException("Must specify
brokers!")
  }

new KafkaWriter(brokersToUse, topic, numPartitions)
  }

  def toJson[T](data: T): String = {
implicit val formats = DefaultFormats ++
net.liftweb.json.ext.JodaTimeSerializers.all
compactRender(decompose(data))
  }

  def createProducer(brokers: Array[String]): Producer[String, String] = {
val properties = new Properties()
properties.put("metadata.broker.list", brokers.mkString(","))
properties.put("serializer.class", "kafka.serializer.StringEncoder")

val kafkaConfig = new ProducerConfig(properties)
new Producer[String, String](kafkaConfig)
  }
}


Then just call:

val kafkaWriter: KafkaWriter =
KafkaWriter(KafkaStreamFactory.getBrokersFromConfig(config),
config.getString(Parameters.topicName), numPartitions =
kafkaWritePartitions)
    kafkaWriter.write(dataToWriteToKafka)


Hope that helps!

Bryan Jeffrey

On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego 
wrote:

> Thanks Ted.
>
>  KafkaWordCount (producer) does not operate on a DStream[T]
>
> ```scala
>
>
> object KafkaWordCountProducer {
>
>   def main(args: Array[String]) {
> if (args.length < 4) {
>   System.err.println("Usage: KafkaWordCountProducer
>   " +
> " ")
>   System.exit(1)
> }
>
> val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>
> // Zookeeper connection properties
> val props = new HashMap[String, Object]()
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
>
> val producer = new KafkaProducer[String, String](props)
>
> // Send some messages
> while(true) {
>   (1 to messagesPerSec.toInt).foreach { messageNum =>
> val str = (1 to wordsPerMessage.toInt).map(x =>
> scala.util.Random.nextInt(10).toString)
>   .mkString(" ")
>
> val message = new ProducerRecord[String, String](topic, null, str)
> producer.send(message)
>   }
>
>   Thread.sleep(1000)
> }
>   }
>
> }
>
> ```
>
>
> Also, doing:
>
>
> ```
> object KafkaSink {
>  def send(brokers: String, sc: SparkContext, topic: String, key:
> String, value: String) =
> getInstance(brokers, sc).value.send(new ProducerRecord(topic,
> key, value))
> }
>
> KafkaSink.send(brokers, sparkContext)(outputTopic, record._1, record._2)
>
> ```
>
>
> Doesn't work either, the result is:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not
> serializable
>
>
> Thanks!
>
>
>
>
> On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu  wrote:
> >
> > In KafkaWordCount , the String is sent back and producer.send() is
> called.
> >
> > I guess if you don't find via solution in your current design, you can
> consider the above.
> >
> > On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego 
> wrote:
> >>
> >> Hello,
> >>
> >> I understand that you cannot serialize Kafka Producer.
> >>
> >> So I've tried:
> >>
> >> (as suggested here
> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serial

Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException

2016-03-19 Thread Bryan Jeffrey
Cody et. al,

I am seeing a similar error.  I've increased the number of retries.  Once
I've got a job up and running I'm seeing it retry correctly. However, I am
having trouble getting the job started - number of retries does not seem to
help with startup behavior.

Thoughts?

Regards,

Bryan Jeffrey

On Fri, Mar 18, 2016 at 10:44 AM, Cody Koeninger  wrote:

> That's a networking error when the driver is attempting to contact
> leaders to get the latest available offsets.
>
> If it's a transient error, you can look at increasing the value of
> spark.streaming.kafka.maxRetries, see
>
> http://spark.apache.org/docs/latest/configuration.html
>
> If it's not a transient error, you need to look at your brokers + your
> network environment.
>
> On Thu, Mar 17, 2016 at 10:59 PM, Surendra , Manchikanti
>  wrote:
> > Hi,
> >
> > Can you check Kafka topic replication ? And leader information?
> >
> > Regards,
> > Surendra M
> >
> >
> >
> > -- Surendra Manchikanti
> >
> > On Thu, Mar 17, 2016 at 7:28 PM, Ascot Moss 
> wrote:
> >>
> >> Hi,
> >>
> >> I have a SparkStream (with Kafka) job, after running several days, it
> >> failed with following errors:
> >> ERROR DirectKafkaInputDStream:
> >> ArrayBuffer(java.nio.channels.ClosedChannelException)
> >>
> >> Any idea what would be wrong? will it be SparkStreaming buffer overflow
> >> issue?
> >>
> >>
> >>
> >> Regards
> >>
> >>
> >>
> >>
> >>
> >>
> >> *** from the log ***
> >>
> >> 16/03/18 09:15:18 INFO VerifiableProperties: Property zookeeper.connect
> is
> >> overridden to
> >>
> >> 16/03/17 12:13:51 ERROR DirectKafkaInputDStream:
> >> ArrayBuffer(java.nio.channels.ClosedChannelException)
> >>
> >> 16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error:
> >> java.nio.channels.ClosedChannelException
> >>
> >> 16/03/17 12:13:52 ERROR JobScheduler: Error generating jobs for time
> >> 1458188031800 ms
> >>
> >> org.apache.spark.SparkException:
> >> ArrayBuffer(java.nio.channels.ClosedChannelException)
> >>
> >> at
> >>
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
> >>
> >> at
> >>
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> >>
> >> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> >>
> >> at scala.Option.orElse(Option.scala:257)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> >>
> >> at
> >>
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> >>
> >> at
> >>
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> >>
> >> at
> >>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >>
> >> at
> >>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >>
> >> at
> >>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >>
> >>

Re: OOM Exception in my spark streaming application

2016-03-14 Thread Bryan Jeffrey
Steve & Adam,

I would be interesting in hearing the outcome here as well. I am seeing
some similar issues in my 1.4.1 pipeline, using stateful functions
(reduceByKeyAndWindow and updateStateByKey).

Regards,

Bryan Jeffrey

On Mon, Mar 14, 2016 at 6:45 AM, Steve Loughran 
wrote:

>
> > On 14 Mar 2016, at 09:41, adamreith  wrote:
> >
> > I dumped the heap of the driver process and seems that 486.2 MB on 512
> MB of
> > the available memory is used by an instance of the class
> > /org.apache.spark.deploy.yarn.history.YarnHistoryService/. I'm trying to
> > figure out how to solve the issue but till now i didn't found a solution.
> >
> > Could someone help me to sort out the issue?
> >
>
> I've emailed you direct to look @ this
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark job for Reading time series data from Cassandra

2016-03-10 Thread Bryan Jeffrey
Prateek,

I believe that one task is created per Cassandra partition.  How is your
data partitioned?

Regards,

Bryan Jeffrey

On Thu, Mar 10, 2016 at 10:36 AM, Prateek .  wrote:

> Hi,
>
>
>
> I have a Spark Batch job for reading timeseries data from Cassandra which
> has 50,000 rows.
>
>
>
>
>
> JavaRDD cassandraRowsRDD = javaFunctions.cassandraTable("iotdata",
> "coordinate")
>
> .map(*new* Function() {
>
> @Override
>
> *public* String call(CassandraRow cassandraRow)
> *throws* Exception {
>
> *return* cassandraRow.toString();
>
> }
>
> });
>
>
>
> List lm = cassandraRowsRDD.collect();
>
>
>
>
>
> I am testing in local mode where I am observing Spark is creating 770870
> tasks (one job, one stage), which is taking many hours to complete. Can anyone
> please suggest what the possible issues could be?
>
>
>
>
>
> Stage Id: 0
> Description: collect at CassandraSpark.java:94
> Submitted: 2016/03/10 21:01:15
> Duration: 9 s
> Tasks (Succeeded/Total): 137/770870
>
> Thank You
>
>
>
> Prateek
> "DISCLAIMER: This message is proprietary to Aricent and is intended solely
> for the use of the individual to whom it is addressed. It may contain
> privileged or confidential information and should not be circulated or used
> for any purpose other than for what it is intended. If you have received
> this message in error, please notify the originator immediately. If you are
> not the intended recipient, you are notified that you are strictly
> prohibited from using, copying, altering, or disclosing the contents of
> this message. Aricent accepts no responsibility for loss or damage arising
> from the use of the information transmitted by this email including damage
> from virus."
>


Suggested Method to Write to Kafka

2016-03-01 Thread Bryan Jeffrey
Hello.

Is there a suggested method and/or some example code to write results from
a Spark streaming job back to Kafka?

I'm using Scala and Spark 1.4.1.

Regards,

Bryan Jeffrey
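
No reply is archived here, but a commonly cited pattern (a sketch — the serializer class names assume the Kafka 0.8.2+ producer API) is to create one producer per partition inside foreachRDD/foreachPartition, since KafkaProducer is not serializable and cannot be created on the driver and shipped to executors:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

def writeToKafka(stream: DStream[String], topic: String, brokers: String): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // One producer per partition: KafkaProducer is not serializable,
      // so it must be constructed on the executor, not the driver.
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      records.foreach(r => producer.send(new ProducerRecord[String, String](topic, r)))
      producer.close()
    }
  }
}
```

For high-volume streams, a per-executor producer held in a lazy singleton avoids the per-batch setup cost of this per-partition variant.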


Re: Spark with .NET

2016-02-09 Thread Bryan Jeffrey
Arko,

Check this out: https://github.com/Microsoft/SparkCLR

This is a Microsoft authored C# language binding for Spark.

Regards,

Bryan Jeffrey

On Tue, Feb 9, 2016 at 3:13 PM, Arko Provo Mukherjee <
arkoprovomukher...@gmail.com> wrote:

> Doesn't seem to be supported, but thanks! I will probably write some .NET
> wrapper in my front end and use the java api in the backend.
> Warm regards
> Arko
>
>
> On Tue, Feb 9, 2016 at 12:05 PM, Ted Yu  wrote:
>
>> This thread is related:
>> http://search-hadoop.com/m/q3RTtwp4nR1lugin1&subj=+NET+on+Apache+Spark+
>>
>> On Tue, Feb 9, 2016 at 11:43 AM, Arko Provo Mukherjee <
>> arkoprovomukher...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I want to use Spark (preferable Spark SQL) using C#. Anyone has any
>>> pointers to that?
>>>
>>> Thanks & regards
>>> Arko
>>>
>>>
>>
>


Re: Access batch statistics in Spark Streaming

2016-02-08 Thread Bryan Jeffrey
From within a Spark job you can use a periodic listener:

ssc.addStreamingListener(new PeriodicStatisticsListener(Seconds(60)))

class PeriodicStatisticsListener(timePeriod: Duration) extends StreamingListener {
  private val logger = LoggerFactory.getLogger("Application")
  private var startTime = Time(0)

  override def onBatchCompleted(batchCompleted:
      org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted): Unit = {
    if (startTime == Time(0)) {
      startTime = batchCompleted.batchInfo.batchTime
    }
    logger.info("Batch Complete @ " + new
      DateTime(batchCompleted.batchInfo.batchTime.milliseconds).withZone(DateTimeZone.UTC)
      + " (" + batchCompleted.batchInfo.batchTime + ")" +
      " with records " + batchCompleted.batchInfo.numRecords +
      " in processing time " +
      batchCompleted.batchInfo.processingDelay.getOrElse(0L) / 1000 + " seconds")
  }
}

On Mon, Feb 8, 2016 at 11:34 AM, Chen Song  wrote:

> Apologize in advance if someone has already asked and addressed this
> question.
>
> In Spark Streaming, how can I programmatically get the batch statistics
> like schedule delay, total delay and processing time (They are shown in the
> job UI streaming tab)? I need such information to raise alerts in some
> circumstances. For example, if the scheduling is delayed more than a
> threshold.
>
> Thanks,
> Chen
>
>


Re: Error w/ Invertible ReduceByKeyAndWindow

2016-02-01 Thread Bryan Jeffrey
Excuse me - I should have mentioned: I am running Spark 1.4.1, Scala 2.11.
I am running in streaming mode receiving data from Kafka.

Regards,

Bryan Jeffrey

On Mon, Feb 1, 2016 at 9:19 PM, Bryan Jeffrey 
wrote:

> Hello.
>
> I have a reduceByKeyAndWindow function with an invertible function and
> filter function defined.  I am seeing an error as follows:
>
> "Neither previous window has value for key, nor new values found. Are you
> sure your key class hashes consistently?"
>
> We're using case classes, and so I am sure we're doing consistent
> hashing.  The 'reduceAdd' function is adding to a map. The
> 'inverseReduceFunction' is subtracting from the map. The filter function is
> removing items where the number of entries in the map is zero.  Has anyone
> seen this error before?
>
> Regards,
>
> Bryan Jeffrey
>
>


Error w/ Invertible ReduceByKeyAndWindow

2016-02-01 Thread Bryan Jeffrey
Hello.

I have a reduceByKeyAndWindow function with an invertible function and
filter function defined.  I am seeing an error as follows:

"Neither previous window has value for key, nor new values found. Are you
sure your key class hashes consistently?"

We're using case classes, and so I am sure we're doing consistent hashing.
The 'reduceAdd' function is adding to a map. The 'inverseReduceFunction' is
subtracting from the map. The filter function is removing items where the
number of entries in the map is zero.  Has anyone seen this error before?

Regards,

Bryan Jeffrey
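
For readers hitting the same error, a hedged sketch of the pattern described (all names are illustrative, not the original code): the reduce function merges count maps, the inverse function subtracts values leaving the window, and the filter drops keys whose map has emptied — if any of the three is inconsistent, the "neither window has value" error can surface:

```scala
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// Keys are Strings (or case classes), which hash consistently.
def windowedCounts(events: DStream[(String, Map[String, Long])]): DStream[(String, Map[String, Long])] = {
  // Merge two count maps with the given op, dropping zero entries so that
  // an add followed by a subtract returns the map to its prior state.
  def merge(a: Map[String, Long], b: Map[String, Long], op: (Long, Long) => Long): Map[String, Long] =
    (a.keySet ++ b.keySet)
      .map(k => k -> op(a.getOrElse(k, 0L), b.getOrElse(k, 0L)))
      .filter(_._2 != 0L)
      .toMap

  events.reduceByKeyAndWindow(
    (a, b) => merge(a, b, _ + _),    // reduceAdd: fold new values into the map
    (a, b) => merge(a, b, _ - _),    // inverseReduce: subtract expired values
    Seconds(300),                    // window duration
    Seconds(10),                     // slide duration
    filterFunc = (kv: (String, Map[String, Long])) => kv._2.nonEmpty // drop emptied keys
  )
}
```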


Re: Hive error after update from 1.4.1 to 1.5.2

2015-12-16 Thread Bryan Jeffrey
I had a bunch of library dependencies that were still using Scala 2.10
versions. I updated them to 2.11 and everything has worked fine since.

On Wed, Dec 16, 2015 at 3:12 AM, Ashwin Sai Shankar 
wrote:

> Hi Bryan,
> I see the same issue with 1.5.2,  can you pls let me know what was the
> resolution?
>
> Thanks,
> Ashwin
>
> On Fri, Nov 20, 2015 at 12:07 PM, Bryan Jeffrey 
> wrote:
>
>> Nevermind. I had a library dependency that still had the old Spark
>> version.
>>
>> On Fri, Nov 20, 2015 at 2:14 PM, Bryan Jeffrey 
>> wrote:
>>
>>> The 1.5.2 Spark was compiled using the following options:  mvn
>>> -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive
>>> -Phive-thriftserver clean package
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>> On Fri, Nov 20, 2015 at 2:13 PM, Bryan Jeffrey 
>>> wrote:
>>>
>>>> Hello.
>>>>
>>>> I'm seeing an error creating a Hive Context moving from Spark 1.4.1 to
>>>> 1.5.2.  Has anyone seen this issue?
>>>>
>>>> I'm invoking the following:
>>>>
>>>> new HiveContext(sc) // sc is a Spark Context
>>>>
>>>> I am seeing the following error:
>>>>
>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>> SLF4J: Found binding in
>>>> [jar:file:/spark/spark-1.5.2/assembly/target/scala-2.11/spark-assembly-1.5.2-hadoop2.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: Found binding in
>>>> [jar:file:/hadoop-2.6.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>>> explanation.
>>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>>> Exception in thread "main" java.lang.NoSuchMethodException:
>>>> org.apache.hadoop.hive.conf.HiveConf.getTimeVar(org.apache.hadoop.hive.conf.HiveConf$ConfVars,
>>>> java.util.concurrent.TimeUnit)
>>>> at java.lang.Class.getMethod(Class.java:1786)
>>>> at
>>>> org.apache.spark.sql.hive.client.Shim.findMethod(HiveShim.scala:114)
>>>> at
>>>> org.apache.spark.sql.hive.client.Shim_v0_14.getTimeVarMethod$lzycompute(HiveShim.scala:415)
>>>> at
>>>> org.apache.spark.sql.hive.client.Shim_v0_14.getTimeVarMethod(HiveShim.scala:414)
>>>> at
>>>> org.apache.spark.sql.hive.client.Shim_v0_14.getMetastoreClientConnectRetryDelayMillis(HiveShim.scala:459)
>>>> at
>>>> org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:198)
>>>> at
>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>> at
>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>> at
>>>> java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>>> at
>>>> org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
>>>> at
>>>> org.apache.spark.sql.hive.client.IsolatedClientLoader.<init>(IsolatedClientLoader.scala:179)
>>>> at
>>>> org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
>>>> at
>>>> org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
>>>> at
>>>> org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
>>>> at
>>>> org.apache.spark.sql.hive.HiveContext.defaultOverrides(HiveContext.scala:174)
>>>> at
>>>> org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:177)
>>>> at
>>>> Main.Factories.HiveContextSingleton$.createHiveContext(HiveContextSingleton.scala:21)
>>>> at
>>>> Main.Factories.HiveContextSingleton$.getHiveContext(HiveContextSingleton.scala:14)
>>>> at
>>>> Main.Factories.SparkStreamingContextFactory$.createSparkContext(SparkStreamingContextFactory.scala:35)
>>>> at Main.WriteModel$.main(WriteModel.scala:16)
>>>> at Main.WriteModel.main(WriteModel.scala)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>>> at
>>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
>>>> at
>>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>>> at
>>>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>>> at
>>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>
>>>
>>>
>>
>


DateTime Support - Hive Parquet

2015-11-23 Thread Bryan Jeffrey
All,

I am attempting to write objects that include a DateTime properties to a
persistent table using Spark 1.5.2 / HiveContext.  In 1.4.1 I was forced to
convert the DateTime properties to Timestamp properties.  I was under the
impression that this issue was fixed in the default Hive supported with
1.5.2 - however, I am still seeing the associated errors.

Is there a bug I can follow to determine when DateTime will be supported
for Parquet?

Regards,

Bryan Jeffrey
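
Until DateTime support lands, the 1.4.1-era workaround mentioned above can be sketched as follows (class and field names are illustrative, not from the original application):

```scala
import java.sql.Timestamp
import org.joda.time.DateTime

// Convert a Joda DateTime property to java.sql.Timestamp, which Spark SQL
// maps onto a Parquet-compatible timestamp type, before writing the table.
case class EventRow(name: String, occurred: Timestamp)

def toRow(name: String, occurred: DateTime): EventRow =
  EventRow(name, new Timestamp(occurred.getMillis))
```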


Re: Hive error after update from 1.4.1 to 1.5.2

2015-11-20 Thread Bryan Jeffrey
Nevermind. I had a library dependency that still had the old Spark version.

On Fri, Nov 20, 2015 at 2:14 PM, Bryan Jeffrey 
wrote:

> The 1.5.2 Spark was compiled using the following options:  mvn
> -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive
> -Phive-thriftserver clean package
>
> Regards,
>
> Bryan Jeffrey
>
> On Fri, Nov 20, 2015 at 2:13 PM, Bryan Jeffrey 
> wrote:
>
>> Hello.
>>
>> I'm seeing an error creating a Hive Context moving from Spark 1.4.1 to
>> 1.5.2.  Has anyone seen this issue?
>>
>> I'm invoking the following:
>>
>> new HiveContext(sc) // sc is a Spark Context
>>
>> I am seeing the following error:
>>
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/spark/spark-1.5.2/assembly/target/scala-2.11/spark-assembly-1.5.2-hadoop2.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/hadoop-2.6.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> Exception in thread "main" java.lang.NoSuchMethodException:
>> org.apache.hadoop.hive.conf.HiveConf.getTimeVar(org.apache.hadoop.hive.conf.HiveConf$ConfVars,
>> java.util.concurrent.TimeUnit)
>> at java.lang.Class.getMethod(Class.java:1786)
>> at
>> org.apache.spark.sql.hive.client.Shim.findMethod(HiveShim.scala:114)
>> at
>> org.apache.spark.sql.hive.client.Shim_v0_14.getTimeVarMethod$lzycompute(HiveShim.scala:415)
>> at
>> org.apache.spark.sql.hive.client.Shim_v0_14.getTimeVarMethod(HiveShim.scala:414)
>> at
>> org.apache.spark.sql.hive.client.Shim_v0_14.getMetastoreClientConnectRetryDelayMillis(HiveShim.scala:459)
>> at
>> org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:198)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>> at
>> org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
>> at
>> org.apache.spark.sql.hive.client.IsolatedClientLoader.<init>(IsolatedClientLoader.scala:179)
>> at
>> org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
>> at
>> org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
>> at
>> org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
>> at
>> org.apache.spark.sql.hive.HiveContext.defaultOverrides(HiveContext.scala:174)
>> at
>> org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:177)
>> at
>> Main.Factories.HiveContextSingleton$.createHiveContext(HiveContextSingleton.scala:21)
>> at
>> Main.Factories.HiveContextSingleton$.getHiveContext(HiveContextSingleton.scala:14)
>> at
>> Main.Factories.SparkStreamingContextFactory$.createSparkContext(SparkStreamingContextFactory.scala:35)
>> at Main.WriteModel$.main(WriteModel.scala:16)
>> at Main.WriteModel.main(WriteModel.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> at
>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> at
>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>
>


Re: Hive error after update from 1.4.1 to 1.5.2

2015-11-20 Thread Bryan Jeffrey
The 1.5.2 Spark was compiled using the following options:  mvn
-Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive
-Phive-thriftserver clean package

Regards,

Bryan Jeffrey

On Fri, Nov 20, 2015 at 2:13 PM, Bryan Jeffrey 
wrote:

> Hello.
>
> I'm seeing an error creating a Hive Context moving from Spark 1.4.1 to
> 1.5.2.  Has anyone seen this issue?
>
> I'm invoking the following:
>
> new HiveContext(sc) // sc is a Spark Context
>
> I am seeing the following error:
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/spark/spark-1.5.2/assembly/target/scala-2.11/spark-assembly-1.5.2-hadoop2.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/hadoop-2.6.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Exception in thread "main" java.lang.NoSuchMethodException:
> org.apache.hadoop.hive.conf.HiveConf.getTimeVar(org.apache.hadoop.hive.conf.HiveConf$ConfVars,
> java.util.concurrent.TimeUnit)
> at java.lang.Class.getMethod(Class.java:1786)
> at
> org.apache.spark.sql.hive.client.Shim.findMethod(HiveShim.scala:114)
> at
> org.apache.spark.sql.hive.client.Shim_v0_14.getTimeVarMethod$lzycompute(HiveShim.scala:415)
> at
> org.apache.spark.sql.hive.client.Shim_v0_14.getTimeVarMethod(HiveShim.scala:414)
> at
> org.apache.spark.sql.hive.client.Shim_v0_14.getMetastoreClientConnectRetryDelayMillis(HiveShim.scala:459)
> at
> org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:198)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> at
> org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
> at
> org.apache.spark.sql.hive.client.IsolatedClientLoader.<init>(IsolatedClientLoader.scala:179)
> at
> org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
> at
> org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
> at
> org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
> at
> org.apache.spark.sql.hive.HiveContext.defaultOverrides(HiveContext.scala:174)
> at
> org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:177)
> at
> Main.Factories.HiveContextSingleton$.createHiveContext(HiveContextSingleton.scala:21)
> at
> Main.Factories.HiveContextSingleton$.getHiveContext(HiveContextSingleton.scala:14)
> at
> Main.Factories.SparkStreamingContextFactory$.createSparkContext(SparkStreamingContextFactory.scala:35)
> at Main.WriteModel$.main(WriteModel.scala:16)
> at Main.WriteModel.main(WriteModel.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>


Hive error after update from 1.4.1 to 1.5.2

2015-11-20 Thread Bryan Jeffrey
Hello.

I'm seeing an error creating a Hive Context moving from Spark 1.4.1 to
1.5.2.  Has anyone seen this issue?

I'm invoking the following:

new HiveContext(sc) // sc is a Spark Context

I am seeing the following error:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/spark/spark-1.5.2/assembly/target/scala-2.11/spark-assembly-1.5.2-hadoop2.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/hadoop-2.6.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" java.lang.NoSuchMethodException:
org.apache.hadoop.hive.conf.HiveConf.getTimeVar(org.apache.hadoop.hive.conf.HiveConf$ConfVars,
java.util.concurrent.TimeUnit)
at java.lang.Class.getMethod(Class.java:1786)
at
org.apache.spark.sql.hive.client.Shim.findMethod(HiveShim.scala:114)
at
org.apache.spark.sql.hive.client.Shim_v0_14.getTimeVarMethod$lzycompute(HiveShim.scala:415)
at
org.apache.spark.sql.hive.client.Shim_v0_14.getTimeVarMethod(HiveShim.scala:414)
at
org.apache.spark.sql.hive.client.Shim_v0_14.getMetastoreClientConnectRetryDelayMillis(HiveShim.scala:459)
at
org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:198)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.<init>(IsolatedClientLoader.scala:179)
at
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
at
org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
at
org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
at
org.apache.spark.sql.hive.HiveContext.defaultOverrides(HiveContext.scala:174)
at
org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:177)
at
Main.Factories.HiveContextSingleton$.createHiveContext(HiveContextSingleton.scala:21)
at
Main.Factories.HiveContextSingleton$.getHiveContext(HiveContextSingleton.scala:14)
at
Main.Factories.SparkStreamingContextFactory$.createSparkContext(SparkStreamingContextFactory.scala:35)
at Main.WriteModel$.main(WriteModel.scala:16)
at Main.WriteModel.main(WriteModel.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Re: Cassandra via SparkSQL/Hive JDBC

2015-11-12 Thread Bryan Jeffrey
I hesitate to ask further questions, but your assistance is advancing my
work much faster than extensive fiddling might.  I am seeing the following
error when querying:

0: jdbc:hive2://localhost:1> create temporary table
cassandraeventcounts using org.apache.spark.sql.cassandra OPTIONS (
keyspace "c2", table "eventcounts" );
Error: java.lang.NoClassDefFoundError: Could not initialize class
org.apache.spark.sql.cassandra.DataTypeConverter$ (state=,code=0)

I started the Thrift server as follows:

root@sparkdev1:~# /spark/spark-1.4.1/sbin/start-thriftserver.sh --master
spark://10.0.0.4:7077 --packages
com.datastax.spark:spark-cassandra-connector_2.11:1.5.0-M1 --hiveconf
"spark.cores.max=2" --hiveconf "spark.executor.memory=2g"

Do I perhaps need to include an additional library to do the default
conversion?

Regards,

Bryan Jeffrey


On Thu, Nov 12, 2015 at 1:57 PM, Mohammed Guller 
wrote:

> Hi Bryan,
>
>
>
> Yes, you can query a real Cassandra cluster. You just need to provide the
> address of the Cassandra seed node.
>
>
>
> Looks like you figured out the answer. You can also put the C* seed node
> address in the spark-defaults.conf file under the SPARK_HOME/conf
> directory. Then you don’t need to manually SET it for each Beeline session.
>
>
>
> Mohammed
>
>
>
> *From:* Bryan Jeffrey [mailto:bryan.jeff...@gmail.com]
> *Sent:* Thursday, November 12, 2015 10:26 AM
>
> *To:* Mohammed Guller
> *Cc:* user
> *Subject:* Re: Cassandra via SparkSQL/Hive JDBC
>
>
>
> Answer: In beeline run the following: SET
> spark.cassandra.connection.host="10.0.0.10"
>
>
>
> On Thu, Nov 12, 2015 at 1:13 PM, Bryan Jeffrey 
> wrote:
>
> Mohammed,
>
>
>
> While you're willing to answer questions, is there a trick to getting the
> Hive Thrift server to connect to remote Cassandra instances?
>
>
>
> 0: jdbc:hive2://localhost:1> SET
> spark.cassandra.connection.host="cassandrahost";
> SET spark.cassandra.connection.host="cassandrahost";
> +---+
> |   |
> +---+
> | spark.cassandra.connection.host="cassandrahost"  |
> +---+
> 1 row selected (0.018 seconds)
> 0: jdbc:hive2://localhost:1> create temporary table cdr using
> org.apache.spark.sql.cassandra OPTIONS ( keyspace "c2", table
> "detectionresult" );
> create temporary table cdr using org.apache.spark.sql.cassandra OPTIONS (
> keyspace "c2", table "detectionresult" );
> ]Error: java.io.IOException: Failed to open native connection to Cassandra
> at {10.0.0.4}:9042 (state=,code=0)
>
>
>
> This seems to be connecting to local host regardless of the value I set
> spark.cassandra.connection.host to.
>
>
>
> Regards,
>
>
>
> Bryan Jeffrey
>
>
>
> On Thu, Nov 12, 2015 at 12:54 PM, Bryan Jeffrey 
> wrote:
>
> Yes, I do - I found your example of doing that later in your slides.
> Thank you for your help!
>
>
>
> On Thu, Nov 12, 2015 at 12:20 PM, Mohammed Guller 
> wrote:
>
> Did you mean Hive or Spark SQL JDBC/ODBC server?
>
>
>
> Mohammed
>
>
>
> *From:* Bryan Jeffrey [mailto:bryan.jeff...@gmail.com]
> *Sent:* Thursday, November 12, 2015 9:12 AM
> *To:* Mohammed Guller
> *Cc:* user
> *Subject:* Re: Cassandra via SparkSQL/Hive JDBC
>
>
>
> Mohammed,
>
>
>
> That is great.  It looks like a perfect scenario. Would I be able to make
> the created DF queryable over the Hive JDBC/ODBC server?
>
>
>
> Regards,
>
>
>
> Bryan Jeffrey
>
>
>
> On Wed, Nov 11, 2015 at 9:34 PM, Mohammed Guller 
> wrote:
>
> Short answer: yes.
>
>
>
> The Spark Cassandra Connector supports the data source API. So you can
> create a DataFrame that points directly to a Cassandra table. You can query
> it using the DataFrame API or the SQL/HiveQL interface.
>
>
>
> If you want to see an example,  see slide# 27 and 28 in this deck that I
> presented at the Cassandra Summit 2015:
>
> http://www.slideshare.net/mg007/ad-hoc-analytics-with-cassandra-and-spark
>
>
>
>
>
> Mohammed
>
>
>
> *From:* Bryan [mailto:bryan.jeff...@gmail.com]
> *Sent:* Tuesday, November 10, 2015 7:42 PM
> *To:* Bryan Jeffrey; user
> *Subject:* RE: Cassandra via SparkSQL/Hive JDBC
>
>
>
> Anyone have thoughts or a similar use-case for SparkSQL / Cassandra?
>
> Regards,
>
> Bryan Jeffrey
> -

Re: Cassandra via SparkSQL/Hive JDBC

2015-11-12 Thread Bryan Jeffrey
Answer: In beeline run the following: SET
spark.cassandra.connection.host="10.0.0.10"
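
As Mohammed notes elsewhere in this thread, the host can also be set once in spark-defaults.conf so that each Beeline session need not SET it manually — a sketch, assuming the stock SPARK_HOME/conf layout:

```
# SPARK_HOME/conf/spark-defaults.conf -- restart the Thrift server after editing
spark.cassandra.connection.host  10.0.0.10
```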

On Thu, Nov 12, 2015 at 1:13 PM, Bryan Jeffrey 
wrote:

> Mohammed,
>
> While you're willing to answer questions, is there a trick to getting the
> Hive Thrift server to connect to remote Cassandra instances?
>
> 0: jdbc:hive2://localhost:1> SET
> spark.cassandra.connection.host="cassandrahost";
> SET spark.cassandra.connection.host="cassandrahost";
> +---+
> |   |
> +---+
> | spark.cassandra.connection.host="cassandrahost"  |
> +---+
> 1 row selected (0.018 seconds)
> 0: jdbc:hive2://localhost:1> create temporary table cdr using
> org.apache.spark.sql.cassandra OPTIONS ( keyspace "c2", table
> "detectionresult" );
> create temporary table cdr using org.apache.spark.sql.cassandra OPTIONS (
> keyspace "c2", table "detectionresult" );
> ]Error: java.io.IOException: Failed to open native connection to Cassandra
> at {10.0.0.4}:9042 (state=,code=0)
>
> This seems to be connecting to local host regardless of the value I set
> spark.cassandra.connection.host to.
>
> Regards,
>
> Bryan Jeffrey
>
> On Thu, Nov 12, 2015 at 12:54 PM, Bryan Jeffrey 
> wrote:
>
>> Yes, I do - I found your example of doing that later in your slides.
>> Thank you for your help!
>>
>> On Thu, Nov 12, 2015 at 12:20 PM, Mohammed Guller wrote:
>>
>>> Did you mean Hive or Spark SQL JDBC/ODBC server?
>>>
>>>
>>>
>>> Mohammed
>>>
>>>
>>>
>>> *From:* Bryan Jeffrey [mailto:bryan.jeff...@gmail.com]
>>> *Sent:* Thursday, November 12, 2015 9:12 AM
>>> *To:* Mohammed Guller
>>> *Cc:* user
>>> *Subject:* Re: Cassandra via SparkSQL/Hive JDBC
>>>
>>>
>>>
>>> Mohammed,
>>>
>>>
>>>
>>> That is great.  It looks like a perfect scenario. Would I be able to
>>> make the created DF queryable over the Hive JDBC/ODBC server?
>>>
>>>
>>>
>>> Regards,
>>>
>>>
>>>
>>> Bryan Jeffrey
>>>
>>>
>>>
>>> On Wed, Nov 11, 2015 at 9:34 PM, Mohammed Guller 
>>> wrote:
>>>
>>> Short answer: yes.
>>>
>>>
>>>
>>> The Spark Cassandra Connector supports the data source API. So you can
>>> create a DataFrame that points directly to a Cassandra table. You can query
>>> it using the DataFrame API or the SQL/HiveQL interface.
>>>
>>>
>>>
>>> If you want to see an example,  see slide# 27 and 28 in this deck that I
>>> presented at the Cassandra Summit 2015:
>>>
>>> http://www.slideshare.net/mg007/ad-hoc-analytics-with-cassandra-and-spark
>>>
>>>
>>>
>>>
>>>
>>> Mohammed
>>>
>>>
>>>
>>> *From:* Bryan [mailto:bryan.jeff...@gmail.com]
>>> *Sent:* Tuesday, November 10, 2015 7:42 PM
>>> *To:* Bryan Jeffrey; user
>>> *Subject:* RE: Cassandra via SparkSQL/Hive JDBC
>>>
>>>
>>>
>>> Anyone have thoughts or a similar use-case for SparkSQL / Cassandra?
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>> --
>>>
>>> *From: *Bryan Jeffrey 
>>> *Sent: *‎11/‎4/‎2015 11:16 AM
>>> *To: *user 
>>> *Subject: *Cassandra via SparkSQL/Hive JDBC
>>>
>>> Hello.
>>>
>>>
>>>
>>> I have been working to add SparkSQL HDFS support to our application.
>>> We're able to process streaming data, append to a persistent Hive table,
>>> and have that table available via JDBC/ODBC.  Now we're looking to access
>>> data in Cassandra via SparkSQL.
>>>
>>>
>>>
>>> In reading a number of previous posts, it appears that the way to do
>>> this is to instantiate a Spark Context, read the data into an RDD using the
>>> Cassandra Spark Connector, convert the data to a DF and register it as a
>>> temporary table.  The data will then be accessible via SparkSQL - although
>>> I assume that you would need to refresh the table on a periodic basis.
>>>
>>>
>>>
>>> Is there a more straightforward way to do this?  Is it possible to
>>> register the Cassandra table with Hive so that the SparkSQL thrift server
>>> instance can just read data directly?
>>>
>>>
>>>
>>> Regards,
>>>
>>>
>>>
>>> Bryan Jeffrey
>>>
>>>
>>>
>>
>>
>


Re: Cassandra via SparkSQL/Hive JDBC

2015-11-12 Thread Bryan Jeffrey
Mohammed,

While you're willing to answer questions, is there a trick to getting the
Hive Thrift server to connect to remote Cassandra instances?

0: jdbc:hive2://localhost:1> SET
spark.cassandra.connection.host="cassandrahost";
SET spark.cassandra.connection.host="cassandrahost";
+---+
|   |
+---+
| spark.cassandra.connection.host="cassandrahost"  |
+---+
1 row selected (0.018 seconds)
0: jdbc:hive2://localhost:1> create temporary table cdr using
org.apache.spark.sql.cassandra OPTIONS ( keyspace "c2", table
"detectionresult" );
create temporary table cdr using org.apache.spark.sql.cassandra OPTIONS (
keyspace "c2", table "detectionresult" );
]Error: java.io.IOException: Failed to open native connection to Cassandra
at {10.0.0.4}:9042 (state=,code=0)

This seems to be connecting to local host regardless of the value I set
spark.cassandra.connection.host to.

Regards,

Bryan Jeffrey

On Thu, Nov 12, 2015 at 12:54 PM, Bryan Jeffrey 
wrote:



Re: Cassandra via SparkSQL/Hive JDBC

2015-11-12 Thread Bryan Jeffrey
Yes, I do - I found your example of doing that later in your slides.  Thank
you for your help!

On Thu, Nov 12, 2015 at 12:20 PM, Mohammed Guller 
wrote:

> Did you mean Hive or Spark SQL JDBC/ODBC server?
>
>
>
> Mohammed
>
>
>
> *From:* Bryan Jeffrey [mailto:bryan.jeff...@gmail.com]
> *Sent:* Thursday, November 12, 2015 9:12 AM
> *To:* Mohammed Guller
> *Cc:* user
> *Subject:* Re: Cassandra via SparkSQL/Hive JDBC
>
>
>
> Mohammed,
>
>
>
> That is great.  It looks like a perfect scenario. Would I be able to make
> the created DF queryable over the Hive JDBC/ODBC server?
>
>
>
> Regards,
>
>
>
> Bryan Jeffrey
>
>
>
> On Wed, Nov 11, 2015 at 9:34 PM, Mohammed Guller 
> wrote:
>
> Short answer: yes.
>
>
>
> The Spark Cassandra Connector supports the data source API. So you can
> create a DataFrame that points directly to a Cassandra table. You can query
> it using the DataFrame API or the SQL/HiveQL interface.
>
>
>
> If you want to see an example,  see slide# 27 and 28 in this deck that I
> presented at the Cassandra Summit 2015:
>
> http://www.slideshare.net/mg007/ad-hoc-analytics-with-cassandra-and-spark
>
>
>
>
>
> Mohammed
>
>
>
> *From:* Bryan [mailto:bryan.jeff...@gmail.com]
> *Sent:* Tuesday, November 10, 2015 7:42 PM
> *To:* Bryan Jeffrey; user
> *Subject:* RE: Cassandra via SparkSQL/Hive JDBC
>
>
>
> Anyone have thoughts or a similar use-case for SparkSQL / Cassandra?
>
> Regards,
>
> Bryan Jeffrey
> --
>
> *From: *Bryan Jeffrey 
> *Sent: *11/4/2015 11:16 AM
> *To: *user 
> *Subject: *Cassandra via SparkSQL/Hive JDBC
>
> Hello.
>
>
>
> I have been working to add SparkSQL HDFS support to our application.
> We're able to process streaming data, append to a persistent Hive table,
> and have that table available via JDBC/ODBC.  Now we're looking to access
> data in Cassandra via SparkSQL.
>
>
>
> In reading a number of previous posts, it appears that the way to do this
> is to instantiate a Spark Context, read the data into an RDD using the
> Cassandra Spark Connector, convert the data to a DF and register it as a
> temporary table.  The data will then be accessible via SparkSQL - although
> I assume that you would need to refresh the table on a periodic basis.
>
>
>
> Is there a more straightforward way to do this?  Is it possible to
> register the Cassandra table with Hive so that the SparkSQL thrift server
> instance can just read data directly?
>
>
>
> Regards,
>
>
>
> Bryan Jeffrey
>
>
>


Re: Cassandra via SparkSQL/Hive JDBC

2015-11-12 Thread Bryan Jeffrey
Mohammed,

That is great.  It looks like a perfect scenario. Would I be able to make
the created DF queryable over the Hive JDBC/ODBC server?

Regards,

Bryan Jeffrey

On Wed, Nov 11, 2015 at 9:34 PM, Mohammed Guller 
wrote:

> Short answer: yes.
>
>
>
> The Spark Cassandra Connector supports the data source API. So you can
> create a DataFrame that points directly to a Cassandra table. You can query
> it using the DataFrame API or the SQL/HiveQL interface.
>
>
>
> If you want to see an example,  see slide# 27 and 28 in this deck that I
> presented at the Cassandra Summit 2015:
>
> http://www.slideshare.net/mg007/ad-hoc-analytics-with-cassandra-and-spark
>
>
>
>
>
> Mohammed
>
>
>
> *From:* Bryan [mailto:bryan.jeff...@gmail.com]
> *Sent:* Tuesday, November 10, 2015 7:42 PM
> *To:* Bryan Jeffrey; user
> *Subject:* RE: Cassandra via SparkSQL/Hive JDBC
>
>
>
> Anyone have thoughts or a similar use-case for SparkSQL / Cassandra?
>
> Regards,
>
> Bryan Jeffrey
> --
>
> *From: *Bryan Jeffrey 
> *Sent: *11/4/2015 11:16 AM
> *To: *user 
> *Subject: *Cassandra via SparkSQL/Hive JDBC
>
> Hello.
>
>
>
> I have been working to add SparkSQL HDFS support to our application.
> We're able to process streaming data, append to a persistent Hive table,
> and have that table available via JDBC/ODBC.  Now we're looking to access
> data in Cassandra via SparkSQL.
>
>
>
> In reading a number of previous posts, it appears that the way to do this
> is to instantiate a Spark Context, read the data into an RDD using the
> Cassandra Spark Connector, convert the data to a DF and register it as a
> temporary table.  The data will then be accessible via SparkSQL - although
> I assume that you would need to refresh the table on a periodic basis.
>
>
>
> Is there a more straightforward way to do this?  Is it possible to
> register the Cassandra table with Hive so that the SparkSQL thrift server
> instance can just read data directly?
>
>
>
> Regards,
>
>
>
> Bryan Jeffrey
>


Spark Dynamic Partitioning Bug

2015-11-05 Thread Bryan Jeffrey
Hello.

I have come across some odd behavior with writing to persistent Hive tables
in Spark using  dynamic partitioning.

Basically, I create a table.  Using Spark streaming create counts of events
by ID and source.  For each RDD I create a temporary table. I then select
from the temporary table into the actual table.

(1) Create model:
case class HiveEventIdCount(eventGenerationTime: Timestamp,
eventIdAndSource : String, count : Int, eventMonthBin : Long, eventHourBin
: Long)

object HiveEventIdCount {
  def apply(eventIdCount : EventIdCount): HiveEventIdCount = {
HiveEventIdCount(
  eventIdCount.eventGenerationTime,
  eventIdCount.eventIdAndSource,
  eventIdCount.count,
  eventIdCount.eventMonthBin.getMillis,
  eventIdCount.eventHourBin.getMillis
)
  }
}

(2) Create counts:

val eventCounts = eventAndDomainCounts
  .map(x => Tuple2(x.eventIdAndSource, x.count))
  .reduceByKey(_ + _)
  .map(x =>
  EventIdCount(
eventGenerationTime = DateTime.now(DateTimeZone.UTC),
eventIdAndSource = x._1,
count = x._2))

(3) Create the table:

val hc = new HiveContext(sc)
import hc.implicits._

hc.sql("use default")
val eventIdTableCreateStatement = ("""CREATE TABLE IF NOT EXISTS %s
  |(
  |eventGenerationTime timestamp,
  |count bigint
  |) PARTITIONED BY (eventMonthBin bigint, eventHourBin bigint, %s string)
   """ format(eventIdCountTable, eventIdPartitionField)).stripMargin
hc.sql(eventIdTableCreateStatement)
hc.sql("set hive.exec.dynamic.partition.mode=nonstrict")

(4) Insert data from each RDD:
eventCounts.map(x => HiveEventIdCount(x)).foreachRDD(rdd => {
  val tempTableName = "rawevents" + Random.nextInt().abs.toString
  val eventCountsDF = rdd.toDF()
  eventCountsDF.registerTempTable(tempTableName)
  val eventMonthBinPartition = rdd.map(x => x.eventMonthBin).max()
  val eventHourBinPartition = rdd.map(x => x.eventHourBin).max()
  val insertStatement = "insert into table %s partition(eventMonthBin = %s,
eventHourBin = %s, eventIdAndSource) select %s from %s"
 .format(eventIdCountTable, eventMonthBinPartition, eventHourBinPartition,
eventCountsDF.columns.filter(x => x != "eventMonthBin" && x !=
"eventHourBin" && x != "eventIdAndSource").mkString(","), tempTableName)
  hc.sql(insertStatement)
})

This works.  However, the input data has event IDs like '710:Source'.  When
I look at the HDFS created data I see the following:

/user/hive/warehouse/eventidcounts2/eventmonthbin=144633600/eventhourbin=144675720/eventidandsource=3835

Obviously the manually calculated fields are correct. However, the
dynamically calculated (string) partition for idAndSource is a random field
from within my case class.  I've duplicated this with several other classes
and have seen the same result (I use this example because it's very simple).

Any idea if this is a known bug?  Is there a workaround?
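
One hedged hypothesis, based on Hive's documented dynamic-partitioning semantics: dynamic partition values are bound positionally from the *trailing* columns of the SELECT, in the same order as the PARTITION() clause. The insert above filters eventIdAndSource out of the select list, so Hive binds the partition to whatever data column happens to come last. A sketch of a corrected insert, reusing the names from the snippet above:

```scala
// Sketch only: keep eventIdAndSource as the LAST column in the select list so
// Hive binds the dynamic partition to it rather than to an arbitrary column.
val dataColumns = eventCountsDF.columns
  .filterNot(c => Seq("eventMonthBin", "eventHourBin", "eventIdAndSource").contains(c))
val insertStatement =
  "insert into table %s partition(eventMonthBin = %s, eventHourBin = %s, eventIdAndSource) select %s from %s"
    .format(eventIdCountTable, eventMonthBinPartition, eventHourBinPartition,
      (dataColumns :+ "eventIdAndSource").mkString(","), tempTableName)
hc.sql(insertStatement)
```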
Regards,

Bryan Jeffrey


Re: Allow multiple SparkContexts in Unit Testing

2015-11-04 Thread Bryan Jeffrey
Priya,

If you're trying to get unit tests running local spark contexts, you can
just set up your spark context with 'spark.driver.allowMultipleContexts'
set to true.

Example:

def create(seconds : Int, appName : String): StreamingContext = {
  val master = "local[*]"
  val conf = new SparkConf().set("spark.driver.allowMultipleContexts",
"true").setAppName(appName).setMaster(master)
  new StreamingContext(conf, Seconds(seconds))
}
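
A hedged usage sketch of the helper above inside a test suite (ScalaTest's `FunSuite` is assumed; the suite name and test body are illustrative):

```scala
import org.scalatest.FunSuite
import org.apache.spark.streaming.StreamingContext

class WindowedCountSuite extends FunSuite {
  test("counts events per window") {
    // create() is the helper defined above
    val ssc: StreamingContext = create(1, "WindowedCountSuite")
    try {
      // build DStreams against ssc, start it, and assert on collected output
    } finally {
      // stop the streaming context and its underlying SparkContext
      ssc.stop(stopSparkContext = true)
    }
  }
}
```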

Regards,

Bryan Jeffrey


On Wed, Nov 4, 2015 at 9:49 AM, Ted Yu  wrote:

> Are you trying to speed up tests where each test suite uses single 
> SparkContext
> ?
>
> You may want to read:
> https://issues.apache.org/jira/browse/SPARK-2243
>
> Cheers
>
> On Wed, Nov 4, 2015 at 4:59 AM, Priya Ch 
> wrote:
>
>> Hello All,
>>
>>   How to use multiple Spark Context in executing multiple test suite of
>> spark code ???
>> Can some one throw light on this ?
>>
>
>


Cassandra via SparkSQL/Hive JDBC

2015-11-04 Thread Bryan Jeffrey
Hello.

I have been working to add SparkSQL HDFS support to our application.  We're
able to process streaming data, append to a persistent Hive table, and have
that table available via JDBC/ODBC.  Now we're looking to access data in
Cassandra via SparkSQL.

In reading a number of previous posts, it appears that the way to do this
is to instantiate a Spark Context, read the data into an RDD using the
Cassandra Spark Connector, convert the data to a DF and register it as a
temporary table.  The data will then be accessible via SparkSQL - although
I assume that you would need to refresh the table on a periodic basis.

Is there a more straightforward way to do this?  Is it possible to register
the Cassandra table with Hive so that the SparkSQL thrift server instance
can just read data directly?
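
For reference, the temp-table approach described above can be sketched as follows using the Spark Cassandra Connector's data source API (the keyspace, table, and query here are made-up placeholders):

```scala
import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)

// Point a DataFrame directly at a Cassandra table via the connector's
// data source API, then register it for SQL access from this context.
val events = hc.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "sensor_data", "table" -> "events")) // hypothetical names
  .load()

events.registerTempTable("cassandra_events")

// Queryable from this HiveContext; registration is per-context, so a separate
// thrift-server driver would need its own registration (or a refresh).
hc.sql("select count(*) from cassandra_events").show()
```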

Regards,

Bryan Jeffrey


SparkSQL implicit conversion on insert

2015-11-02 Thread Bryan Jeffrey
All,

I have an object Joda DateTime fields. I would prefer to continue to use
the DateTime in my application. When I am inserting into Hive I need to
cast to a Timestamp field (DateTime is not supported).  I added an implicit
conversion from DateTime to Timestamp - but it does not appear to be called
when inserting into the temp table. I am seeing the following error:

java.lang.UnsupportedOperationException: Schema for type
org.joda.time.DateTime is not supported

Is there some magic I need to use to get the implicit conversion when
inserting into Hive, or am I required to do an explicit conversion prior to
insertion into Hive?
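
Absent an implicit hook, one workable pattern is an explicit persistence model: a parallel case class holding java.sql.Timestamp, converted just before toDF(). A hedged sketch (class and field names are illustrative):

```scala
import java.sql.Timestamp
import org.joda.time.DateTime

// Application model keeps Joda DateTime; the persistence model mirrors it
// with java.sql.Timestamp, which Spark SQL can map to a Hive timestamp column.
case class Event(name: String, occurredAt: DateTime)
case class HiveEvent(name: String, occurredAt: Timestamp)

object HiveEvent {
  def apply(e: Event): HiveEvent =
    HiveEvent(e.name, new Timestamp(e.occurredAt.getMillis))
}

// usage: rdd.map(HiveEvent(_)).toDF().write.mode(SaveMode.Append).saveAsTable("events")
```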

Regards,

Bryan Jeffrey


Re: Spark -- Writing to Partitioned Persistent Table

2015-10-30 Thread Bryan Jeffrey
Deenar,

This worked perfectly - I moved to SQL Server and things are working well.

Regards,

Bryan Jeffrey

On Thu, Oct 29, 2015 at 8:14 AM, Deenar Toraskar 
wrote:

> Hi Bryan
>
> For your use case you don't need to have multiple metastores. The default
> metastore uses embedded Derby
> <https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin#AdminManualMetastoreAdmin-Local/EmbeddedMetastoreDatabase(Derby)>.
> This cannot be shared amongst multiple processes. Just switch to a
> metastore that supports multiple connections viz. Networked Derby or mysql.
> see https://cwiki.apache.org/confluence/display/Hive/HiveDerbyServerMode
>
> Deenar
>
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
> On 29 October 2015 at 00:56, Bryan  wrote:
>
>> Yana,
>>
>> My basic use-case is that I want to process streaming data, and publish
>> it to a persistent spark table. After that I want to make the published
>> data (results) available via JDBC and spark SQL to drive a web API. That
>> would seem to require two drivers starting separate HiveContexts (one for
>> sparksql/jdbc, one for streaming)
>>
>> Is there a way to share a hive context between the driver for the thrift
>> spark SQL instance and the streaming spark driver? A better method to do
>> this?
>>
>> An alternate option might be to create the table in two separate
>> metastores and simply use the same storage location for the data. That
>> seems very hacky though, and likely to result in maintenance issues.
>>
>> Regards,
>>
>> Bryan Jeffrey
>> --
>> From: Yana Kadiyska 
>> Sent: 10/28/2015 8:32 PM
>> To: Bryan Jeffrey 
>> Cc: Susan Zhang ; user 
>> Subject: Re: Spark -- Writing to Partitioned Persistent Table
>>
>> For this issue in particular ( ERROR XSDB6: Another instance of Derby
>> may have already booted the database /spark/spark-1.4.1/metastore_db) --
>> I think it depends on where you start your application and HiveThriftserver
>> from. I've run into a similar issue running a driver app first, which would
>> create a directory called metastore_db. If I then try to start SparkShell
>> from the same directory, I will see this exception. So it is like
>> SPARK-9776. It's not so much that the two are in the same process (as the
>> bug resolution states) I think you can't run 2 drivers which start a
>> HiveConext from the same directory.
>>
>>
>> On Wed, Oct 28, 2015 at 4:10 PM, Bryan Jeffrey 
>> wrote:
>>
>>> All,
>>>
>>> One issue I'm seeing is that I start the thrift server (for jdbc access)
>>> via the following: /spark/spark-1.4.1/sbin/start-thriftserver.sh --master
>>> spark://master:7077 --hiveconf "spark.cores.max=2"
>>>
>>> After about 40 seconds the Thrift server is started and available on
>>> default port 1.
>>>
>>> I then submit my application - and the application throws the following
>>> error:
>>>
>>> Caused by: java.sql.SQLException: Failed to start database
>>> 'metastore_db' with class loader
>>> org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@6a552721,
>>> see the next exception for details.
>>> at
>>> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
>>> Source)
>>> at
>>> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>>> Source)
>>> ... 86 more
>>> Caused by: java.sql.SQLException: Another instance of Derby may have
>>> already booted the database /spark/spark-1.4.1/metastore_db.
>>> at
>>> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
>>> Source)
>>> at
>>> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>>> Source)
>>> at
>>> org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown
>>> Source)
>>> at
>>> org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
>>> ... 83 more
>>> Caused by: ERROR XSDB6: Another instance of Derby may have already
>>> booted the database /spark/spark-1.4.1/metastore_db.
>>>
>>> This also happens if I do the opposite (submit the application first,
>>> and then start the thrift server).
>>>
>>> It looks similar to the following issue -- but not quite

Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Bryan Jeffrey
ForRow$1.apply(commands.scala:525)
at
scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at
scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at
org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:525)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:262)
... 8 more

I have dropped the data input volume so that it is negligible (100 events /
second).  I am still seeing this OOM error.  I removed the 'map' elements
in the case class above after seeing several issues with serializing maps
to Parquet, and I'm still seeing the same errors.

The table (as created automatically by Spark HiveContext, with the maps
removed) has the following:

0: jdbc:hive2://localhost:1> describe windows_event9;
+--++--+
|   col_name   | data_type  | comment  |
+--++--+
| targetEntity | string |  |
| targetEntityType | string |  |
| dateTimeUtc  | timestamp  |  |
| eventid  | string |  |
| description  | string |  |
| eventRecordId| string |  |
| level| string |  |
| machineName  | string |  |
| sequenceNumber   | string |  |
| source   | string |  |
| sourceMachineName| string |  |
| taskCategory | string |  |
| user | string |  |
| machineIp| string |  |
| windowseventtimebin  | bigint |  |
+--++--+

| SerDe Library:
org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
|
| InputFormat:
org.apache.hadoop.mapred.SequenceFileInputFormat
|
| OutputFormat:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
|

This seems like a pretty big bug associated with persistent tables.  Am I
missing a step somewhere?

Thank you,

Bryan Jeffrey



On Wed, Oct 28, 2015 at 4:10 PM, Bryan Jeffrey 
wrote:

> All,
>
> One issue I'm seeing is that I start the thrift server (for jdbc access)
> via the following: /spark/spark-1.4.1/sbin/start-thriftserver.sh --master
> spark://master:7077 --hiveconf "spark.cores.max=2"
>
> After about 40 seconds the Thrift server is started and available on
> default port 1.
>
> I then submit my application - and the application throws the following
> error:
>
> Caused by: java.sql.SQLException: Failed to start database 'metastore_db'
> with class loader
> org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@6a552721,
> see the next exception for details.
> at
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
> Source)
> at
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
> Source)
> ... 86 more
> Caused by: java.sql.SQLException: Another instance of Derby may have
> already booted the database /spark/spark-1.4.1/metastore_db.
> at
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
> Source)
> at
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
> Source)
> at
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown
> Source)
> at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown
> Source)
> ... 83 more
> Caused by: ERROR XSDB6: Another instance of Derby may have already booted
> the database /spark/spark-1.4.1/metastore_db.
>
> This also happens if I do the opposite (submit the application first, and
> then start the thrift server).
>
> It looks similar to the following issue -- but not quite the same:
> https://issues.apache.org/jira/browse/SPARK-9776
>
> It seems like this set of steps works fine if the metadata database is not
> yet created - but once it's created this happens every time.  Is this a
> known issue? Is there a workaround?
>
> Regards,
>
> Bryan Jeffrey
>
> On Wed, Oct 28, 2015 at 3:13 PM, Bryan Jeffrey 
> wrote:
>
>> Susan,
>>
>> I did give that a shot -- I'm seeing a number of oddities:
>>
> >> (1) 'Partition By' appears to only accept alphanumeric lower-case fields.
>> It will work for 'machinename', but not 'machineName' or 'machine_name'.
>> (2) When partitioning with maps included in the data I get odd string
>> conversion issues
>> (3) When partitioning without maps I see frequent out of memory issues
>>
>> I'll updat

Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Bryan Jeffrey
All,

One issue I'm seeing is that I start the thrift server (for jdbc access)
via the following: /spark/spark-1.4.1/sbin/start-thriftserver.sh --master
spark://master:7077 --hiveconf "spark.cores.max=2"

After about 40 seconds the Thrift server is started and available on
default port 1.

I then submit my application - and the application throws the following
error:

Caused by: java.sql.SQLException: Failed to start database 'metastore_db'
with class loader
org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@6a552721, see
the next exception for details.
at
org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
Source)
at
org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
Source)
... 86 more
Caused by: java.sql.SQLException: Another instance of Derby may have
already booted the database /spark/spark-1.4.1/metastore_db.
at
org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
Source)
at
org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
Source)
at
org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown
Source)
at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown
Source)
... 83 more
Caused by: ERROR XSDB6: Another instance of Derby may have already booted
the database /spark/spark-1.4.1/metastore_db.

This also happens if I do the opposite (submit the application first, and
then start the thrift server).

It looks similar to the following issue -- but not quite the same:
https://issues.apache.org/jira/browse/SPARK-9776

It seems like this set of steps works fine if the metadata database is not
yet created - but once it's created this happens every time.  Is this a
known issue? Is there a workaround?
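
One known workaround is to switch from embedded Derby to a shared metastore (e.g. MySQL), which supports concurrent connections from multiple drivers. A hedged hive-site.xml sketch - the property names are Hive's standard JDO settings, while the host, database, and credentials are placeholders:

```xml
<!-- Point the Hive metastore at a networked MySQL database so that the
     thrift server and the streaming driver can share it. -->
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://metastore-host:3306/hive_metastore?createDatabaseIfNotExist=true</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>hive-password</value>
</property>
```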

Regards,

Bryan Jeffrey

On Wed, Oct 28, 2015 at 3:13 PM, Bryan Jeffrey 
wrote:

> Susan,
>
> I did give that a shot -- I'm seeing a number of oddities:
>
> (1) 'Partition By' appears to only accept alphanumeric lower-case fields.
> It will work for 'machinename', but not 'machineName' or 'machine_name'.
> (2) When partitioning with maps included in the data I get odd string
> conversion issues
> (3) When partitioning without maps I see frequent out of memory issues
>
> I'll update this email when I've got a more concrete example of problems.
>
> Regards,
>
> Bryan Jeffrey
>
>
>
> On Wed, Oct 28, 2015 at 1:33 PM, Susan Zhang  wrote:
>
>> Have you tried partitionBy?
>>
>> Something like
>>
>> hiveWindowsEvents.foreachRDD( rdd => {
>>   val eventsDataFrame = rdd.toDF()
>>   eventsDataFrame.write.mode(SaveMode.Append).partitionBy("
>> windows_event_time_bin").saveAsTable("windows_event")
>> })
>>
>>
>>
>> On Wed, Oct 28, 2015 at 7:41 AM, Bryan Jeffrey 
>> wrote:
>>
>>> Hello.
>>>
>>> I am working to get a simple solution working using Spark SQL.  I am
>>> writing streaming data to persistent tables using a HiveContext.  Writing
>>> to a persistent non-partitioned table works well - I update the table using
>>> Spark streaming, and the output is available via Hive Thrift/JDBC.
>>>
>>> I create a table that looks like the following:
>>>
>>> 0: jdbc:hive2://localhost:1> describe windows_event;
>>> describe windows_event;
>>> +--+-+--+
>>> | col_name |  data_type  | comment  |
>>> +--+-+--+
>>> | target_entity| string  | NULL |
>>> | target_entity_type   | string  | NULL |
>>> | date_time_utc| timestamp   | NULL |
>>> | machine_ip   | string  | NULL |
>>> | event_id | string  | NULL |
>> | event_data   | map<string,string>   | NULL |
>>> | description  | string  | NULL |
>>> | event_record_id  | string  | NULL |
>>> | level| string  | NULL |
>>> | machine_name | string  | NULL |
>>> | sequence_number  | string  | NULL |
>>> | source   | string  | NULL |
>>> | source_machine_name  | string  | NULL |
>>> | task_category| string  | NULL |
>>> | user | string  | NULL |

Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Bryan Jeffrey
Susan,

I did give that a shot -- I'm seeing a number of oddities:

(1) 'Partition By' appears to only accept alphanumeric lower-case fields.  It
will work for 'machinename', but not 'machineName' or 'machine_name'.
(2) When partitioning with maps included in the data I get odd string
conversion issues
(3) When partitioning without maps I see frequent out of memory issues

I'll update this email when I've got a more concrete example of problems.

Regards,

Bryan Jeffrey



On Wed, Oct 28, 2015 at 1:33 PM, Susan Zhang  wrote:

> Have you tried partitionBy?
>
> Something like
>
> hiveWindowsEvents.foreachRDD( rdd => {
>   val eventsDataFrame = rdd.toDF()
>   eventsDataFrame.write.mode(SaveMode.Append).partitionBy("
> windows_event_time_bin").saveAsTable("windows_event")
> })
>
>
>
> On Wed, Oct 28, 2015 at 7:41 AM, Bryan Jeffrey 
> wrote:
>
>> Hello.
>>
>> I am working to get a simple solution working using Spark SQL.  I am
>> writing streaming data to persistent tables using a HiveContext.  Writing
>> to a persistent non-partitioned table works well - I update the table using
>> Spark streaming, and the output is available via Hive Thrift/JDBC.
>>
>> I create a table that looks like the following:
>>
>> 0: jdbc:hive2://localhost:1> describe windows_event;
>> describe windows_event;
>> +--+-+--+
>> | col_name |  data_type  | comment  |
>> +--+-+--+
>> | target_entity| string  | NULL |
>> | target_entity_type   | string  | NULL |
>> | date_time_utc| timestamp   | NULL |
>> | machine_ip   | string  | NULL |
>> | event_id | string  | NULL |
>> | event_data   | map<string,string>   | NULL |
>> | description  | string  | NULL |
>> | event_record_id  | string  | NULL |
>> | level| string  | NULL |
>> | machine_name | string  | NULL |
>> | sequence_number  | string  | NULL |
>> | source   | string  | NULL |
>> | source_machine_name  | string  | NULL |
>> | task_category| string  | NULL |
>> | user | string  | NULL |
>> | additional_data  | map<string,string>   | NULL |
>> | windows_event_time_bin   | timestamp   | NULL |
>> | # Partition Information  | |  |
>> | # col_name   | data_type   | comment  |
>> | windows_event_time_bin   | timestamp   | NULL |
>> +--+-+--+
>>
>>
>> However, when I create a partitioned table and write data using the
>> following:
>>
>> hiveWindowsEvents.foreachRDD( rdd => {
>>   val eventsDataFrame = rdd.toDF()
>>
>> eventsDataFrame.write.mode(SaveMode.Append).saveAsTable("windows_event")
>> })
>>
>> The data is written as though the table is not partitioned (so everything
>> is written to /user/hive/warehouse/windows_event/file.gz.parquet.  Because
>> the data is not following the partition schema, it is not accessible (and
>> not partitioned).
>>
>> Is there a straightforward way to write to partitioned tables using Spark
>> SQL?  I understand that the read performance for partitioned data is far
>> better - are there other performance improvements that might be better to
>> use instead of partitioning?
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>
>


Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Bryan Jeffrey
Hello.

I am working to get a simple solution working using Spark SQL.  I am
writing streaming data to persistent tables using a HiveContext.  Writing
to a persistent non-partitioned table works well - I update the table using
Spark streaming, and the output is available via Hive Thrift/JDBC.

I create a table that looks like the following:

0: jdbc:hive2://localhost:1> describe windows_event;
describe windows_event;
+--+-+--+
| col_name |  data_type  | comment  |
+--+-+--+
| target_entity| string  | NULL |
| target_entity_type   | string  | NULL |
| date_time_utc| timestamp   | NULL |
| machine_ip   | string  | NULL |
| event_id | string  | NULL |
| event_data   | map<string,string>   | NULL |
| description  | string  | NULL |
| event_record_id  | string  | NULL |
| level| string  | NULL |
| machine_name | string  | NULL |
| sequence_number  | string  | NULL |
| source   | string  | NULL |
| source_machine_name  | string  | NULL |
| task_category| string  | NULL |
| user | string  | NULL |
| additional_data  | map<string,string>   | NULL |
| windows_event_time_bin   | timestamp   | NULL |
| # Partition Information  | |  |
| # col_name   | data_type   | comment  |
| windows_event_time_bin   | timestamp   | NULL |
+--+-+--+


However, when I create a partitioned table and write data using the
following:

hiveWindowsEvents.foreachRDD( rdd => {
  val eventsDataFrame = rdd.toDF()

eventsDataFrame.write.mode(SaveMode.Append).saveAsTable("windows_event")
})

The data is written as though the table is not partitioned (so everything
is written to /user/hive/warehouse/windows_event/file.gz.parquet.  Because
the data is not following the partition schema, it is not accessible (and
not partitioned).

Is there a straightforward way to write to partitioned tables using Spark
SQL?  I understand that the read performance for partitioned data is far
better - are there other performance improvements that might be better to
use instead of partitioning?

Regards,

Bryan Jeffrey


Hive Version

2015-10-28 Thread Bryan Jeffrey
All,

I am using a HiveContext to create persistent tables from Spark. I am using
the Spark 1.4.1 (Scala 2.11) built-in Hive support.  What version of Hive
does Spark's bundled Hive correspond to? I ask because Avro format and timestamps
in Parquet do not appear to be supported.

I have searched a lot of the Spark documentation, but do not see version
specified anywhere - it would be a good addition.

Thank you,

Bryan Jeffrey


Spark SQL Persistent Table - joda DateTime Compatibility

2015-10-27 Thread Bryan Jeffrey
Hello.

I am working to create a persistent table using SparkSQL HiveContext. I
have a basic Windows event case class:

case class WindowsEvent(
 targetEntity: String,
 targetEntityType: String,
 dateTimeUtc: DateTime,
 eventId: String,
 eventData: Map[String, String],
 description: String,
 eventRecordId: String,
 level: String,
 machineName: String,
 sequenceNumber: String,
 source: String,
 sourceMachineName: String,
 taskCategory: String,
 user: String,
 machineIp: String,
 additionalData: Map[String, String]
 )

The case class is written to a Hive table as follows:

val hc = new HiveContext(sc)
import hc.implicits._

windowsEvents.foreachRDD( rdd => {
   val eventsDataFrame = rdd.toDF()
  eventsDataFrame.write.mode(SaveMode.Append).saveAsTable("eventsTable")
})

I am seeing the following error:

Exception in thread "main" java.lang.UnsupportedOperationException: Schema
for type org.joda.time.DateTime is not supported


Obviously the DateTime schema is not supported.  How is implicit DateTime
conversion from Joda DateTime to a persistent Hive table accomplished?  Has
anyone else run into the same issue?

Regards,

Bryan Jeffrey


Re: Error Compiling Spark 1.4.1 w/ Scala 2.11 & Hive Support

2015-10-26 Thread Bryan Jeffrey
All,

The error resolved to a bad version of jline pulling from Maven.  The jline
version is defined as 'scala.version' -- the 2.11 version does not exist in
maven.  Instead the following should be used:

 
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>jline</artifactId>
  <version>2.11.0-M3</version>
</dependency>

Regards,

Bryan Jeffrey

On Mon, Oct 26, 2015 at 9:01 AM, Bryan Jeffrey 
wrote:

> All,
>
> I'm seeing the following error compiling Spark 1.4.1 w/ Scala 2.11 & Hive
> support. Any ideas?
>
> mvn -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive
> -Phive-thriftserver package
>
> [INFO] Spark Project Parent POM .. SUCCESS [4.124s]
> [INFO] Spark Launcher Project  SUCCESS [9.001s]
> [INFO] Spark Project Networking .. SUCCESS [7.871s]
> [INFO] Spark Project Shuffle Streaming Service ... SUCCESS [3.904s]
> [INFO] Spark Project Unsafe .. SUCCESS [3.095s]
> [INFO] Spark Project Core  SUCCESS
> [24.768s]
> [INFO] Spark Project Bagel ... SUCCESS [2.029s]
> [INFO] Spark Project GraphX .. SUCCESS [4.057s]
> [INFO] Spark Project Streaming ... SUCCESS [9.774s]
> [INFO] Spark Project Catalyst  SUCCESS [6.804s]
> [INFO] Spark Project SQL . SUCCESS [9.606s]
> [INFO] Spark Project ML Library .. SUCCESS
> [10.872s]
> [INFO] Spark Project Tools ... SUCCESS [0.627s]
> [INFO] Spark Project Hive  SUCCESS
> [13.463s]
> [INFO] Spark Project REPL  SUCCESS [1.414s]
> [INFO] Spark Project YARN  SUCCESS [2.433s]
> [INFO] Spark Project Hive Thrift Server .. FAILURE [8.097s]
>
>
> [ERROR]
> /spark/spark-1.4.1.hive.bak/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:25:
> object ConsoleReader is not a member of package jline
> [ERROR] import jline.{ConsoleReader, History}
> [ERROR]^
> [WARNING] Class jline.Completor not found - continuing with a stub.
> [WARNING] Class jline.ConsoleReader not found - continuing with a stub.
> [ERROR]
> /spark/spark-1.4.1.hive.bak/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:171:
> not found: type ConsoleReader
> [ERROR] val reader = new ConsoleReader()
> [ERROR]  ^
> [ERROR] Class jline.Completor not found - continuing with a stub.
>


Error Compiling Spark 1.4.1 w/ Scala 2.11 & Hive Support

2015-10-26 Thread Bryan Jeffrey
All,

I'm seeing the following error compiling Spark 1.4.1 w/ Scala 2.11 & Hive
support. Any ideas?

mvn -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive
-Phive-thriftserver package

[INFO] Spark Project Parent POM .. SUCCESS [4.124s]
[INFO] Spark Launcher Project  SUCCESS [9.001s]
[INFO] Spark Project Networking .. SUCCESS [7.871s]
[INFO] Spark Project Shuffle Streaming Service ... SUCCESS [3.904s]
[INFO] Spark Project Unsafe .. SUCCESS [3.095s]
[INFO] Spark Project Core  SUCCESS [24.768s]
[INFO] Spark Project Bagel ... SUCCESS [2.029s]
[INFO] Spark Project GraphX .. SUCCESS [4.057s]
[INFO] Spark Project Streaming ... SUCCESS [9.774s]
[INFO] Spark Project Catalyst  SUCCESS [6.804s]
[INFO] Spark Project SQL . SUCCESS [9.606s]
[INFO] Spark Project ML Library .. SUCCESS [10.872s]
[INFO] Spark Project Tools ... SUCCESS [0.627s]
[INFO] Spark Project Hive  SUCCESS [13.463s]
[INFO] Spark Project REPL  SUCCESS [1.414s]
[INFO] Spark Project YARN  SUCCESS [2.433s]
[INFO] Spark Project Hive Thrift Server .. FAILURE [8.097s]


[ERROR]
/spark/spark-1.4.1.hive.bak/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:25:
object ConsoleReader is not a member of package jline
[ERROR] import jline.{ConsoleReader, History}
[ERROR]^
[WARNING] Class jline.Completor not found - continuing with a stub.
[WARNING] Class jline.ConsoleReader not found - continuing with a stub.
[ERROR]
/spark/spark-1.4.1.hive.bak/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:171:
not found: type ConsoleReader
[ERROR] val reader = new ConsoleReader()
[ERROR]  ^
[ERROR] Class jline.Completor not found - continuing with a stub.


Re: Example of updateStateByKey with initial RDD?

2015-10-08 Thread Bryan Jeffrey
Honestly, that's what I already did - I am working to test it now.  It
looked like 'add an underscore' was ignoring some implicit argument that I
was failing to provide.
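For readers following along, the trailing underscore performs eta-expansion: it converts the method into a function value explicitly, which becomes necessary once the extra partitioner and initial-RDD arguments keep the compiler from inferring the expected function type. A minimal, Spark-free sketch of the same mechanics (names are illustrative):

```scala
// A method with the same shape as an updateStateByKey update function.
def counter(events: Seq[Int], prev: Option[Long]): Option[Long] =
  Some(prev.getOrElse(0L) + events.size)

// Eta-expand with `_` to get a plain function value; this is what the
// trailing underscore in `updateCountsOfProcessGivenRole _` does.
val update: (Seq[Int], Option[Long]) => Option[Long] = counter _

assert(update(Seq(1, 2, 3), Some(10L)) == Some(13L))
assert(update(Seq.empty, None) == Some(0L))
println("eta-expansion ok")
```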

On Thu, Oct 8, 2015 at 8:34 AM, Aniket Bhatnagar  wrote:

> Ohh I see. You would have to add an underscore
> after ProbabilityCalculator.updateCountsOfProcessGivenRole. Try:
>
> dstream.map(x => (x.keyWithTime, x))
> .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole _,
> new HashPartitioner(3), initialProcessGivenRoleRdd)
>
> Here is an example:
> def counter(events: Seq[Event], prevStateOpt: Option[Long]): Option[Long]
> = {
>   val prevCount = prevStateOpt.getOrElse(0L)
>   val newCount = prevCount + events.size
>   Some(newCount)
> }
> val interval = 60 * 1000
> val initialRDD = sparkContext.makeRDD(Array(1L, 2L, 3L, 4L, 5L)).map(_ *
> interval).map(n => (n % interval, n / interval))
> val counts = eventsStream.map(event => {
>   (event.timestamp - event.timestamp % interval, event)
> }).updateStateByKey[Long](PrintEventCountsByInterval.counter _, new
> HashPartitioner(3), initialRDD = initialRDD)
> counts.print()
>
> Thanks,
> Aniket
>
>
> On Thu, Oct 8, 2015 at 5:48 PM Bryan Jeffrey 
> wrote:
>
>> Aniket,
>>
>> Thank you for the example - but that's not quite what I'm looking for.
>> I've got a call to updateStateByKey that looks like the following:
>>
>> dstream.map(x => (x.keyWithTime, x))
>> .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole)
>>
>> def updateCountsOfProcessGivenRole(a : Seq[CountsOfProcessGivenRole], b:
>> Option[CountsOfProcessGivenRole]) : Option[CountsOfProcessGivenRole] = {
>> val currentTime = DateTime.now(DateTimeZone.UTC)
>> if(a.isEmpty) {
>>   if (b.get.eventhourbin.plusDays(3).getMillis <
>> currentTime.getMillis) {
>> None
>>   } else {
>> b
>>   }
>> } else { // a is not empty - b may or may not be defined
>>   val population = (if (b.isDefined) b.get.Population else 0) + a.map(x
>> => x.Population).sum
>>   val subpopulation = (if (b.isDefined) b.get.Subpopulation else 0) +
>> a.map(x => x.Subpopulation).sum
>>   Some(CountsOfProcessGivenRole(a(0), population, subpopulation))
>> }
>>   }
>>
>> This works fine, however when I go to add an initial RDD, modifying the
>> 'updateStateByKey' call to look like the following:
>>
>> val initialProcessGivenRoleRdd: RDD[((String, String, DateTime),
>> CountsOfProcessGivenRole)] = iPrProcessGivenRole.map(x => (x.keyWithTime,
>> x))
>> dstream.map(x => (x.keyWithTime, x))
>> .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole,
>> new HashPartitioner(3), initialProcessGivenRoleRdd)
>>
>> I am getting an error -- 'missing arguments for method
>> updateCountsOfProcessGivenRole'. Looking at the method calls, the function
>> being called appears to be the same.  I was hoping an example might
>> shed some light on the issue.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>>
>>
>>
>>
>>
>>
>> On Thu, Oct 8, 2015 at 7:04 AM, Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>> Here is an example:
>>>
>>> val interval = 60 * 1000
>>> val counts = eventsStream.map(event => {
>>>   (event.timestamp - event.timestamp % interval, event)
>>> }).updateStateByKey[Long](updateFunc = (events: Seq[Event],
>>> prevStateOpt: Option[Long]) => {
>>>   val prevCount = prevStateOpt.getOrElse(0L)
>>>   val newCount = prevCount + events.size
>>>   Some(newCount)
>>> })
>>> counts.print()
>>>
>>> Hope it helps!
>>>
>>> Thanks,
>>> Aniket
>>>
>>> On Thu, Oct 8, 2015 at 4:29 PM Bryan  wrote:
>>>
>>>> Hello,
>>>>
>>>> Can anyone point me to a good example of updateStateByKey with an
>>>> initial RDD? I am seeing a compile time error when following the API.
>>>>
>>>> Regards,
>>>>
>>>> Bryan Jeffrey
>>>>
>>>
>>


Re: Example of updateStateByKey with initial RDD?

2015-10-08 Thread Bryan Jeffrey
Aniket,

Thank you for the example - but that's not quite what I'm looking for.
I've got a call to updateStateByKey that looks like the following:

dstream.map(x => (x.keyWithTime, x))
.updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole)

def updateCountsOfProcessGivenRole(a : Seq[CountsOfProcessGivenRole], b:
Option[CountsOfProcessGivenRole]) : Option[CountsOfProcessGivenRole] = {
val currentTime = DateTime.now(DateTimeZone.UTC)
if(a.isEmpty) {
  if (b.get.eventhourbin.plusDays(3).getMillis < currentTime.getMillis)
{
None
  } else {
b
  }
} else { // a is not empty - b may or may not be defined
  val population = (if (b.isDefined) b.get.Population else 0) + a.map(x =>
x.Population).sum
  val subpopulation = (if (b.isDefined) b.get.Subpopulation else 0) +
a.map(x => x.Subpopulation).sum
  Some(CountsOfProcessGivenRole(a(0), population, subpopulation))
}
  }

This works fine, however when I go to add an initial RDD, modifying the
'updateStateByKey' call to look like the following:

val initialProcessGivenRoleRdd: RDD[((String, String, DateTime),
CountsOfProcessGivenRole)] = iPrProcessGivenRole.map(x => (x.keyWithTime,
x))
dstream.map(x => (x.keyWithTime, x))
.updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole, new
HashPartitioner(3), initialProcessGivenRoleRdd)

I am getting an error -- 'missing arguments for method
updateCountsOfProcessGivenRole'. Looking at the method calls, the function
being called appears to be the same.  I was hoping an example might
shed some light on the issue.

Regards,

Bryan Jeffrey







On Thu, Oct 8, 2015 at 7:04 AM, Aniket Bhatnagar  wrote:

> Here is an example:
>
> val interval = 60 * 1000
> val counts = eventsStream.map(event => {
>   (event.timestamp - event.timestamp % interval, event)
> }).updateStateByKey[Long](updateFunc = (events: Seq[Event], prevStateOpt:
> Option[Long]) => {
>   val prevCount = prevStateOpt.getOrElse(0L)
>   val newCount = prevCount + events.size
>   Some(newCount)
> })
> counts.print()
>
> Hope it helps!
>
> Thanks,
> Aniket
>
> On Thu, Oct 8, 2015 at 4:29 PM Bryan  wrote:
>
>> Hello,
>>
>> Can anyone point me to a good example of updateStateByKey with an initial
>> RDD? I am seeing a compile time error when following the API.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>


Re: Weird worker usage

2015-09-28 Thread Bryan Jeffrey
Nukunj,

No, I'm not calling set w/ master at all.  This ended up being a foolish
configuration problem with my slaves file.

Regards,

Bryan Jeffrey
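For anyone hitting the same symptom: in standalone mode the slaves file controls which hosts run workers. A sketch of its usual shape (hostnames below are placeholders, not this thread's actual config):

```shell
# conf/slaves -- one worker hostname per line; sbin/start-slaves.sh
# launches a worker on each entry listed here.
sparkworker1
sparkworker2
sparkworker3
```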

On Fri, Sep 25, 2015 at 11:20 PM, N B  wrote:

> Bryan,
>
> By any chance, are you calling SparkConf.setMaster("local[*]") inside your
> application code?
>
> Nikunj
>
> On Fri, Sep 25, 2015 at 9:56 AM, Bryan Jeffrey 
> wrote:
>
>> Looking at this further, it appears that my Spark Context is not
>> correctly setting the Master name.  I see the following in logs:
>>
>> 15/09/25 16:45:42 INFO DriverRunner: Launch Command:
>> "/usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java" "-cp"
>> "/spark/spark-1.4.1/sbin/../conf/:/spark/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.2.0.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-rdbms-3.2.9.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-core-3.2.10.jar"
>> "-Xms512M" "-Xmx512M" "-Dakka.loglevel=WARNING"
>> "-Dspark.default.parallelism=6" "-Dspark.rpc.askTimeout=10"
>> "-Dspark.app.name=MainClass" "-Dspark.master=spark://sparkserver:7077"
>> "-Dspark.driver.supervise=true" "-Dspark.logConf=true"
>> "-Dspark.jars=file:/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
>> "-Dspark.streaming.receiver.maxRate=500" "-XX:MaxPermSize=256m"
>> "org.apache.spark.deploy.worker.DriverWrapper" "akka.tcp://
>> sparkWorker@10.0.0.6:48077/user/Worker"
>> "/spark/spark-1.4.1/work/driver-20150925164617-/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
>> "MainClass" "--checkpoint" "/tmp/sparkcheckpoint" "--broker"
>> "kafkaBroker:9092" "--topic" "test" "--numStreams" "9"
>> "--threadParallelism" "9"
>> 15/09/25 16:45:43 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/09/25 16:45:43 INFO SecurityManager: Changing view acls to: root
>> 15/09/25 16:45:43 INFO SecurityManager: Changing modify acls to: root
>> 15/09/25 16:45:43 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(root); users
>> with modify permissions: Set(root)
>> 15/09/25 16:45:44 INFO Slf4jLogger: Slf4jLogger started
>> 15/09/25 16:45:45 INFO Utils: Successfully started service 'Driver' on
>> port 59670.
>> 15/09/25 16:45:45 INFO WorkerWatcher: Connecting to worker akka.tcp://
>> sparkWorker@10.0.0.6:48077/user/Worker
>> 15/09/25 16:45:45 INFO MainClass: MainClass - Setup Logger
>> 15/09/25 16:45:45 INFO WorkerWatcher: Successfully connected to
>> akka.tcp://sparkWorker@10.0.0.6:48077/user/Worker
>> 15/09/25 16:45:45 INFO Checkpoint: Checkpoint directory
>> /tmp/sparkcheckpoint does not exist
>> 15/09/25 16:45:45 INFO MainClass: Setting up streaming context with
>> configuration: org.apache.spark.SparkConf@56057cbf and time window 2000
>> ms
>> 15/09/25 16:45:45 INFO SparkContext: Running Spark version 1.4.1
>> 15/09/25 16:45:45 INFO SparkContext: Spark configuration:
>> spark.app.name=MainClass
>> spark.default.parallelism=6
>> spark.driver.supervise=true
>> spark.jars=file:/tmp/OinkSpark-1.0-SNAPSHOT-jar-with-dependencies.jar
>> spark.logConf=true
>> spark.master=local[*]
>> spark.rpc.askTimeout=10
>> spark.streaming.receiver.maxRate=500
>>
>> As you can see, despite -Dmaster=spark://sparkserver:7077, the streaming
>> context still registers the master as local[*].  Any idea why?
>>
>> Thank you,
>>
>> Bryan Jeffrey
>>
>>
>>
>


Re: Weird worker usage

2015-09-25 Thread Bryan Jeffrey
Looking at this further, it appears that my Spark Context is not correctly
setting the Master name.  I see the following in logs:

15/09/25 16:45:42 INFO DriverRunner: Launch Command:
"/usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java" "-cp"
"/spark/spark-1.4.1/sbin/../conf/:/spark/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.2.0.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-rdbms-3.2.9.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-core-3.2.10.jar"
"-Xms512M" "-Xmx512M" "-Dakka.loglevel=WARNING"
"-Dspark.default.parallelism=6" "-Dspark.rpc.askTimeout=10"
"-Dspark.app.name=MainClass" "-Dspark.master=spark://sparkserver:7077"
"-Dspark.driver.supervise=true" "-Dspark.logConf=true"
"-Dspark.jars=file:/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
"-Dspark.streaming.receiver.maxRate=500" "-XX:MaxPermSize=256m"
"org.apache.spark.deploy.worker.DriverWrapper" "akka.tcp://
sparkWorker@10.0.0.6:48077/user/Worker"
"/spark/spark-1.4.1/work/driver-20150925164617-/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
"MainClass" "--checkpoint" "/tmp/sparkcheckpoint" "--broker"
"kafkaBroker:9092" "--topic" "test" "--numStreams" "9"
"--threadParallelism" "9"
15/09/25 16:45:43 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/09/25 16:45:43 INFO SecurityManager: Changing view acls to: root
15/09/25 16:45:43 INFO SecurityManager: Changing modify acls to: root
15/09/25 16:45:43 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root); users
with modify permissions: Set(root)
15/09/25 16:45:44 INFO Slf4jLogger: Slf4jLogger started
15/09/25 16:45:45 INFO Utils: Successfully started service 'Driver' on port
59670.
15/09/25 16:45:45 INFO WorkerWatcher: Connecting to worker akka.tcp://
sparkWorker@10.0.0.6:48077/user/Worker
15/09/25 16:45:45 INFO MainClass: MainClass - Setup Logger
15/09/25 16:45:45 INFO WorkerWatcher: Successfully connected to akka.tcp://
sparkWorker@10.0.0.6:48077/user/Worker
15/09/25 16:45:45 INFO Checkpoint: Checkpoint directory
/tmp/sparkcheckpoint does not exist
15/09/25 16:45:45 INFO MainClass: Setting up streaming context with
configuration: org.apache.spark.SparkConf@56057cbf and time window 2000 ms
15/09/25 16:45:45 INFO SparkContext: Running Spark version 1.4.1
15/09/25 16:45:45 INFO SparkContext: Spark configuration:
spark.app.name=MainClass
spark.default.parallelism=6
spark.driver.supervise=true
spark.jars=file:/tmp/OinkSpark-1.0-SNAPSHOT-jar-with-dependencies.jar
spark.logConf=true
spark.master=local[*]
spark.rpc.askTimeout=10
spark.streaming.receiver.maxRate=500

As you can see, despite -Dmaster=spark://sparkserver:7077, the streaming
context still registers the master as local[*].  Any idea why?

Thank you,

Bryan Jeffrey


Re: Weird worker usage

2015-09-25 Thread Bryan Jeffrey
I am seeing a similar issue when reading from Kafka.  I have a single Kafka
broker with 1 topic and 10 partitions on a separate machine.  I have a
three-node spark cluster, and verified that all workers are registered with
the master.  I'm initializing Kafka using a similar method to this article:
http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/.
I create 3 InputDStreams and union them together to provide a unified
stream. I then repartition this into 6 partitions:

val streams = Range(0, configuration.numStreams)
  .map(x => {
 logger.info("Starting Setup of Kafka Stream #" + x + ": \n\tZookeepers: "
+ zookeepersToUse.mkString(",") + "\n\tBrokers: " +
brokersToUse.mkString(",") + "\n\tTopics: " + topicsToUse.mkString(","))
 KafkaStreamFactory.createKafkaStream(ssc, brokersToUse, zookeepersToUse,
topicsToUse)
}).toArray
val unionStream = ssc.union(streams)
if (configuration.threadParallelism > 0) {
  unionStream.repartition(configuration.threadParallelism)
} else {
  unionStream
}
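One subtlety worth flagging in the snippet above, independent of the scheduling question: DStream and RDD transformations such as `repartition` return a new stream rather than mutating the receiver, so a call whose result is discarded is a no-op. The same rule holds for any immutable value:

```scala
val xs = Vector(1, 2, 3)
xs.map(_ * 2)                 // result discarded: xs itself never changes
assert(xs == Vector(1, 2, 3))

val doubled = xs.map(_ * 2)   // keep the returned value instead
assert(doubled == Vector(2, 4, 6))
println("transformations return new values")
```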


I am submitting the job to Spark using the following options:

/spark/spark-1.4.1/bin/spark-submit --deploy-mode client --supervise
--master "spark://sparkserver:7077" --conf spark.logConf=true --conf
spark.default.parallelism=6 --conf spark.streaming.receiver.maxRate=500
--class MainClass "/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
--checkpoint /tmp/sparkcheckpoint --broker kafkaBroker:9092 --topic test
--numStreams 9 --threadParallelism 9

Even when I put a long-running job in the queue, none of the other nodes
are anything but idle.

Am I missing something obvious?

Regards,

Bryan Jeffrey





On Fri, Sep 25, 2015 at 8:28 AM, Akhil Das 
wrote:

> Parallel tasks totally depends on the # of partitions that you are having,
> if you are not receiving sufficient partitions (partitions > total # cores)
> then try to do a .repartition.
>
> Thanks
> Best Regards
>
> On Fri, Sep 25, 2015 at 1:44 PM, N B  wrote:
>
>> Hello all,
>>
>> I have a Spark streaming application that reads from a Flume Stream, does
>> quite a few maps/filters in addition to a few reduceByKeyAndWindow and join
>> operations before writing the analyzed output to ElasticSearch inside a
>> foreachRDD()...
>>
>> I recently started to run this on a 2 node cluster (Standalone) with the
>> driver program directly submitting to Spark master on the same host. The
>> way I have divided the resources is as follows:
>>
>> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores each
>> worker)
>> N2: 2 spark workers (16 gb + 8 cores each worker).
>>
>> The application works just fine but it is underusing N2 completely. It
>> seems to use N1 (note that both executors on N1 get used) for all the
>> analytics but when it comes to writing to Elasticsearch, it does divide the
>> data around into all 4 executors which then write to ES on a separate host.
>>
>> I am puzzled as to why the data is not being distributed evenly from the
>> get go into all 4 executors and why would it only do so in the final step
>> of the pipeline which seems counterproductive as well?
>>
>> CPU usage on N1 is near the peak while on N2 is < 10% of overall capacity.
>>
>> Any help in getting the resources more evenly utilized on N1 and N2 is
>> welcome.
>>
>> Thanks in advance,
>> Nikunj
>>
>>
>


Yarn Shutting Down Spark Processing

2015-09-22 Thread Bryan Jeffrey
Hello.

I have a Spark streaming job running on a cluster managed by Yarn.  The
spark streaming job starts and receives data from Kafka.  It is processing
well and then after several seconds I see the following error:

15/09/22 14:53:49 ERROR yarn.ApplicationMaster: SparkContext did not
initialize after waiting for 10 ms. Please check earlier log output for
errors. Failing the application.
15/09/22 14:53:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 13, (reason: Timed out waiting for SparkContext.)

The spark process is then (obviously) shut down by Yarn.

What do I need to change to allow Yarn to initialize Spark streaming (vs.
batch) jobs?

Thank you,

Bryan Jeffrey
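One knob commonly associated with this exact message on Spark 1.x is the ApplicationMaster wait time; in yarn-cluster mode the SparkContext must also be created early in the driver's main(). A hedged sketch -- verify the property name against your Spark version's "Running on YARN" documentation:

```shell
# Give the ApplicationMaster longer to observe an initialized
# SparkContext before failing the application (value is in ms).
spark-submit \
  --master yarn-cluster \
  --conf spark.yarn.am.waitTime=300000 \
  --class MainClass /tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar
```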


Suggested Method for Execution of Periodic Actions

2015-09-16 Thread Bryan Jeffrey
Hello.

I have a streaming job that is processing data.  I process a stream of
events, taking actions when I see anomalous events.  I also keep a count of
events observed using updateStateByKey to maintain a map of type to count.
I would like to periodically (every 5 minutes) write the results of my
counts to a database.  Is there a built-in mechanism or established
pattern for executing periodic jobs in Spark Streaming?

Regards,

Bryan Jeffrey
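One widely used pattern for this is to drive the side effect from `foreachRDD((rdd, time) => ...)` and gate it on the batch time; the gate itself is plain Scala. A sketch -- the interval and names are illustrative, not from this thread:

```scala
// Fire the periodic action only when at least `intervalMs` has elapsed
// since the last flush; call this from foreachRDD with the batch time.
val intervalMs = 5 * 60 * 1000L
var lastFlushMs = -intervalMs  // so the very first batch flushes

def shouldFlush(batchTimeMs: Long): Boolean =
  if (batchTimeMs - lastFlushMs >= intervalMs) {
    lastFlushMs = batchTimeMs
    true
  } else false

assert(shouldFlush(0L))        // first batch: flush counts to the database
assert(!shouldFlush(120000L))  // 2 minutes later: skip
assert(shouldFlush(300000L))   // 5 minutes after the last flush: flush again
println("periodic gate ok")
```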


Problems with Local Checkpoints

2015-09-09 Thread Bryan Jeffrey
Hello.

I have some basic code that counts numbers using updateStateByKey.  I set up
a streaming context with checkpointing as follows:

def createStreamingContext(masterName : String, checkpointDirectory :
String, timeWindow : Int) : StreamingContext = {
  val sparkConf = new SparkConf().setAppName("Program")
  val ssc = new StreamingContext(sparkConf, Seconds(timeWindow))
  ssc.checkpoint(checkpointDirectory)
  ssc
}


This runs fine on my distributed (Linux) cluster, writing checkpoints to
local disk. However, when I run on my Windows desktop I am seeing a number
of checkpoint errors:

15/09/09 13:57:06 INFO CheckpointWriter: Saving checkpoint for time
1441821426000 ms to file
'file:/C:/Temp/sparkcheckpoint/checkpoint-1441821426000'
Exception in thread "pool-14-thread-4" java.lang.NullPointerException
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
 at org.apache.hadoop.util.Shell.run(Shell.java:379)
 at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
 at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
 at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
 at
org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
 at
org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
 at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
 at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:772)
 at
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:181)
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

JAVA_HOME is set correctly, the code runs correctly, it's not a permissions
issue (I've run this as Administrator).  Directories and files are being
created in C:\Temp, although all of the files appear to be empty.
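The trace bottoms out in Hadoop's `RawLocalFileSystem.setPermission`, which shells out to set file permissions; on Windows that code path needs Hadoop's native `winutils.exe`. A commonly cited workaround -- hedged, and the path below is an assumption -- is to point `hadoop.home.dir` at a directory whose `bin` contains winutils before the context starts:

```scala
// Assumed layout: C:\hadoop\bin\winutils.exe (downloaded to match the
// Hadoop version in use). Must run before the StreamingContext is built.
System.setProperty("hadoop.home.dir", "C:\\hadoop")

assert(sys.props("hadoop.home.dir") == "C:\\hadoop")
println("hadoop.home.dir = " + sys.props("hadoop.home.dir"))
```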

Does anyone have an idea of what is causing these errors?  Has anyone seen
something similar?

Regards,

Bryan Jeffrey


Re: Java vs. Scala for Spark

2015-09-08 Thread Bryan Jeffrey
Thank you for the quick responses.  It's useful to have some insight from
folks already extensively using Spark.

Regards,

Bryan Jeffrey

On Tue, Sep 8, 2015 at 10:28 AM, Sean Owen  wrote:

> Why would Scala vs Java performance be different Ted? Relatively
> speaking there is almost no runtime difference; it's the same APIs or
> calls via a thin wrapper. Scala/Java vs Python is a different story.
>
> Java libraries can be used in Scala. Vice-versa too, though calling
> Scala-generated classes can be clunky in Java. What's your concern
> about interoperability Jeffrey?
>
> I disagree that Java 7 vs Scala usability is sooo different, but it's
> certainly much more natural to use Spark in Scala. Java 8 closes a lot
> of the usability gap with Scala, but not all of it. Enough that it's
> not crazy for a Java shop to stick to Java 8 + Spark and not be at a
> big disadvantage.
>
> The downsides of Scala IMHO are that it provides too much: lots of
> nice features (closures! superb collections!), lots of rope to hang
> yourself too (implicits sometimes!) and some WTF features (XML
> literals!) Learning the good useful bits of Scala isn't hard. You can
> always write Scala code as much like Java as you like, I find.
>
> Scala tooling is different from Java tooling; that's an
> underappreciated barrier. For example I think SBT is good for
> development, bad for general project lifecycle management compared to
> Maven, but in any event still less developed. SBT/scalac are huge
> resource hogs, since so much of Scala is really implemented in the
> compiler; prepare to update your laptop to develop in Scala on your
> IDE of choice, and start to think about running long-running compile
> servers like we did in the year 2000.
>
> Still net-net I would choose Scala, FWIW.
>
> On Tue, Sep 8, 2015 at 3:07 PM, Ted Yu  wrote:
> > Performance wise, Scala is by far the best choice when you use Spark.
> >
> > The cost of learning Scala is not negligible but not insurmountable
> either.
> >
> > My personal opinion.
> >
> > On Tue, Sep 8, 2015 at 6:50 AM, Bryan Jeffrey 
> > wrote:
> >>
> >> All,
> >>
> >> We're looking at language choice in developing a simple streaming
> >> processing application in spark.  We've got a small set of example code
> >> built in Scala.  Articles like the following:
> >>
> http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html
> >> would seem to indicate that Scala is great for use in distributed
> >> programming (including Spark).  However, there is a large group of folks
> >> that seem to feel that interoperability with other Java libraries is
> much to
> >> be desired, and that the cost of learning (yet another) language is
> quite
> >> high.
> >>
> >> Has anyone looked at Scala for Spark dev in an enterprise environment?
> >> What was the outcome?
> >>
> >> Regards,
> >>
> >> Bryan Jeffrey
> >
> >
>


Java vs. Scala for Spark

2015-09-08 Thread Bryan Jeffrey
All,

We're looking at language choice in developing a simple streaming
processing application in spark.  We've got a small set of example code
built in Scala.  Articles like the following:
http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html
would seem to indicate that Scala is great for use in distributed
programming (including Spark).  However, there is a large group of folks
that seem to feel that interoperability with other Java libraries is much
to be desired, and that the cost of learning (yet another) language is
quite high.

Has anyone looked at Scala for Spark dev in an enterprise environment?
What was the outcome?

Regards,

Bryan Jeffrey


Getting Started with Spark

2015-09-08 Thread Bryan Jeffrey
Hello. We're getting started with Spark Streaming. We're working to build
some unit/acceptance testing around functions that consume DStreams. The
current method for creating DStreams is to populate the data by creating an
InputDStream:

val input = Array(TestDataFactory.CreateEvent(123 notFoundData))
val queue =
scala.collection.mutable.Queue(ssc.sparkContext.parallelize(input))
val events: InputDStream[MyEvent] = ssc.queueStream(queue)

The 'events' InputDStream can then be fed into functions. However, the
stream does not allow checkpointing. This means that we're unable to use
this to feed methods/classes that execute stateful actions like
'updateStateByKey'.

Does anyone have a simple, contained method to create DStreams that allow
for checkpointing? I looked at the Spark unit test framework, but that
seems to require access to a bunch of spark internals (requiring that
you're within the spark package, etc.)
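A workaround some teams use for the checkpointing limitation described above -- hedged, not from this thread -- is to serialize test batches to files in a temp directory and build the stream with `ssc.textFileStream`, since file-based input streams do support checkpoint recovery. The file-writing half needs no Spark at all:

```scala
import java.nio.file.Files

// Write one test batch per file; a streaming test would then create the
// stream with ssc.textFileStream(dir.toString) (Spark wiring omitted).
val dir = Files.createTempDirectory("dstream-test")
val batch = Seq("event-1", "event-2", "event-3")
Files.write(dir.resolve("batch-0.txt"), batch.mkString("\n").getBytes("UTF-8"))

val readBack = new String(Files.readAllBytes(dir.resolve("batch-0.txt")), "UTF-8")
assert(readBack.split("\n").toSeq == batch)
println("wrote batch of " + batch.size)
```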