Re: Why does this code produce an error

2017-12-13 Thread Soheil Pourbafrani
Also, I should mention that the `stream` DStream is defined as:

JavaInputDStream<ConsumerRecord<String, byte[]>> stream =
    KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(TOPIC, kafkaParams)
    );


On Thu, Dec 14, 2017 at 10:30 AM, Soheil Pourbafrani 
wrote:

> The following code is part of a Spark Streaming job:
>
> JavaInputDStream<String> results = stream.map(record ->
> SparkTest.getTime(record.value()) + ":"
> + Long.toString(System.currentTimeMillis()) + ":"
> + Arrays.deepToString(SparkTest.finall(record.value()))
> + ":" + Long.toString(System.currentTimeMillis()))
> .map(record -> record + ":"
> + Long.toString(Long.parseLong(record.split(":")[3])
> - Long.parseLong(record.split(":")[1])));
>
> The `stream` DStream's value type is `byte[]`, `getTime` returns a `String`,
> and `finall` returns a `String[][][]`. But this code fails with the error:
> Error:(52, 21) java: incompatible types: no instance(s) of type
> variable(s) R exist so that org.apache.spark.streaming.api.java.JavaDStream<R>
> conforms to org.apache.spark.streaming.api.java.JavaInputDStream<
> java.lang.String>
>
>
> Why? All of the map operations produce `String` values!
>


Why does this code produce an error

2017-12-13 Thread Soheil Pourbafrani
The following code is part of a Spark Streaming job:

JavaInputDStream<String> results = stream.map(record ->
    SparkTest.getTime(record.value()) + ":"
    + Long.toString(System.currentTimeMillis()) + ":"
    + Arrays.deepToString(SparkTest.finall(record.value()))
    + ":" + Long.toString(System.currentTimeMillis()))
    .map(record -> record + ":"
    + Long.toString(Long.parseLong(record.split(":")[3])
    - Long.parseLong(record.split(":")[1])));

The `stream` DStream's value type is `byte[]`, `getTime` returns a `String`,
and `finall` returns a `String[][][]`. But this code fails with the error:
Error:(52, 21) java: incompatible types: no instance(s) of type variable(s)
R exist so that org.apache.spark.streaming.api.java.JavaDStream<R> conforms
to org.apache.spark.streaming.api.java.JavaInputDStream<java.lang.String>


Why? All of the map operations produce `String` values!
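
One likely reading of the error, offered as a sketch rather than a definitive fix: map() on a JavaInputDStream returns a plain JavaDStream, so the compiler cannot make that result fit the declared JavaInputDStream<String>. The declaration, not the lambdas, is what has to change. A minimal sketch that reuses the stream and SparkTest helpers from the posts above:

import java.util.Arrays;
import org.apache.spark.streaming.api.java.JavaDStream;

// map(...) yields a JavaDStream<String>, so declare the result as such;
// the transformations themselves can stay exactly as posted.
JavaDStream<String> results = stream
    .map(record -> SparkTest.getTime(record.value()) + ":"
        + Long.toString(System.currentTimeMillis()) + ":"
        + Arrays.deepToString(SparkTest.finall(record.value()))
        + ":" + Long.toString(System.currentTimeMillis()))
    .map(record -> record + ":"
        + Long.toString(Long.parseLong(record.split(":")[3])
            - Long.parseLong(record.split(":")[1])));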


bulk upsert data batch from Kafka dstream into Postgres db

2017-12-13 Thread salemi
Hi All,

We are consuming messages from Kafka using a Spark DStream. Once the processing
is done, we would like to update/insert the data into the database in bulk.

I was wondering what the best solution for this might be. Our Postgres
database table is not partitioned.


Thank you,

Ali
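
A hedged sketch of one common approach, not necessarily the best fit here: write each partition of each micro-batch over JDBC and let Postgres handle the upsert with INSERT ... ON CONFLICT. The table, columns, connection details, and the Tuple2 record shape below are placeholders, not taken from the original post:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.spark.streaming.api.java.JavaDStream;
import scala.Tuple2;

public class PostgresBulkUpsert {
  public static void upsert(JavaDStream<Tuple2<String, String>> processed) {
    processed.foreachRDD(rdd ->
        rdd.foreachPartition(records -> {
          // One connection and one batched statement per partition.
          try (Connection conn = DriverManager.getConnection(
              "jdbc:postgresql://dbhost:5432/mydb", "dbuser", "dbpass")) {
            conn.setAutoCommit(false);
            String sql = "INSERT INTO events (id, payload) VALUES (?, ?) "
                + "ON CONFLICT (id) DO UPDATE SET payload = EXCLUDED.payload";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
              while (records.hasNext()) {
                Tuple2<String, String> r = records.next();
                ps.setString(1, r._1());
                ps.setString(2, r._2());
                ps.addBatch();
              }
              ps.executeBatch();
            }
            conn.commit();
          }
        }));
  }
}

If the batches are large, an alternative is to land each batch in a staging table first (for example with the DataFrame JDBC writer or COPY) and then run a single INSERT ... SELECT ... ON CONFLICT from staging into the target table.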







spark streaming with flume: cannot assign requested address error

2017-12-13 Thread Junfeng Chen
I am trying to connect Spark Streaming to Flume in pull mode.

I have three machines (master, slave1, slave2), and each one runs Spark and a
Flume agent at the same time. I have set the Flume sink to slave1 on port 6689.
Telnet to slave1 6689 from the other two machines works well.

In my code I call FlumeUtils.createStream(ssc, "slave1", 6689) and submit the
job on the master machine with --master local[2].

Then it throws the error:
ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting
receiver 0 - org.jboss.netty.channel.ChannelException: Failed to bind to:
slave1/10.25.*.*:6689

Caused by: java.net.BindException: Cannot assign requested address

I have read this thread
https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-Streaming-Fails-on-Cluster-mode-Flume-as-source/m-p/25577/highlight/true#M621
but I am sure no process is using this port, having checked with netstat -anp |
grep 6689. Also, the IP address 10.25.*.* is not a routable address.

Can someone help me solve it?
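
A hedged note on one possible direction: FlumeUtils.createStream starts a push-based receiver that binds to the given host and port on whichever machine the receiver happens to be scheduled, so pointing it at slave1's address while the receiver runs elsewhere can produce exactly this BindException. For pull mode the stream is normally created with createPollingStream against a Flume agent running org.apache.spark.streaming.flume.sink.SparkSink on slave1:6689. A minimal sketch, assuming the spark-streaming-flume artifact is on the classpath and with the app name and batch interval as placeholders:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public class FlumePullMode {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("flume-pull").setMaster("local[2]");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

    // Pull mode: Spark connects out to the SparkSink on slave1:6689 instead of
    // binding a local port, so the receiver can run on any worker.
    JavaReceiverInputDStream<SparkFlumeEvent> events =
        FlumeUtils.createPollingStream(jssc, "slave1", 6689);

    events.count().print();

    jssc.start();
    jssc.awaitTermination();
  }
}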


Regards,
Junfeng Chen


How to control logging in testing package com.holdenkarau.spark.testing.

2017-12-13 Thread Marco Mistroni
Hi all,
could anyone advise on how to control logging in com.holdenkarau.spark.testing?

There are loads of Spark logging statements every time I run a test.
I tried to disable Spark logging using the statements below, but with no success:

import org.apache.log4j.{Level, Logger}

val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)

thanks and kr
 marco


is Union or Join Supported for Spark Structured Streaming Queries in 2.2.0?

2017-12-13 Thread kant kodali
Hi All,

I have messages in a queue that might be coming in with a few different
schemas, like:
msg1 schema1, msg2 schema2, msg3 schema3, msg4 schema1

I want to put all of this into one DataFrame. Is that possible with Structured
Streaming?

I am using Spark 2.2.0

Thanks!
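
A hedged sketch rather than a definitive answer: in 2.2, a union of two streaming Datasets works as long as they end up with the same columns, while stream-stream joins only arrive in 2.3. One way to handle mixed schemas is to parse each kind of message with from_json and align the columns before the union. The topic names, schemas, and column names below are made-up placeholders, and the spark-sql-kafka-0-10 package is assumed to be on the classpath:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class UnionStreams {
  public static Dataset<Row> build(SparkSession spark) {
    // Two hypothetical message shapes, each arriving on its own topic.
    StructType schema1 = new StructType()
        .add("id", DataTypes.StringType).add("amount", DataTypes.DoubleType);
    StructType schema2 = new StructType()
        .add("id", DataTypes.StringType).add("total", DataTypes.DoubleType);

    Dataset<Row> s1 = spark.readStream().format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic-schema1").load()
        .select(from_json(col("value").cast("string"), schema1).alias("m"))
        .select(col("m.id").alias("id"), col("m.amount").alias("amount"));

    Dataset<Row> s2 = spark.readStream().format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic-schema2").load()
        .select(from_json(col("value").cast("string"), schema2).alias("m"))
        .select(col("m.id").alias("id"), col("m.total").alias("amount"));

    // union requires matching column order and types, which the selects ensure.
    return s1.union(s2);
  }
}

If all schemas share a single topic, the same idea applies after splitting the stream with a filter on whatever field identifies the message type.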


Re: Apache Spark documentation on mllib's Kmeans doesn't jibe.

2017-12-13 Thread Scott Reynolds
The train method is on the Companion Object
https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.mllib.clustering.KMeans$

here is a decent resource on Companion Object usage:
https://docs.scala-lang.org/tour/singleton-objects.html
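
A hedged illustration of the same point: because train lives on the companion object, it surfaces as a static-looking method, which is also how the Java examples call it. A small sketch with toy data standing in for the parsedData that the docs build from a text file:

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

public class KMeansTrainExample {
  public static KMeansModel train(JavaSparkContext sc) {
    JavaRDD<Vector> parsedData = sc.parallelize(Arrays.asList(
        Vectors.dense(0.0, 0.0), Vectors.dense(1.0, 1.0),
        Vectors.dense(9.0, 8.0), Vectors.dense(8.0, 9.0)));
    int numClusters = 2;
    int numIterations = 20;
    // train(...) is defined on the Scala companion object, not on the KMeans
    // class, which is why it does not appear on the class page of the API docs.
    return KMeans.train(parsedData.rdd(), numClusters, numIterations);
  }
}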

On Wed, Dec 13, 2017 at 9:16 AM Michael Segel 
wrote:

> Hi,
>
> Just came across this while looking at the docs on how to use Spark’s
> Kmeans clustering.
>
> Note: This appears to be true in both 2.1 and 2.2 documentation.
>
> The overview page:
> https://spark.apache.org/docs/2.1.0/mllib-clustering.html#k-means
> 
>
> Here’ the example contains the following line:
>
> val clusters = KMeans.train(parsedData, numClusters, numIterations)
>
> I was trying to get more information on the train() method.
> So I checked out the KMeans Scala API:
>
>
> https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.mllib.clustering.KMeans
> 
>
> The issue is that I couldn’t find the train method…
>
> So I thought I was slowly losing my mind.
>
> I checked out the entire API page… could not find any API docs which
> describe the method train().
>
> I ended up looking at the source code and found the method in the scala
> source code.
> (You can see the code here:
> https://github.com/apache/spark/blob/v2.1.0/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
> 
>  )
>
> So the method(s) exist, but not covered in the Scala API doc.
>
> How do you raise this as a ‘bug’ ?
>
> Thx
>
> -Mike
>
> --

Scott Reynolds
Principal Engineer


EMAIL sreyno...@twilio.com


Re: Why do I see five attempts on my Spark application

2017-12-13 Thread sanat kumar Patnaik
It should be within your yarn-site.xml config file. The parameter name is
yarn.resourcemanager.am.max-attempts.

The directory should be /usr/lib/spark/conf/yarn-conf. Try to find this
directory on your gateway node if you are using a Cloudera distribution.
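
A side note, hedged: the running-on-yarn page linked in the quoted reply below also documents spark.yarn.maxAppAttempts, a per-application cap that avoids touching the global yarn-site.xml setting. A minimal sketch, with the app name as a placeholder:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SingleAttemptApp {
  public static void main(String[] args) {
    // Cap attempts for this application only; the cluster-wide ceiling remains
    // yarn.resourcemanager.am.max-attempts in yarn-site.xml.
    SparkConf conf = new SparkConf()
        .setAppName("single-attempt-app")
        .set("spark.yarn.maxAppAttempts", "1");
    JavaSparkContext sc = new JavaSparkContext(conf);
    try {
      // ... job code ...
    } finally {
      sc.stop();
    }
  }
}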

On Wed, Dec 13, 2017 at 2:33 PM, Subhash Sriram 
wrote:

> There are some more properties specifically for YARN here:
>
> http://spark.apache.org/docs/latest/running-on-yarn.html
>
> Thanks,
> Subhash
>
> On Wed, Dec 13, 2017 at 2:32 PM, Subhash Sriram 
> wrote:
>
>> http://spark.apache.org/docs/latest/configuration.html
>>
>> On Wed, Dec 13, 2017 at 2:31 PM, Toy  wrote:
>>
>>> Hi,
>>>
>>> Can you point me to the config for that please?
>>>
>>> On Wed, 13 Dec 2017 at 14:23 Marcelo Vanzin  wrote:
>>>
 On Wed, Dec 13, 2017 at 11:21 AM, Toy  wrote:
 > I'm wondering why am I seeing 5 attempts for my Spark application?
 Does Spark application restart itself?

 It restarts itself if it fails (up to a limit that can be configured
 either per Spark application or globally in YARN).


 --
 Marcelo

>>>
>>
>


-- 
Regards,
Sanat Patnaik
Cell->804-882-6424


Re: Why do I see five attempts on my Spark application

2017-12-13 Thread Subhash Sriram
There are some more properties specifically for YARN here:

http://spark.apache.org/docs/latest/running-on-yarn.html

Thanks,
Subhash

On Wed, Dec 13, 2017 at 2:32 PM, Subhash Sriram 
wrote:

> http://spark.apache.org/docs/latest/configuration.html
>
> On Wed, Dec 13, 2017 at 2:31 PM, Toy  wrote:
>
>> Hi,
>>
>> Can you point me to the config for that please?
>>
>> On Wed, 13 Dec 2017 at 14:23 Marcelo Vanzin  wrote:
>>
>>> On Wed, Dec 13, 2017 at 11:21 AM, Toy  wrote:
>>> > I'm wondering why am I seeing 5 attempts for my Spark application?
>>> Does Spark application restart itself?
>>>
>>> It restarts itself if it fails (up to a limit that can be configured
>>> either per Spark application or globally in YARN).
>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>


Re: Why do I see five attempts on my Spark application

2017-12-13 Thread Subhash Sriram
http://spark.apache.org/docs/latest/configuration.html

On Wed, Dec 13, 2017 at 2:31 PM, Toy  wrote:

> Hi,
>
> Can you point me to the config for that please?
>
> On Wed, 13 Dec 2017 at 14:23 Marcelo Vanzin  wrote:
>
>> On Wed, Dec 13, 2017 at 11:21 AM, Toy  wrote:
>> > I'm wondering why am I seeing 5 attempts for my Spark application? Does
>> Spark application restart itself?
>>
>> It restarts itself if it fails (up to a limit that can be configured
>> either per Spark application or globally in YARN).
>>
>>
>> --
>> Marcelo
>>
>


Re: Why do I see five attempts on my Spark application

2017-12-13 Thread Toy
Hi,

Can you point me to the config for that please?

On Wed, 13 Dec 2017 at 14:23 Marcelo Vanzin  wrote:

> On Wed, Dec 13, 2017 at 11:21 AM, Toy  wrote:
> > I'm wondering why am I seeing 5 attempts for my Spark application? Does
> Spark application restart itself?
>
> It restarts itself if it fails (up to a limit that can be configured
> either per Spark application or globally in YARN).
>
>
> --
> Marcelo
>


Re: Why do I see five attempts on my Spark application

2017-12-13 Thread Marcelo Vanzin
On Wed, Dec 13, 2017 at 11:21 AM, Toy  wrote:
> I'm wondering why am I seeing 5 attempts for my Spark application? Does Spark 
> application restart itself?

It restarts itself if it fails (up to a limit that can be configured
either per Spark application or globally in YARN).


-- 
Marcelo




Why do I see five attempts on my Spark application

2017-12-13 Thread Toy
Hi,

I'm wondering why I see 5 attempts for my Spark application. Does the Spark
application restart itself?

[image: Screen Shot 2017-12-13 at 2.18.03 PM.png]


Different behaviour when querying a spark DataFrame from dynamodb

2017-12-13 Thread Bogdan Cojocar
I am reading some data into a DataFrame from a DynamoDB table:

val data = spark.read.dynamodb("table")
data.filter($"field1".like("%hello%")).createOrReplaceTempView("temp")
spark.sql("select * from temp").show()

When I run the last statement I get results. If, however, I try to do:

spark.sql("select field2 from temp").show()

I get no results. The DataFrame has the structure:

root
 |-- field1: string (nullable = true)
 |-- field2: string (nullable = true)
 |-- field3: string (nullable = true)
 |-- field4: long (nullable = true)
 |-- field5: string (nullable = true)

Dependencies:

spark 2.2.0
scala 2.11.8
spark-dynamodb 0.0.11

Spark running on local[*]


Apache Spark documentation on mllib's Kmeans doesn't jibe.

2017-12-13 Thread Michael Segel
Hi,

Just came across this while looking at the docs on how to use Spark’s Kmeans 
clustering.

Note: This appears to be true in both 2.1 and 2.2 documentation.

The overview page:
https://spark.apache.org/docs/2.1.0/mllib-clustering.html#k-means

Here, the example contains the following line:

val clusters = KMeans.train(parsedData, numClusters, numIterations)

I was trying to get more information on the train() method.
So I checked out the KMeans Scala API:
https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.mllib.clustering.KMeans

The issue is that I couldn’t find the train method…

So I thought I was slowly losing my mind.

I checked out the entire API page… could not find any API docs which describe 
the method train().

I ended up looking at the source code and found the method in the scala source 
code.
(You can see the code here: 
https://github.com/apache/spark/blob/v2.1.0/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
 )

So the method(s) exist, but not covered in the Scala API doc.

How do you raise this as a ‘bug’ ?

Thx

-Mike



Re: Spark Streaming with Confluent

2017-12-13 Thread Gerard Maas
Hi Arkadiusz,

Try 'rooting' your import. It looks like the import is being interpreted as
relative to the previous one.
'Rooting' is done by adding the '_root_' prefix to your import:

import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.io.confluent.kafka.serializers.KafkaAvroDecoder

kr, Gerard.

On Wed, Dec 13, 2017 at 6:05 PM, Arkadiusz Bicz 
wrote:

> Hi,
>
> I try to test spark streaming 2.2.0 version with confluent 3.3.0
>
> I have got lot of error during compilation this is my sbt:
>
> lazy val sparkstreaming = (project in file("."))
>   .settings(
>   name := "sparkstreaming",
>   organization := "org.arek",
>   version := "0.1-SNAPSHOT",
>   scalaVersion := "2.11.8",
> libraryDependencies ++=  Seq(
>   "org.apache.spark" %% "spark-streaming" % "2.2.0",
>   "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0",
>   "io.confluent" % "kafka-avro-serializer" % "3.3.0"
> )
>   )
>
>
> import org.apache.spark._
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
> import io.confluent.kafka.serializers.KafkaAvroDecoder
>
> object Transformation extends Serializable {
>
>   def main(args: Array[String]) = {
> val conf = new SparkConf().setAppName("StreamingTranformation").
> setMaster("local[*]")
> val streamingContext = new StreamingContext(conf, Seconds(1))
>
> val kafkaParams = Map[String, String]("metadata.broker.list" ->
> "local:2181",
>   "schema.registry.url" -> "http://local:8081;,
>   "auto.offset.reset" -> "smallest")
>
> val topicSet = Set("GEXPPROD_ROUTE")
> val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
> topicSet).map(_._2)
>
> val lines = messages.foreachRDD(rdd => {
>   rdd.foreach({ avroRecord =>
> println(avroRecord)
>   })
> })
>   }
>
>
> [warn] Found version conflict(s) in library dependencies; some are
> suspected to be binary incompatible:
> [warn]  * io.netty:netty:3.9.9.Final is selected over {3.6.2.Final,
> 3.7.0.Final}
> [warn]  +- org.apache.spark:spark-core_2.11:2.2.0
>  (depends on 3.7.0.Final)
> [warn]  +- org.apache.zookeeper:zookeeper:3.4.8
>  (depends on 3.7.0.Final)
> [warn]  +- org.apache.zookeeper:zookeeper:3.4.6
>  (depends on 3.6.2.Final)
> [warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5
> (depends on 3.6.2.Final)
> [warn]  * commons-net:commons-net:2.2 is selected over 3.1
> [warn]  +- org.apache.spark:spark-core_2.11:2.2.0
>  (depends on 3.1)
> [warn]  +- org.apache.hadoop:hadoop-common:2.6.5
> (depends on 3.1)
> [warn]  * com.google.guava:guava:11.0.2 is selected over {12.0.1, 16.0.1}
> [warn]  +- org.apache.hadoop:hadoop-yarn-client:2.6.5
>  (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-api:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-common:2.6.5
>  (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-server-nodemanager:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-common:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-server-common:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.curator:curator-framework:2.6.0
>  (depends on 16.0.1)
> [warn]  +- org.apache.curator:curator-client:2.6.0
> (depends on 16.0.1)
> [warn]  +- org.apache.curator:curator-recipes:2.6.0
>  (depends on 16.0.1)
> [warn]  +- org.htrace:htrace-core:3.0.4   (depends
> on 12.0.1)
> [warn] Run 'evicted' to see detailed eviction warnings
> [info] Compiling 1 Scala source to /home/adminuser/data-
> streaming-platform/sparkstreaming/target/scala-2.11/classes ...
> [error] /home/adminuser/data-streaming-platform/
> sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:6:11:
> object confluent is not a member of package org.apache.spark.io
> [error] import io.confluent.kafka.serializers.KafkaAvroDecoder
> [error]   ^
> [error] /home/adminuser/data-streaming-platform/
> sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:66:
> not found: type KafkaAvroDecoder
> [error] val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
> topicSet).map(_._2)
> [error]  ^
> [error] /home/adminuser/data-streaming-platform/
> sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:84:
> not found: type KafkaAvroDecoder
> [error] val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
> topicSet).map(_._2)
> [error]
>
>
> When changing to library  "org.apache.spark" %%
> "spark-streaming-kafka-0-10" % "2.2.0" :
>
>
> [warn] 

Spark Streaming with Confluent

2017-12-13 Thread Arkadiusz Bicz
Hi,

I am trying to test Spark Streaming 2.2.0 with Confluent 3.3.0.

I get a lot of errors during compilation. This is my sbt:

lazy val sparkstreaming = (project in file("."))
  .settings(
  name := "sparkstreaming",
  organization := "org.arek",
  version := "0.1-SNAPSHOT",
  scalaVersion := "2.11.8",
libraryDependencies ++=  Seq(
  "org.apache.spark" %% "spark-streaming" % "2.2.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0",
  "io.confluent" % "kafka-avro-serializer" % "3.3.0"
)
  )


import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
import io.confluent.kafka.serializers.KafkaAvroDecoder

object Transformation extends Serializable {

  def main(args: Array[String]) = {
val conf = new
SparkConf().setAppName("StreamingTranformation").setMaster("local[*]")
val streamingContext = new StreamingContext(conf, Seconds(1))

val kafkaParams = Map[String, String]("metadata.broker.list" ->
"local:2181",
  "schema.registry.url" -> "http://local:8081;,
  "auto.offset.reset" -> "smallest")

val topicSet = Set("GEXPPROD_ROUTE")
val messages = KafkaUtils.createDirectStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
topicSet).map(_._2)

val lines = messages.foreachRDD(rdd => {
  rdd.foreach({ avroRecord =>
println(avroRecord)
  })
})
  }


[warn] Found version conflict(s) in library dependencies; some are
suspected to be binary incompatible:
[warn]  * io.netty:netty:3.9.9.Final is selected over {3.6.2.Final,
3.7.0.Final}
[warn]  +- org.apache.spark:spark-core_2.11:2.2.0 (depends
on 3.7.0.Final)
[warn]  +- org.apache.zookeeper:zookeeper:3.4.8   (depends
on 3.7.0.Final)
[warn]  +- org.apache.zookeeper:zookeeper:3.4.6   (depends
on 3.6.2.Final)
[warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5(depends
on 3.6.2.Final)
[warn]  * commons-net:commons-net:2.2 is selected over 3.1
[warn]  +- org.apache.spark:spark-core_2.11:2.2.0 (depends
on 3.1)
[warn]  +- org.apache.hadoop:hadoop-common:2.6.5  (depends
on 3.1)
[warn]  * com.google.guava:guava:11.0.2 is selected over {12.0.1, 16.0.1}
[warn]  +- org.apache.hadoop:hadoop-yarn-client:2.6.5 (depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-yarn-api:2.6.5(depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-yarn-common:2.6.5 (depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-yarn-server-nodemanager:2.6.5
(depends on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-common:2.6.5  (depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-yarn-server-common:2.6.5  (depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5(depends
on 11.0.2)
[warn]  +- org.apache.curator:curator-framework:2.6.0 (depends
on 16.0.1)
[warn]  +- org.apache.curator:curator-client:2.6.0(depends
on 16.0.1)
[warn]  +- org.apache.curator:curator-recipes:2.6.0   (depends
on 16.0.1)
[warn]  +- org.htrace:htrace-core:3.0.4   (depends
on 12.0.1)
[warn] Run 'evicted' to see detailed eviction warnings
[info] Compiling 1 Scala source to
/home/adminuser/data-streaming-platform/sparkstreaming/target/scala-2.11/classes
...
[error]
/home/adminuser/data-streaming-platform/sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:6:11:
object confluent is not a member of package org.apache.spark.io
[error] import io.confluent.kafka.serializers.KafkaAvroDecoder
[error]   ^
[error]
/home/adminuser/data-streaming-platform/sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:66:
not found: type KafkaAvroDecoder
[error] val messages = KafkaUtils.createDirectStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
topicSet).map(_._2)
[error]  ^
[error]
/home/adminuser/data-streaming-platform/sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:84:
not found: type KafkaAvroDecoder
[error] val messages = KafkaUtils.createDirectStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
topicSet).map(_._2)
[error]


When changing to the library "org.apache.spark" %%
"spark-streaming-kafka-0-10" % "2.2.0":


[warn] Found version conflict(s) in library dependencies; some are
suspected to be binary incompatible:
[warn]  * com.101tec:zkclient:0.10 is selected over 0.8
[warn]  +- io.confluent:common-utils:3.3.0(depends
on 0.10)
[warn]  +- org.apache.kafka:kafka_2.11:0.10.0.1   (depends
on 0.8)
[warn]  * io.netty:netty:3.9.9.Final is selected over {3.6.2.Final,
3.7.0.Final}
[warn]  +- org.apache.spark:spark-core_2.11:2.2.0 

Determine Cook's distance / influential data points

2017-12-13 Thread Richard Siebeling
Hi,

Would it be possible to determine Cook's distance using Spark?

Thanks,
Richard


Re: Spark loads data from HDFS or S3

2017-12-13 Thread Jörn Franke
On Amazon, S3 can be cheaper to run than HDFS.

As you correctly describe, it does not support data locality. The data is
distributed to the workers.

Depending on your use case it can make sense to use HDFS as a temporary
“cache” for S3 data.

> On 13. Dec 2017, at 09:39, Philip Lee  wrote:
> 
> Hi​
> 
> I have a few of questions about a structure of HDFS and S3 when Spark-like 
> loads data from two storage.
> 
> Generally, when Spark loads data from HDFS, HDFS supports data locality and 
> already own distributed file on datanodes, right? Spark could just process 
> data on workers.
> 
> What about S3? many people in this field use S3 for storage or loading data 
> remotely. When Spark loads data from S3 (sc.textFile('s3://...'), how all 
> data will be spread on Workers? Master node's responsible for this task? It 
> reads all data from S3, then spread the data to Worker? So it migt be a 
> trade-off compared to HDFS? or I got a wrong point of this
> What kind of points in S3 is better than that of HDFS?
> Thanks in Advanced


Re: Spark loads data from HDFS or S3

2017-12-13 Thread Sebastian Nagel
> When Spark loads data from S3 (sc.textFile('s3://...'), how all data will be 
> spread on Workers?

The data is read by the workers. Just make sure that the data is splittable,
by using a splittable format or by passing a list of files, e.g.
 sc.textFile('s3://.../*.txt')
to achieve full parallelism. Otherwise (e.g., if reading a single gzipped file)
only one worker will read the data.

> So it migt be a trade-off compared to HDFS?

Accessing data on S3 from Hadoop is usually slower than HDFS, cf.
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#Other_issues

> What kind of points in S3 is better than that of HDFS?

It's independent of your Hadoop cluster: it is easier to share, and you don't
have to take care of the data when maintaining your cluster, ...

Sebastian

On 12/13/2017 09:39 AM, Philip Lee wrote:
> Hi
> ​
> 
> 
> I have a few of questions about a structure of HDFS and S3 when Spark-like 
> loads data from two storage.
> 
> 
> Generally, when Spark loads data from HDFS, HDFS supports data locality and 
> already own distributed
> file on datanodes, right? Spark could just process data on workers.
> 
> 
> What about S3? many people in this field use S3 for storage or loading data 
> remotely. When Spark
> loads data from S3 (sc.textFile('s3://...'), how all data will be spread on 
> Workers? Master node's
> responsible for this task? It reads all data from S3, then spread the data to 
> Worker? So it migt be
> a trade-off compared to HDFS? or I got a wrong point of this
> 
> 
> What kind of points in S3 is better than that of HDFS?
> 
> Thanks in Advanced





Spark loads data from HDFS or S3

2017-12-13 Thread Philip Lee
Hi,


I have a few questions about the structure of HDFS and S3 when something like
Spark loads data from these two storage systems.


Generally, when Spark loads data from HDFS, HDFS supports data locality and
already owns the distributed files on the datanodes, right? Spark can just
process the data on the workers.


What about S3? Many people in this field use S3 for storage or for loading
data remotely. When Spark loads data from S3 (sc.textFile('s3://...')), how
will the data be spread across the workers? Is the master node responsible for
this task? Does it read all the data from S3 and then spread it to the workers?
So it might be a trade-off compared to HDFS, or have I got this wrong?

In what ways is S3 better than HDFS?

Thanks in advance