Extending GraphFrames without running into serialization issues

2021-01-05 Thread Michal Monselise
Hi,

I am trying to extend GraphFrames and create my own class that has some
additional graph functionality.

To simplify this example, I have created a class that doesn't contain any
functions. All it does is extend GraphFrame:

import org.apache.spark.sql.DataFrame
import org.graphframes._

class NewGraphFrame(@transient private val _vertices: DataFrame,
                    @transient private val _edges: DataFrame) extends GraphFrame {
}
val vertices = Seq(
  (1, "John"),
  (2, "Jane"),
  (3, "Karen")
).toDF("id", "name")
val edges = Seq(
  (1, 3),
  (2, 3),
  (2, 1)
).toDF("src", "dst")
val g = new NewGraphFrame(vertices, edges)

When I run this code in the REPL, I get the following error:

java.lang.Exception: You cannot use GraphFrame objects within a Spark closure
  at org.graphframes.GraphFrame.vertices(GraphFrame.scala:125)
  at org.graphframes.GraphFrame.toString(GraphFrame.scala:55)
  at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332)
  at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337)
  at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:345)
  at .$print$lzycompute(:10)
  at .$print(:6)
  at $print()
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
  at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
  at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
  at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
  at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
  at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
  at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
  at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
  at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
  at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
  at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
  at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
  at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
  at org.apache.spark.repl.Main$.doMain(Main.scala:76)
  at org.apache.spark.repl.Main$.main(Main.scala:56)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I understand this error normally means a GraphFrame object is being used
inside a Spark closure, but I am clearly not trying to do that. I simply
want to extend this class so that I can use its graph functionality in my
own. How do I extend this class without the Spark REPL throwing this error?
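
Judging by the stack trace, the REPL is simply calling toString on the new
object; toString calls GraphFrame.vertices, and that accessor throws this
exception whenever the parent's private _vertices field is null. Subclassing
leaves it null here, because the subclass's constructor parameters shadow the
parent's private fields instead of initializing them. One way to sidestep
this entirely is composition: build the graph with the GraphFrame companion's
apply method and delegate to it. A minimal sketch, assuming the standard
graphframes API (RichGraphFrame and vertexDegrees are illustrative names, not
part of the library):

import org.apache.spark.sql.DataFrame
import org.graphframes.GraphFrame

// Wrap a fully constructed GraphFrame instead of subclassing it.
class RichGraphFrame(vertices: DataFrame, edges: DataFrame) {
  // GraphFrame.apply builds a properly initialized instance.
  val graph: GraphFrame = GraphFrame(vertices, edges)

  // Hypothetical extra functionality layered on top of the wrapped graph.
  def vertexDegrees: DataFrame = graph.degrees
}

val g = new RichGraphFrame(vertices, edges)
g.graph.vertices.show()
g.vertexDegrees.show()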


Re: Join with multiple conditions (In reference to SPARK-7197)

2015-08-26 Thread Michal Monselise
Davies, I created an issue - SPARK-10246
<https://issues.apache.org/jira/browse/SPARK-10246>

On Tue, Aug 25, 2015 at 12:53 PM, Davies Liu  wrote:

> It's good to support this; could you create a JIRA for it and target it
> for 1.6?
>
> On Tue, Aug 25, 2015 at 11:21 AM, Michal Monselise
>  wrote:
> >
> > Hello All,
> >
> > PySpark currently has two ways of performing a join: specifying a join
> condition or column names.
> >
> > I would like to perform a join using a list of columns that appear in
> both the left and right DataFrames. I have created an example in this
> question on Stack Overflow.
> >
> > Basically, I would like to do the following, as specified in the
> documentation in /spark/python/pyspark/sql/dataframe.py at line 560, and
> specify a list of column names:
> >
> > >>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
> >
> > However, this produces an error.
> >
> > In JIRA issue SPARK-7197, it is mentioned that the syntax is actually
> different from the one specified in the documentation for joining using a
> condition.
> >
> > Documentation:
> > >>> cond = [df.name == df3.name, df.age == df3.age]
> > >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
> >
> > JIRA Issue:
> >
> > a.join(b, (a.year==b.year) & (a.month==b.month), 'inner')
> >
> >
> > In other words, the join function cannot take a list.
> > I was wondering if you could also clarify what the correct syntax is for
> providing a list of columns.
> >
> >
> > Thanks,
> > Michal
> >
> >
>
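
For reference, the Scala DataFrame API already accepts a sequence of shared
column names, which is essentially the behavior SPARK-10246 asks PySpark to
match. A minimal sketch, assuming a recent Spark build (the sample data is
made up for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("JoinDemo").getOrCreate()
import spark.implicits._

val df = Seq((1, "Alice", 30), (2, "Bob", 25)).toDF("id", "name", "age")
val df4 = Seq(("Alice", 30, "NYC"), ("Bob", 25, "LA")).toDF("name", "age", "city")

// The Seq[String] overload joins on every listed column and emits
// each shared column only once in the result.
val joined = df.join(df4, Seq("name", "age"))
joined.show()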


Fwd: Join with multiple conditions (In reference to SPARK-7197)

2015-08-25 Thread Michal Monselise
Hello All,

PySpark currently has two ways of performing a join: specifying a join
condition or column names.

I would like to perform a join using a list of columns that appear in both
the left and right DataFrames. I have created an example in this question
on Stack Overflow.

Basically, I would like to do the following, as specified in the
documentation in /spark/python/pyspark/sql/dataframe.py at line 560, and
specify a list of column names:

>>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()

However, this produces an error.

In JIRA issue SPARK-7197, it is mentioned that the syntax is actually
different from the one specified in the documentation for joining using a
condition.

Documentation:
>>> cond = [df.name == df3.name, df.age == df3.age]
>>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
JIRA Issue:

a.join(b, (a.year==b.year) & (a.month==b.month), 'inner')


In other words, the join function cannot take a list.
I was wondering if you could also clarify what the correct syntax is for
providing a list of columns.


Thanks,
Michal
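
Until PySpark's join accepts a list, one workaround is to fold the column
list into a single join condition. A sketch in Scala (joinOn is a
hypothetical helper, not a Spark API; the same fold works in PySpark with
functools.reduce over &-combined conditions):

import org.apache.spark.sql.DataFrame

// Hypothetical helper: turn a list of shared column names into one
// boolean condition and join on it.
def joinOn(left: DataFrame, right: DataFrame, cols: Seq[String],
           how: String = "inner"): DataFrame = {
  val cond = cols.map(c => left(c) === right(c)).reduce(_ && _)
  left.join(right, cond, how)
}

// e.g. joinOn(df, df4, Seq("name", "age")), given DataFrames like the
// df/df4 in the doctest above.

Note that, unlike a join on column names, this keeps both copies of each
join column in the result, so one side's columns usually need to be dropped
afterwards.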