Re: spark can't connect to kafka via sasl_ssl

2022-07-28 Thread wilson
Updated:

Now I have resolved the connection issue (it was due to wrong arguments passed to
SASL).

But I met another problem:

22/07/28 20:17:48 ERROR MicroBatchExecution: Query [id =
2a3bd87a-3a9f-4e54-a697-3d67cef77230, runId =
11c7ca0d-1bd9-4499-a613-6b6e8e8735ca] terminated with error
org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [x]
22/07/28 20:17:48 DEBUG StateStoreCoordinator: Deactivating instances
related to checkpoint location 11c7ca0d-1bd9-4499-a613-6b6e8e8735ca:
Exception in thread "main"
org.apache.spark.sql.streaming.StreamingQueryException: Invalid topics:
[x]


My topic name is just a regular string like "abc_def".

Can you help?
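For anyone hitting the same pair of issues, here is a minimal sketch of how the
reader options are usually spelled, per the Structured Streaming Kafka guide:
Kafka client settings take the kafka. prefix, and subscribe needs an explicit,
non-empty topic name. The broker address and topic below are placeholders.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder.appName("Mykafka").getOrCreate()

  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker.example.com:9092") // placeholder broker
    .option("kafka.security.protocol", "SASL_SSL")                // note the kafka. prefix
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("subscribe", "abc_def")                               // explicit, non-empty topic name
    .load()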


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark can't connect to kafka via sasl_ssl

2022-07-27 Thread wilson
Hello,

My Spark client program is as follows:

import org.apache.spark.sql.SparkSession

object Sparkafka {
def main(args:Array[String]):Unit = {

  val spark = SparkSession.builder.appName("Mykafka").getOrCreate()

  val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "dh-cn-shenzhen.aliyuncs.com:9092")
  .option("security.protocol", "SASL_SSL")
  .option("sasl.mechanism", "PLAIN")
  .option("group.id", "test_project")
  .option("subscribe", "")
  .load()

  import spark.implicits._

  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

  val myCount = df.groupBy("key").count()

  val query = myCount.writeStream
  .outputMode("complete")
  .format("console")
  .start()

  query.awaitTermination()

  }
}

This is the JAAS file for authentication:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="***"
  password="***";
};

And I submitted the job as:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
--driver-java-options
"-Djava.security.auth.login.config=/home/pyh/kafka/jaas.conf" \
--conf
spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/pyh/kafka/jaas.conf
\
--class "Sparkafka" \
--master local[2] \
target/scala-2.12/sparkafka_2.12-0.1.jar



The error always shows:

22/07/28 13:16:11 WARN NetworkClient: [Consumer
clientId=consumer-spark-kafka-source-62c48917-7df1-4513-89c0-938390d89257--1440940498-driver-0-1,
groupId=spark-kafka-source-62c48917-7df1-4513-89c0-938390d89257--1440940498-driver-0]
Bootstrap broker dh-cn-shenzhen.aliyuncs.com:9092 (id: -1 rack: null)
disconnected


Spark 3.3.0 in local deployment.

Can you help? Thanks.



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



how to add a column for percent

2022-05-23 Thread wilson
Hello,

How do I add a column with the percentage for the current row of counted data?

scala>
df2.groupBy("_c1").count.withColumn("percent",f"${col(count)/df2.count}%.2f").show

<console>:30: error: type mismatch;


This doesn't work.

So please help. Thanks.
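For reference, a minimal sketch that computes the total first and then formats the
share of each group, assuming df2 and the column "_c1" from the line above:

  import org.apache.spark.sql.functions.{col, lit, round}

  val total = df2.count()   // total row count, computed once on the driver

  val withPercent = df2.groupBy("_c1").count()
    .withColumn("percent", round(col("count") / lit(total) * 100, 2))

  withPercent.show()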


Re: Reverse proxy for Spark UI on Kubernetes

2022-05-17 Thread wilson
What's the advantage of using a reverse proxy for the Spark UI?

Thanks

On Tue, May 17, 2022 at 1:47 PM bo yang  wrote:

> Hi Spark Folks,
>
> I built a web reverse proxy to access Spark UI on Kubernetes (working
> together with https://github.com/GoogleCloudPlatform/spark-on-k8s-operator).
> Want to share here in case other people have similar need.
>
> The reverse proxy code is here:
> https://github.com/datapunchorg/spark-ui-reverse-proxy
>
> Let me know if anyone wants to use or would like to contribute.
>
> Thanks,
> Bo
>
>


Re: groupby question

2022-05-05 Thread wilson

We don't know what you were trying to express.
It would be better if you could give a sample dataset and the goal you want
to achieve; then we can suggest the right solution.
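In the meantime, a minimal sketch of grouping a pair RDD by key, in case that is
what you are after (the data is hypothetical and sc is an existing SparkContext):

  // Hypothetical (key, value) records; replace with your own key extraction.
  val records = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

  // groupByKey collects every value per key; prefer reduceByKey/aggregateByKey
  // when you only need an aggregate, since they shuffle less data.
  val grouped = records.groupByKey()          // RDD[(String, Iterable[Int])]
  grouped.collect().foreach(println)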


Thanks

Irene Markelic wrote:
I have an RDD that I want to group according to some key, but it just
doesn't work. I am a Scala beginner. So I have the following RDD:


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Disable/Remove datasources in Spark

2022-05-05 Thread wilson
BTW, I use Drill to query webserver logs only, because Drill has a
storage plugin for httpd server logs.


But I found Spark is also convenient for querying webserver logs, for which I
wrote a note:


https://notes.4shield.net/how-to-query-webserver-log-with-spark.html

Thanks

wilson wrote:
Though this is off-topic: Apache Drill can do that. For instance,
you can keep only the CSV storage plugin in the configuration and
remove all other storage plugins; then users on Drill can query CSV only.


regards


Aditya wrote:
So, is there a way for me to get a list of "leaf" dataframes/RDD that 
they are using in their logic ?


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Disable/Remove datasources in Spark

2022-05-05 Thread wilson
Though this is off-topic: Apache Drill can do that. For instance,
you can keep only the CSV storage plugin in the configuration and
remove all other storage plugins; then users on Drill can query CSV only.


regards


Aditya wrote:
So, is there a way for me to get a list of "leaf" dataframes/RDD that 
they are using in their logic ?


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Disable/Remove datasources in Spark

2022-05-05 Thread wilson
It's probably impossible to disable that. A user can run spark.read... to
read any data source they can reach.
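For illustration, a minimal sketch of such ad-hoc reads; the paths are hypothetical
and an existing SparkSession named spark is assumed:

  // Any format whose connector is on the classpath, and any path the user can
  // reach, can be read ad hoc.
  val csvDf  = spark.read.option("header", "true").csv("/data/example.csv")
  val jsonDf = spark.read.json("s3a://some-bucket/events/")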



Aditya wrote:

2. But I am not able to figure out how to "disable" all other data sources


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: how spark handle the abnormal values

2022-05-02 Thread wilson

Thanks Mich.
But in my experience, many original data sources have abnormal values
included.
I have already used rlike and filter to implement the data cleaning, as in
this write-up of mine:

https://bigcount.xyz/calculate-urban-words-vote-in-spark.html

What surprises me is that Spark does the string-to-numeric conversion
automatically and ignores the non-numeric values. Based on this, my
data cleaning seems meaningless.
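For what it's worth, a minimal sketch of making that conversion explicit, so the
cleaning step stays meaningful. It assumes df and the "number" column from my
earlier test; casting a non-numeric string to double yields null:

  import org.apache.spark.sql.functions.{avg, col}

  // Cast explicitly: rows that fail the string-to-double conversion become null,
  // so they can be inspected or dropped instead of being silently ignored.
  val cleaned = df.withColumn("number_num", col("number").cast("double"))

  val badRows = cleaned.filter(col("number_num").isNull)   // e.g. the "xyz" row
  val avgDf   = cleaned.filter(col("number_num").isNotNull).agg(avg("number_num"))
  avgDf.show()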


Thanks.

Mich Talebzadeh wrote:
Agg and avg are numeric functions dealing with numeric values. Why
is the "number" column defined as String type?


Do you perform data cleaning beforehand by any chance? It is good practice.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark null values calculation

2022-05-01 Thread wilson
Sorry, I have found the reason: a NULL cannot be compared
directly. I have written a note about this:

https://bigcount.xyz/how-spark-handles-null-and-abnormal-values.html
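For reference, a minimal sketch of the null-safe way to write those checks,
assuming dfs and the cand_status column from my earlier mail:

  import org.apache.spark.sql.functions.col

  // A real NULL never satisfies === or =!=; use isNull / isNotNull
  // (or the null-safe equality operator <=>) instead.
  val nullCount    = dfs.filter(col("cand_status").isNull).count()
  val notNullCount = dfs.filter(col("cand_status").isNotNull).count()
  // nullCount + notNullCount adds up to the total row count.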

Thanks.

wilson wrote:

do you know why the select results below do not behave consistently?


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: how spark handle the abnormal values

2022-05-01 Thread wilson

I did a small test as follows.

scala> df.printSchema()
root
 |-- fruit: string (nullable = true)
 |-- number: string (nullable = true)


scala> df.show()
+------+------+
| fruit|number|
+------+------+
| apple|     2|
|orange|     5|
|cherry|     7|
|  plum|   xyz|
+------+------+


scala> df.agg(avg("number")).show()
+-----------+
|avg(number)|
+-----------+
|      4.667|
+-----------+


As you can see, the "number" column is of string type, and there is an abnormal
value in it.


But in both cases Spark still handles the result pretty well. So I
guess:


1) Spark can automatically translate strings to numerics when
aggregating.
2) Spark ignores those abnormal values automatically when calculating
the relevant statistics.


Am I right? Thank you.

wilson




wilson wrote:
My dataset has abnormal values in a column whose normal values are
numeric. I can select them as follows:


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



how spark handle the abnormal values

2022-05-01 Thread wilson

Hello

My dataset has abnormal values in a column whose normal values are
numeric. I can select them as follows:


scala> df.select("up_votes").filter($"up_votes".rlike(regex)).show()
+---+
|   up_votes|
+---+
|  <|
|  <|
|fx-|
| OP|
|  \|
|  v|
| :O|
|  y|
| :O|
|  ncurs|
|  )|
|  )|
|  X|
| -1|
|':>?< ./ '[]\~`|
|   enc3|
|  X|
|  -|
|  X|
|  N|
+---+
only showing top 20 rows


Even though there are abnormal values in the column, Spark can still
aggregate it, as you can see below.



scala> df.agg(avg("up_votes")).show()
+-----------------+
|    avg(up_votes)|
+-----------------+
|65.18445431897453|
+-----------------+

So how does Spark handle the abnormal values in a numeric column? Does it just
ignore them?



Thank you.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark null values calculation

2022-04-30 Thread wilson

My dataset has NULL values included in its columns.
Do you know why the select results below do not behave consistently?

scala> dfs.select("cand_status").count()
val res37: Long = 881793 



scala> dfs.select("cand_status").where($"cand_status" =!= "NULL").count()
val res38: Long = 383717 



scala> dfs.select("cand_status").where($"cand_status" === "NULL").count()
val res39: Long = 86402 



scala> dfs.select("cand_status").where($"cand_status" === 
"NULL").where($"cand_status" =!= "NULL").count()

val res40: Long = 0


As you can see, 383717 + 86402 != 881793,
whereas I expected them to be equal.

Thanks.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Unsubscribe

2022-04-28 Thread wilson

Please send a message to user-unsubscr...@spark.apache.org
to unsubscribe.


Ajay Thompson wrote:

Unsubscribe


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark sql slowness in Spark 3.0.1

2022-04-14 Thread wilson

Just curious, where do you write to?


Anil Dasari wrote:
We are upgrading Spark from 2.4.7 to 3.0.1. We use Spark SQL (Hive) to
checkpoint data frames (intermediate data). The DF write is very slow in
3.0.1 compared to 2.4.7.




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Monitoring with elastic search in spark job

2022-04-14 Thread wilson

Maybe you can take a look at this:
https://github.com/banzaicloud/spark-metrics

regards


Xinyu Luan wrote:
Can I get any suggestions or some examples of how to get the metrics
correctly?


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Unsubscribe

2022-03-16 Thread van wilson


> On Mar 16, 2022, at 7:38 AM,   wrote:
> 
> Thanks, Jayesh and all. I finally got the correlation data frame using agg
> with a list of functions.
> I think the list of functions which generate a column deserves a more detailed
> description.
> 
> Liang
> 
> ----- Original Message -----
> From: "Lalwani, Jayesh"
> To: "ckgppl_...@sina.cn", Enrico Minack, Sean Owen
> Cc: user
> Subject: Re: Re: Re: Re: Re: calculate
> correlation_between_multiple_columns_and_one_specific_column_after_groupby_the_spark_data_frame
> Date: 2022-03-16 20:49
> 
> No, you don't need 30 dataframes and self-joins. Convert a list of columns to
> a list of functions, and then pass the list of functions to the agg function.
> 
>  
> 
>  
> 
> From: "ckgppl_...@sina.cn" 
> Reply-To: "ckgppl_...@sina.cn" 
> Date: Wednesday, March 16, 2022 at 8:16 AM
> To: Enrico Minack , Sean Owen 
> Cc: user 
> Subject: [EXTERNAL] Re: Re: Re: Re: calculate correlation
> between_multiple_columns_and_one_specific_column_after_groupby_the_spark_data_frame
> 
>  
> 
> 
>  
> 
> Thanks, Enrico.
> 
> I just found that I need to group the data frame and then calculate the
> correlation, so I will get a list of data frames, not columns.
> 
> So I used the following solution:
> 
> 1. Use the following code to create a mutable data frame df_all. I used
> the first datacol to calculate the correlation:
> df.groupby("groupid").agg(functions.corr("datacol1","corr_col"))
> 
> 2. Iterate over all remaining datacol columns, creating a temp data frame in
> each iteration. In each iteration, join df_all with the temp data frame on
> the groupid column, then drop the duplicated groupid column.
> 
> 3. After the iteration, I get the data frame which contains all the
> correlation data.
> 
> 
> 
> 
> I need to verify the data to make sure it is valid.
> 
> 
> 
> 
> Liang
> 
> ----- Original Message -----
> From: Enrico Minack
> To: ckgppl_...@sina.cn, Sean Owen
> Cc: user
> Subject: Re: Re: Re: calculate correlation
> between_multiple_columns_and_one_specific_column_after_groupby_the_spark_data_frame
> Date: 2022-03-16 19:53
> 
>  
> 
> If you have a list of Columns called `columns`, you can pass them to the 
> `agg` method as:
> 
>  
> 
>   agg(columns.head, columns.tail: _*)
> 
>  
> 
> Enrico
> 
>  
> 
>  
> 
> On 16.03.22 at 08:02, ckgppl_...@sina.cn wrote:
> 
> Thanks, Sean. I modified the code and have generated a list of columns.
> 
> I am working on converting a list of columns to a new data frame. It seems that
> there is no direct API to do this.
> 
>  
> 
> ----- Original Message -----
> From: Sean Owen
> To: ckgppl_...@sina.cn
> Cc: user
> Subject: Re: calculate correlation between multiple columns and one specific column
> after groupby the spark data frame
> Date: 2022-03-16 11:55
> 
>  
> 
> Are you just trying to avoid writing the function call 30 times? Just put 
> this in a loop over all the columns instead, which adds a new corr col every 
> time to a list. 
> 
> On Tue, Mar 15, 2022, 10:30 PM  > wrote:
> 
> Hi all,
> 
>  
> 
> I am stuck on a correlation calculation problem. I have a dataframe like
> the one below:
> 
> groupid  datacol1  datacol2  datacol3  datacol*  corr_col
> 1        1         2         3         4         5
> 1        2         3         4         6         5
> 2        4         2         1         7         5
> 2        8         9         3         2         5
> 3        7         1         2         3         5
> 3        3         5         3         1         5
> 
> I want to calculate the correlation between all datacol columns and corr_col 
> column by each groupid.
> 
> So I used the following spark scala-api codes:
> 
> df.groupby("groupid").agg(functions.corr("datacol1","corr_col"),functions.corr("datacol2","corr_col"),functions.corr("datacol3","corr_col"),functions.corr("datacol*","corr_col"))
> 
>  
> 
> This is very inefficient. If I have 30 datacol columns, I need to write
> functions.corr 30 times to calculate the correlations.
> 
> I have searched; it seems that functions.corr doesn't accept a List/Array
> parameter, and df.agg doesn't accept a function as a parameter.
> 
> So is there any Spark Scala API code that can do this job efficiently?
> 
>  
> 
> Thanks
> 
>  
> 
> Liang
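For reference, a minimal sketch of the approach suggested above (build the corr
columns in a loop and pass them to a single agg call); df stands for the example
data frame and the column names follow the table:

  import org.apache.spark.sql.Column
  import org.apache.spark.sql.functions.corr

  // One corr(...) column per data column, all evaluated in a single groupBy/agg.
  val dataCols = Seq("datacol1", "datacol2", "datacol3")   // extend to all 30 columns
  val corrCols: Seq[Column] = dataCols.map(c => corr(c, "corr_col").alias(s"corr_$c"))

  val result = df.groupBy("groupid").agg(corrCols.head, corrCols.tail: _*)
  result.show()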
> 
>  
> 



Pyspark 2.4.4 window functions inconsistent

2021-11-04 Thread van wilson
I am using PySpark SQL to run a SQL window function to pull in (lead) data
from the next row to populate the first row. It works reliably in Jupyter in
VS Code using Anaconda PySpark 3.0.0. It produces different results every
time on AWS EMR using Spark 2.4.4. Why? Are there any known bugs with
subqueries or window functions in PySpark 2.4?


Spark Kubernetes Architecture: Deployments vs Pods that create Pods

2019-01-29 Thread WILSON Frank
Hi,

I've been playing around with Spark Kubernetes deployments over the past week 
and I'm curious to know why Spark deploys as a driver pod that creates more 
worker pods.

I've read that it's normal to use Kubernetes Deployments to create a 
distributed service, so I am wondering why Spark just creates Pods. I suppose 
the driver program
is 'the odd one out', so it doesn't belong in a Deployment or ReplicaSet, but
maybe the workers could be a Deployment? Is this something to do with data
locality?

I haven't tried streaming pipelines on Kubernetes yet. Are these also Pods that
create Pods rather than Deployments? It seems more important for a streaming
pipeline to be 'durable'[1], as the Kubernetes documentation might say.

I ask this question partly because the Kubernetes deployment of Spark is still 
experimental and I am wondering whether this aspect of the deployment might 
change.

I had a look at the Flink[2] documentation and it does seem to use Deployments;
however, these seem to be a lightweight job/task manager that accepts Flink
jobs. It actually sounds like running a lightweight version of YARN inside
containers on Kubernetes.


Thanks,


Frank

[1] 
https://kubernetes.io/docs/concepts/workloads/pods/pod/#durability-of-pods-or-lack-thereof
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html


Structured Streaming: stream-stream join with several equality conditions in a disjunction

2018-10-22 Thread WILSON Frank
Hi,

I've just tried to do a stream-stream join with several equality conditions in 
a disjunction and got the following error:

"terminated with exception: Stream stream joins without equality predicate is 
not supported;;"

The query was in this sort of form:

stream.as("origin").join(stream.as("remote"),
  ($"origin.a" === $"remote.a").or($"origin.a" === $"remote.b").or($"origin.a" === $"remote.c")
    .and($"remote.timestamp" <= $"origin.timestamp")
    .and($"remote.timestamp" >= ($"origin.timestamp" - expr("interval 100 seconds"))))


Is there a fundamental problem with combining the results of these equality 
comparisons? Or is it just that Spark currently doesn't
dig deep enough into the query to find them?

The workaround for now, I suppose, is to take the union of several queries, each
with only one equality condition, as sketched below.
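A hedged sketch of that workaround, assuming stream is the streaming DataFrame
from the query above; rows matching more than one predicate would still need
deduplication:

  import org.apache.spark.sql.{Column, DataFrame}
  import org.apache.spark.sql.functions.{col, expr}

  // One join per equality predicate, unioned together afterwards.
  def disjunctiveJoin(stream: DataFrame): DataFrame = {
    val timeBounds: Column =
      (col("remote.timestamp") <= col("origin.timestamp")) &&
      (col("remote.timestamp") >= col("origin.timestamp") - expr("interval 100 seconds"))

    Seq("remote.a", "remote.b", "remote.c")
      .map { rc =>
        stream.as("origin").join(stream.as("remote"),
          (col("origin.a") === col(rc)) && timeBounds)
      }
      .reduce(_ union _)   // union of the three single-equality joins
  }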

Thanks,


Frank




Re: java.lang.NoSuchMethodError - GraphX

2016-10-25 Thread Brian Wilson
I have discovered that this Dijkstra function was written for Spark 1.6 (Scala 2.10); the
remainder of my code is Scala 2.11.

I have checked the functions within the dijkstra function and can’t see any 
that are illegal. For example `mapVertices`, `aggregateMessages` and 
`outerJoinVertices` are all being used correctly.

What else could this be?

Thanks

Brian

> On 25 Oct 2016, at 08:47, Brian Wilson <brian.wilson@gmail.com> wrote:
> 
> Thank you Michael! This looks perfect but I have a `NoSuchMethodError` that I 
> cannot understand. 
> 
> I am trying to implement a weighted shortest path algorithm from your `Spark 
> GraphX in Action` book. The part in question is Listing 6.4 "Executing the 
> shortest path algorithm that uses breadcrumbs"  from Chapter 6 [here][1].
> 
> I have my own graph that I create from two RDDs. There are `344436` vertices 
> and `772983` edges. I can perform an unweighted shortest path computation 
> using the native GraphX library and I'm confident in the graph construction. 
> 
> In this case I use their Dijkstra's implementation as follows:
> 
> val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, edgesRDD).cache()
> 
> def dijkstra[VD](g:Graph[VD,Double], origin:VertexId) = {
>   var g2 = g.mapVertices(
>     (vid,vd) => (false, if (vid == origin) 0 else Double.MaxValue,
>                  List[VertexId]()))
> 
>   for (i <- 1L to g.vertices.count-1) {
>     val currentVertexId =
>       g2.vertices.filter(!_._2._1)
>         .fold((0L,(false,Double.MaxValue,List[VertexId]())))((a,b) =>
>           if (a._2._2 < b._2._2) a else b)
>         ._1
> 
>     val newDistances = g2.aggregateMessages[(Double,List[VertexId])](
>       ctx => if (ctx.srcId == currentVertexId)
>                ctx.sendToDst((ctx.srcAttr._2 + ctx.attr,
>                               ctx.srcAttr._3 :+ ctx.srcId)),
>       (a,b) => if (a._1 < b._1) a else b)
> 
>     g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
>       val newSumVal =
>         newSum.getOrElse((Double.MaxValue,List[VertexId]()))
>       (vd._1 || vid == currentVertexId,
>        math.min(vd._2, newSumVal._1),
>        if (vd._2 < newSumVal._1) vd._3 else newSumVal._2)})
>   }
> 
>   g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
>     (vd, dist.getOrElse((false,Double.MaxValue,List[VertexId]()))
>            .productIterator.toList.tail))
> }
> 
> // Path Finding - random node from which to find all paths
> val v1 = 400028222916L
> 
> I then call their function with my graph and a random vertex ID. Previously I 
> had issues with `v1` not being recognised as `long` type and the `L` suffix 
> solved this.
> 
>   val results = dijkstra(my_graph, 1L).vertices.map(_._2).collect
>   
>   println(results)
>   
> However, this returns the following:
> 
> Error: Exception in thread "main" java.lang.NoSuchMethodError: 
> scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
>   at GraphX$.dijkstra$1(GraphX.scala:51)
>   at GraphX$.main(GraphX.scala:85)
>   at GraphX.main(GraphX.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>   at 
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>   at org.apache.

Re: java.lang.NoSuchMethodError - GraphX

2016-10-25 Thread Brian Wilson
Thank you Michael! This looks perfect but I have a `NoSuchMethodError` that I 
cannot understand. 

I am trying to implement a weighted shortest path algorithm from your `Spark 
GraphX in Action` book. The part in question is Listing 6.4 "Executing the 
shortest path algorithm that uses breadcrumbs"  from Chapter 6 [here][1].

I have my own graph that I create from two RDDs. There are `344436` vertices 
and `772983` edges. I can perform an unweighted shortest path computation using 
the native GraphX library and I'm confident in the graph construction. 

In this case I use their Dijkstra's implementation as follows:

val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, edgesRDD).cache()

def dijkstra[VD](g:Graph[VD,Double], origin:VertexId) = {
  var g2 = g.mapVertices(
    (vid,vd) => (false, if (vid == origin) 0 else Double.MaxValue,
                 List[VertexId]()))

  for (i <- 1L to g.vertices.count-1) {
    val currentVertexId =
      g2.vertices.filter(!_._2._1)
        .fold((0L,(false,Double.MaxValue,List[VertexId]())))((a,b) =>
          if (a._2._2 < b._2._2) a else b)
        ._1

    val newDistances = g2.aggregateMessages[(Double,List[VertexId])](
      ctx => if (ctx.srcId == currentVertexId)
               ctx.sendToDst((ctx.srcAttr._2 + ctx.attr,
                              ctx.srcAttr._3 :+ ctx.srcId)),
      (a,b) => if (a._1 < b._1) a else b)

    g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
      val newSumVal =
        newSum.getOrElse((Double.MaxValue,List[VertexId]()))
      (vd._1 || vid == currentVertexId,
       math.min(vd._2, newSumVal._1),
       if (vd._2 < newSumVal._1) vd._3 else newSumVal._2)})
  }

  g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
    (vd, dist.getOrElse((false,Double.MaxValue,List[VertexId]()))
           .productIterator.toList.tail))
}

// Path Finding - random node from which to find all paths
val v1 = 400028222916L

I then call their function with my graph and a random vertex ID. Previously I 
had issues with `v1` not being recognised as `long` type and the `L` suffix 
solved this.

val results = dijkstra(my_graph, 1L).vertices.map(_._2).collect

println(results)

However, this returns the following:

Error: Exception in thread "main" java.lang.NoSuchMethodError: 
scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at GraphX$.dijkstra$1(GraphX.scala:51)
at GraphX$.main(GraphX.scala:85)
at GraphX.main(GraphX.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Line 51 refers to the line `var g2 = g.mapVertices(`
Line 85 refers to the line `val results = dijkstra(my_graph, 
1L).vertices.map(_._2).collect`

What method is this exception referring to? I am able to package with `sbt`
without error and I cannot see what method I am calling which does not exist.

Many thanks!

Brian

  [1]: https://www.manning.com/books/spark-graphx-in-action#downloads
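For what it's worth, a NoSuchMethodError on scala.runtime.ObjectRef.create usually
points to a Scala binary-version mismatch between the compiled jar and the Spark
runtime; a minimal build.sbt sketch that keeps them aligned (version numbers are
illustrative only):

  // build.sbt -- illustrative versions; the point is that scalaVersion, the
  // _2.xx suffix produced by %%, and the Scala version of the Spark runtime
  // on the cluster must all agree.
  scalaVersion := "2.11.8"

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"   % "2.0.1" % "provided",
    "org.apache.spark" %% "spark-graphx" % "2.0.1" % "provided"
  )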

> On 24 Oct 2016, at 16:54, Michael Malak <michaelma...@yahoo.com> wrote:
> 
> Chapter 6 of my book implements Dijkstra's Algorithm. The source code is 
> available to download for free. 
> https://www.manning.com/books/spark-graphx-in-action 

Shortest path with directed and weighted graphs

2016-10-24 Thread Brian Wilson
I have been looking at the ShortestPaths function built into Spark here.

Am I correct in saying there is no support for weighted graphs with this 
function? By that I mean that it assumes all edges carry a weight of 1.
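For reference, a minimal sketch of the built-in usage (toy data; sc is an existing
SparkContext). As far as I can tell it returns hop counts to the landmark vertices
rather than sums of edge weights:

  import org.apache.spark.graphx.{Edge, Graph}
  import org.apache.spark.graphx.lib.ShortestPaths

  // Toy graph: the edge attributes look like weights, but ShortestPaths ignores them.
  val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
  val edges    = sc.parallelize(Seq(Edge(1L, 2L, 5.0), Edge(2L, 3L, 1.0)))
  val graph    = Graph(vertices, edges)

  // Each vertex ends up with a map of landmark vertex id -> number of hops.
  val result = ShortestPaths.run(graph, Seq(3L))
  result.vertices.collect.foreach(println)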

Many thanks

Brian 

Re: Eclipse on spark

2015-01-26 Thread Luke Wilson-Mawer
I use this: http://scala-ide.org/

I also use Maven with this archetype:
https://github.com/davidB/scala-archetype-simple. To be frank though, you
should be fine using SBT.

On Sat, Jan 24, 2015 at 6:33 PM, riginos samarasrigi...@gmail.com wrote:

 How can I compile a Spark project in Scala IDE for Eclipse? I have many Scala
 scripts and I no longer want to load them from the Scala shell. What can I do?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Eclipse-on-spark-tp21350.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org