Re: Do I need to do .collect inside forEachRDD

2017-12-06 Thread kant kodali
@Richard I have pasted the two versions of the code below and I still
can't figure out why it doesn't work without .collect(). Any help would
be great.


*The code below doesn't work, and sometimes I also run into an OutOfMemory
error.*

jsonMessagesDStream
    .window(new Duration(6), new Duration(1000))
    .mapToPair(val -> {
        JsonParser parser = new JsonParser();
        JsonObject jsonObj = parser.parse(val).getAsJsonObject();
        if (jsonObj.has("key4")) {
            return new Tuple2<>("", 0L);
        }
        String symbol = jsonObj.get("key1").getAsString();
        long quantity = jsonObj.get("key2").getAsLong();
        return new Tuple2<>(symbol, quantity);
    })
    .reduceByKey((v1, v2) -> v1 + v2)
    .foreachRDD(stringLongJavaPairRDD -> {
        Map<String, Long> map = new HashMap<>();
        stringLongJavaPairRDD.foreach(stringLongTuple2 -> {
            System.out.println(stringLongTuple2._1()); // Works, I can see values getting printed out
            System.out.println(stringLongTuple2._2()); // Works, I can see values getting printed out
            map.put(stringLongTuple2._1(), stringLongTuple2._2());
        });

        System.out.println(gson.toJson(map)); // Prints the empty JSON doc string "{}" always.
                                              // But why, when the map is getting filled with values
                                              // as confirmed by the print statements above?

        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
    });

jssc.start();

jssc.awaitTermination();


  VS

*The code below works, but it is slow because of .collectAsMap()*


jsonMessagesDStream
    .window(new Duration(6), new Duration(1000))
    .mapToPair(val -> {
        JsonParser parser = new JsonParser();
        JsonObject jsonObj = parser.parse(val).getAsJsonObject();
        if (jsonObj.has("key4")) {
            return new Tuple2<>("", 0L);
        }
        String symbol = jsonObj.get("key1").getAsString();
        long quantity = jsonObj.get("key2").getAsLong();
        return new Tuple2<>(symbol, quantity);
    })
    .reduceByKey((v1, v2) -> v1 + v2)
    .foreachRDD(stringLongJavaPairRDD -> {
        LinkedHashMap<String, Long> map = new LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());

        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
    });

jssc.start();

jssc.awaitTermination();
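
The reason the first version always prints "{}" is that the HashMap filled inside
rdd.foreach() lives in the serialized copies of the closure running on the executors,
while gson.toJson(map) runs on the driver against its own, never-updated map. A minimal
Scala sketch of a middle ground is below: write from the executors with foreachPartition
instead of collecting everything to the driver. Here reducedStream stands for the
DStream[(String, Long)] produced by the mapToPair/reduceByKey chain above, and
createProducer() is a hypothetical helper that builds a KafkaProducer[String, String]
from your producer config.

reducedStream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // Runs on an executor: build a small per-partition map and send it from here,
    // so nothing needs to be collected back to the driver.
    val gson = new com.google.gson.Gson()
    val partial = new java.util.HashMap[String, java.lang.Long]()
    iter.foreach { case (symbol, quantity) => partial.put(symbol, quantity) }
    if (!partial.isEmpty) {
      val producer = createProducer() // hypothetical: one producer per partition
      producer.send(new org.apache.kafka.clients.producer.ProducerRecord[String, String](
        "topicA", gson.toJson(partial)))
      producer.close()
    }
  }
}

Note this produces one message per partition rather than one per window; if a single
message per window is required, collectAsMap() on the already-reduced (and therefore
small) RDD is usually an acceptable cost.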




On Wed, Dec 6, 2017 at 1:43 AM, Gerard Maas  wrote:

> Hi Kant,
>
> >  but would your answer on .collect() change depending on running the
> spark app in client vs cluster mode?
>
> No, it should make no difference.
>
> -kr, Gerard.
>
> On Tue, Dec 5, 2017 at 11:34 PM, kant kodali  wrote:
>
>> @Richard I don't see any error in the executor log but let me run again
>> to make sure.
>>
>> @Gerard Thanks much!  but would your answer on .collect() change
>> depending on running the spark app in client vs cluster mode?
>>
>> Thanks!
>>
>> On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas 
>> wrote:
>>
>>> The general answer to your initial question is that "it depends". If the
>>> operation in the rdd.foreach() closure can be parallelized, then you don't
>>> need to collect first. If it needs some local context (e.g. a socket
>>> connection), then you need to do rdd.collect first to bring the data
>>> locally, which has a perf penalty and also is restricted to the memory size
>>> of the driver process.
>>>
>>> Given the further clarification:
>>> >Reads from Kafka and outputs to Kafka. so I check the output from
>>> Kafka.
>>>
>>> If it's writing to Kafka, that operation can be done in a distributed
>>> form.
>>>
>>> You could use this lib: https://github.com/BenFradet/spark-kafka-writer
>>>
>>> Or, if you can upgrade to Spark 2.2 version, you can pave your way to
>>> migrate to structured streaming by already adopting the 'structured' APIs
>>> within Spark Streaming:
>>>
>>> case class KV(key: String, value: String)
>>>
>>> dstream.map().reduce().foreachRDD { rdd =>
>>> import spark.implicits._
>>> val kv = rdd.map{e => KV(extractKey(e), extractValue(e))} // needs
>>> to be in a (key,value) shape
>>> val dataFrame = kv.toDF()
>>> dataFrame.write
>>>  .format("kafka")
>>>  .option("kafka.bootstrap.servers",
>>> "host1:port1,host2:port2")
>>>  .option("topic", "topic1")
>>>  .save()
>>> }
>>>
>>> -kr, Gerard.
>>>
>>>
>>>
>>> On Tue, Dec 5, 2017 at 10:38 PM, kant kodali  wrote:
>>>
 Reads from Kafka and outputs to Kafka. so I check the output from Kafka.

 On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <
 richard.q...@capitalone.com> wrote:

> Where do you check the output result for both cases?
>
> Sent from my iPhone
>
>
> > On Dec 5, 2017, at 15:36, kant kodali  wrote:
> >
> > Hi All,
> >
> > I have a simple stateless transformation using Dstreams 

Spark ListenerBus

2017-12-06 Thread KhajaAsmath Mohammed
Hi,

I am running a Spark SQL job and it completes without any issues. However, I am
getting errors like
ERROR: SparkListenerBus has already stopped! Dropping event
SparkListenerExecutorMetricsUpdate
after completion of the job. Could anyone share your suggestions on how to
avoid it?

Thanks,
Asmath


Re: Json Parsing.

2017-12-06 Thread satyajit vegesna
Thank you for the info. Is there a way to get all the keys of the JSON, so that I
can create a dataframe with the JSON keys? For example,

  fieldsDataframe.withColumn("data", functions.get_json_object($"RecordString", "$.id"))

appends a single column to the dataframe with the id key. I would like to
automate this process for all keys in the JSON, as I am going to get
dynamically generated JSON schemas.
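
One possible way to automate this, sketched under the assumption that a batch read
of the raw strings is feasible: infer the top-level keys with schema inference, then
add one column per key with get_json_object. fieldsDataframe and RecordString come
from the snippet above; the rest is assumed (spark.read.json(Dataset[String]) is the
Spark 2.2+ signature, and the key names are assumed simple enough to use directly in
a JSON path). For a structured streaming source an explicit schema (e.g. with
from_json) is still needed, so this mainly helps for batch jobs or for deriving the
schema offline.

import org.apache.spark.sql.functions.get_json_object
import spark.implicits._

// Infer the set of top-level keys from the raw JSON strings.
val inferredSchema = spark.read
  .json(fieldsDataframe.select($"RecordString").as[String])
  .schema

// Add one column per inferred top-level key.
val withAllKeys = inferredSchema.fieldNames.foldLeft(fieldsDataframe) { (df, key) =>
  df.withColumn(key, get_json_object($"RecordString", s"$$.$key"))
}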

On Wed, Dec 6, 2017 at 4:37 PM, ayan guha  wrote:

>
> On Thu, 7 Dec 2017 at 11:37 am, ayan guha  wrote:
>
>> You can use get_json function
>>
>> On Thu, 7 Dec 2017 at 10:39 am, satyajit vegesna <
>> satyajit.apas...@gmail.com> wrote:
>>
>>> Does Spark support automatic detection of schema from a JSON string in a
>>> dataframe?
>>>
>>> I am trying to parse a JSON string and do some transformations on it
>>> (I would like to append new columns to the dataframe), from the data I stream
>>> from Kafka.
>>>
>>> But I am not very sure how I can parse the JSON in structured
>>> streaming. And I would not be interested in creating a schema, as the data
>>> from Kafka is going to maintain different schema objects in the value column.
>>>
>>> Any advice or help would be appreciated.
>>>
>>> Regards,
>>> Satyajit.
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
> --
> Best Regards,
> Ayan Guha
>


Re: Json Parsing.

2017-12-06 Thread ayan guha
You can use get_json function

On Thu, 7 Dec 2017 at 10:39 am, satyajit vegesna 
wrote:

> Does Spark support automatic detection of schema from a JSON string in a
> dataframe?
>
> I am trying to parse a JSON string and do some transformations on it
> (I would like to append new columns to the dataframe), from the data I stream
> from Kafka.
>
> But I am not very sure how I can parse the JSON in structured streaming.
> And I would not be interested in creating a schema, as the data from Kafka
> is going to maintain different schema objects in the value column.
>
> Any advice or help would be appreciated.
>
> Regards,
> Satyajit.
>
-- 
Best Regards,
Ayan Guha


Re: Json Parsing.

2017-12-06 Thread ayan guha
On Thu, 7 Dec 2017 at 11:37 am, ayan guha  wrote:

> You can use get_json function
>
> On Thu, 7 Dec 2017 at 10:39 am, satyajit vegesna <
> satyajit.apas...@gmail.com> wrote:
>
>> Does Spark support automatic detection of schema from a JSON string in a
>> dataframe?
>>
>> I am trying to parse a JSON string and do some transformations on it
>> (I would like to append new columns to the dataframe), from the data I stream
>> from Kafka.
>>
>> But I am not very sure how I can parse the JSON in structured streaming.
>> And I would not be interested in creating a schema, as the data from Kafka
>> is going to maintain different schema objects in the value column.
>>
>> Any advice or help would be appreciated.
>>
>> Regards,
>> Satyajit.
>>
> --
> Best Regards,
> Ayan Guha
>
-- 
Best Regards,
Ayan Guha


Json Parsing.

2017-12-06 Thread satyajit vegesna
Does Spark support automatic detection of schema from a JSON string in a
dataframe?

I am trying to parse a JSON string and do some transformations on it
(I would like to append new columns to the dataframe), from the data I stream
from Kafka.

But I am not very sure how I can parse the JSON in structured streaming.
And I would not be interested in creating a schema, as the data from Kafka
is going to maintain different schema objects in the value column.

Any advice or help would be appreciated.

Regards,
Satyajit.


Explode schema name question

2017-12-06 Thread tj5527
Searching online with keywords such as "auto explode schema" doesn't turn up what
I am looking for, so I am asking here ...

I want to explode a Dataset schema where the schema is a nested structure derived
from complicated XML, and its structure changes a lot and with high frequency. After
checking the API doc, I noticed I can obtain the schema through Dataset.schema(),
which returns a StructType. However, before I start manually extracting the schema
for explode by traversing the StructType tree, I would like to check whether there is
a better way to obtain the full namespace of the schema.

I appreciate any advice. Thanks.
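
One way to do the traversal, sketched as a small recursive helper over the public
StructType/StructField/ArrayType API (the helper name and the dotted "namespace"
convention are just illustrative assumptions):

import org.apache.spark.sql.types.{ArrayType, StructField, StructType}

// Collect the fully-qualified name of every leaf field, descending into nested
// structs and arrays of structs.
def leafFieldNames(schema: StructType, prefix: String = ""): Seq[String] =
  schema.fields.flatMap { case StructField(name, dataType, _, _) =>
    val path = if (prefix.isEmpty) name else s"$prefix.$name"
    dataType match {
      case st: StructType               => leafFieldNames(st, path)
      case ArrayType(st: StructType, _) => leafFieldNames(st, path)
      case _                            => Seq(path)
    }
  }

// e.g. leafFieldNames(ds.schema) might return Seq("order.id", "order.items.sku", ...)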

Re: Spark job only starts tasks on a single node

2017-12-06 Thread Ji Yan
I am sure that the other agents have plenty of resources, but I don't know
why Spark only scheduled executors on one single node, up to that node's
capacity (it is a different node every time I run it, btw).

I checked the DEBUG log from the Spark driver and didn't see any mention of
declines. But from the log, it looks like it has only accepted one offer from
Mesos.

It also looks like there is no special role required on the Spark side!
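
For reference, a minimal sketch of the configuration being described in this thread,
with purely hypothetical values: with spark.cores.max = 8 and spark.executor.cores = 2
the expectation is 8 / 2 = 4 executors, but each executor must also fit its
spark.executor.memory inside a single agent's offer, so agents whose remaining offers
are too small will not receive one.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setAppName("spread-executors-test")               // placeholder app name
  .setMaster("mesos://zk://mesos-master:2181/mesos") // placeholder Mesos master URL
  .set("spark.cores.max", "8")                       // total cores across the cluster
  .set("spark.executor.cores", "2")                  // cores per executor -> expect 4 executors
  .set("spark.executor.memory", "4g")                // must also fit inside each offer

val spark = SparkSession.builder().config(conf).getOrCreate()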

On Wed, Dec 6, 2017 at 5:57 AM, Art Rand  wrote:

> Hello Ji,
>
> Spark will launch Executors round-robin on offers, so when the resources
> on an agent get broken into multiple resource offers it's possible that
> many Executors get placed on a single agent. However, from your
> description, it's not clear why your other agents do not get Executors
> scheduled on them. It's possible that the offers from your other agents are
> insufficient in some way. The Mesos MASTER log should show offers being
> declined by your Spark Driver, do you see that?  If you have DEBUG level
> logging in your Spark driver you should also see offers being declined
> 
> there. Finally if your Spark framework isn't receiving any resource offers,
> it could be because of the roles you have established on your agents or
> quota set on other frameworks, have you set up any of that? Hope this helps!
>
> Art
>
> On Tue, Dec 5, 2017 at 10:45 PM, Ji Yan  wrote:
>
>> Hi all,
>>
>> I am running Spark 2.0 on Mesos 1.1. I was trying to split up my job onto
>> several nodes. I try to set the number of executors by the formula
>> (spark.cores.max / spark.executor.cores). The behavior I saw was that Spark
>> will try to fill up one mesos node with as many executors as it can, then it
>> stops going to other mesos nodes even though it has not yet scheduled
>> all the executors I asked it to! This is super weird!
>>
>> Did anyone notice this behavior before? Any help appreciated!
>>
>> Ji
>>
>> The information in this email is confidential and may be legally
>> privileged. It is intended solely for the addressee. Access to this email
>> by anyone else is unauthorized. If you are not the intended recipient, any
>> disclosure, copying, distribution or any action taken or omitted to be
>> taken in reliance on it, is prohibited and may be unlawful.
>>
>
>



sparkSession.sql("sql query") vs df.sqlContext().sql(this.query) ?

2017-12-06 Thread kant kodali
Hi All,

I have the following two code snippets and I wonder what the difference
between them is and which one I should use? I am using Spark 2.2.

Dataset<Row> df = sparkSession.readStream()
        .format("kafka")
        .load();

df.createOrReplaceTempView("table");
df.printSchema();

Dataset<Row> resultSet = df.sqlContext().sql("select value from table"); // sparkSession.sql(this.query);

StreamingQuery streamingQuery = resultSet
        .writeStream()
        .trigger(Trigger.ProcessingTime(1000))
        .format("console")
        .start();


vs


Dataset<Row> df = sparkSession.readStream()
        .format("kafka")
        .load();

df.createOrReplaceTempView("table");

Dataset<Row> resultSet = sparkSession.sql("select value from table"); // sparkSession.sql(this.query);

StreamingQuery streamingQuery = resultSet
        .writeStream()
        .trigger(Trigger.ProcessingTime(1000))
        .format("console")
        .start();


Thanks!


Buffer/cache exhaustion Spark standalone inside a Docker container

2017-12-06 Thread Stein Welberg
Hi All!

I have a very weird memory issue (which is what a lot of people will most
likely say ;-)) with Spark running in standalone mode inside a Docker
container. Our setup is as follows: we have a Docker container in which we have
a Spring Boot application that runs Spark in standalone mode. This Spring Boot
app also contains a few scheduled tasks that trigger Spark jobs. The Spark jobs
scrape a SQL database, shuffle the data a bit, and then write the results to a
different SQL table. Our current data set is very small (the largest table
contains a few million rows).

The problem is that the Docker host (a CentOS VM) that runs the Docker
container crashes after a while because the memory gets exhausted. I have
currently limited the Spark memory usage to 512M (I have set both executor and
driver memory), and in the Spark UI I can see that the largest job only takes
about 10 MB of memory.

After digging a bit further I noticed that Spark eats up all the buffer/cache
memory on the machine. After clearing this manually by forcing Linux to drop
caches (echo 2 > /proc/sys/vm/drop_caches, which clears the dentries and inodes),
the cache usage drops considerably, but if I don't keep doing this regularly I
see that the cache usage slowly keeps going up until all memory is used in
buffer/cache.

Does anyone have an idea what I might be doing wrong / what is going on here?

Big thanks in advance for any help!

Regards,
Stein Welberg




A possible bug? Must call persist to make code run

2017-12-06 Thread kwunlyou
I prepared a simple example (Python) as follows to illustrate what I found:

- The code works well when persist is called beforehand, under all Spark
versions

- Without calling persist, the code works well under Spark 2.2.0 but doesn't
work under Spark 2.1.1 and Spark 2.1.2

- It really looks like a bug in Spark. Does anyone know which resolved Spark
issues are related?


== CODE ==
from __future__ import absolute_import, division, print_function
import pyspark.sql.types as T
import pyspark.sql.functions as F

# 2.1.1, 2.1.2 doesn't work
# 2.2.0 works
print(spark.version)

df = spark.createDataFrame(
    [{'name': 'a', 'scores': ['1', '2']}, {'name': 'b', 'scores': None}],
    T.StructType(
        [T.StructField('name', T.StringType(), True),
         T.StructField('scores', T.ArrayType(T.StringType()), True)]
    )
)

print(df.collect())
df.printSchema()


def loop_array(l):
    for e in l:
        pass
    return "pass"


# should work with persist
# tmp = df.filter(F.col('scores').isNotNull()).withColumn(
#     'new_col',
#     F.udf(loop_array)('scores')
# ).persist()

# won't work
tmp = df.filter(F.col('scores').isNotNull()).withColumn(
    'new_col',
    F.udf(loop_array)('scores')
)

print(tmp.collect())
tmp.filter(F.col('new_col').isNotNull()).count()
== CODE END ==

== ERROR MESSAGE ==
---
Py4JJavaError Traceback (most recent call last)
 in ()
> 1 tmp.filter(F.col('new_col').isNotNull()).count()

/databricks/spark/python/pyspark/sql/dataframe.py in count(self)
378 2
379 """
--> 380 return int(self._jdf.count())
381
382 @ignore_unicode_prefix

/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in
__call__(self, *args)
   1131 answer = self.gateway_client.send_command(command)
   1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
   1134
   1135 for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 61 def deco(*a, **kw):
 62 try:
---> 63 return f(*a, **kw)
 64 except py4j.protocol.Py4JJavaError as e:
 65 s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in
get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling o235.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
in stage 2.0 failed 4 times, most recent failure: Lost task 3.3 in stage 2.0
(TID 14, 10.179.231.249, executor 0):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File "/databricks/spark/python/pyspark/worker.py", line 171, in main
process()
  File "/databricks/spark/python/pyspark/worker.py", line 166, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/worker.py", line 103, in 
func = lambda _, it: map(mapper, it)
  File "", line 1, in 
  File "/databricks/spark/python/pyspark/worker.py", line 70, in 
return lambda *a: f(*a)
  File "", line 2, in loop_array
TypeError: 'NoneType' object is not iterable

at
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at
org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at
org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at

Re: Spark job only starts tasks on a single node

2017-12-06 Thread Art Rand
Hello Ji,

Spark will launch Executors round-robin on offers, so when the resources on
an agent get broken into multiple resource offers it's possible that many
Executors get placed on a single agent. However, from your description,
it's not clear why your other agents do not get Executors scheduled on
them. It's possible that the offers from your other agents are insufficient
in some way. The Mesos MASTER log should show offers being declined by your
Spark Driver, do you see that?  If you have DEBUG level logging in your
Spark driver you should also see offers being declined

there. Finally if your Spark framework isn't receiving any resource offers,
it could be because of the roles you have established on your agents or
quota set on other frameworks, have you set up any of that? Hope this helps!

Art

On Tue, Dec 5, 2017 at 10:45 PM, Ji Yan  wrote:

> Hi all,
>
> I am running Spark 2.0 on Mesos 1.1. I was trying to split up my job onto
> several nodes. I try to set the number of executors by the formula
> (spark.cores.max / spark.executor.cores). The behavior I saw was that Spark
> will try to fill up one mesos node with as many executors as it can, then it
> stops going to other mesos nodes even though it has not yet scheduled
> all the executors I asked it to! This is super weird!
>
> Did anyone notice this behavior before? Any help appreciated!
>
> Ji
>
> The information in this email is confidential and may be legally
> privileged. It is intended solely for the addressee. Access to this email
> by anyone else is unauthorized. If you are not the intended recipient, any
> disclosure, copying, distribution or any action taken or omitted to be
> taken in reliance on it, is prohibited and may be unlawful.
>


[ML] LogisticRegression and dataset's standardization before training

2017-12-06 Thread Filipp Zhinkin
Hi,

LogisticAggregator [1] scales every sample on every iteration. Without
scaling, binaryUpdateInPlace could be rewritten using BLAS.dot, and that
would significantly improve performance.
However, there is a comment [2] saying that standardization and
caching of the dataset before training would "create a lot of
overhead".

What kind of overhead is this about, and what is the rationale for avoiding
scaling the dataset prior to training?

Thanks,
Filipp.

[1] 
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala#L229
[2] 
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala#L40
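
For concreteness, a minimal sketch of the up-front alternative being asked about —
standardize once, cache the result, then train — assuming a DataFrame named training
with a "features" vector column and a "label" column (everything here is an
assumption, not code from MLlib). With setStandardization(false) the coefficients come
back in the scaled feature space, and the extra pass plus the cached copy of the scaled
features is presumably the kind of overhead the comment in LogisticAggregator refers to.

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.StandardScaler

// Scale the features once, up front, instead of per iteration.
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

val scaled = scaler.fit(training).transform(training).cache()

// Train on the pre-scaled features and skip the internal standardization.
val model = new LogisticRegression()
  .setFeaturesCol("scaledFeatures")
  .setStandardization(false)
  .fit(scaled)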

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: unable to connect to connect to cluster 2.2.0

2017-12-06 Thread Imran Rajjad
Thanks.

The machine where the Spark job was being submitted had SPARK_HOME pointing to
the old 2.1.1 directory.



On Wed, Dec 6, 2017 at 1:35 PM, Qiao, Richard 
wrote:

> Are you now building your app using spark 2.2 or 2.1?
>
>
>
> Best Regards
>
> Richard
>
>
>
>
>
> *From: *Imran Rajjad 
> *Date: *Wednesday, December 6, 2017 at 2:45 AM
> *To: *"user @spark" 
> *Subject: *unable to connect to connect to cluster 2.2.0
>
>
>
> Hi,
>
>
>
> Recently upgraded from 2.1.1 to 2.2.0. My Streaming job seems to have
> broken. The submitted application is unable to connect to the cluster, when
> all is running.
>
>
>
> below is my stack trace
>
> Spark Master:spark://192.168.10.207:7077
> Job Arguments:
> -appName orange_watch -directory /u01/watch/stream/
> Spark Configuration:
> [spark.executor.memory, spark.driver.memory, spark.app.name,
> spark.executor.cores]:6g
> [spark.executor.memory, spark.driver.memory, spark.app.name,
> spark.executor.cores]:4g
> [spark.executor.memory, spark.driver.memory, spark.app.name,
> spark.executor.cores]:orange_watch
> [spark.executor.memory, spark.driver.memory, spark.app.name,
> spark.executor.cores]:2
>
> Spark Arguments:
> [--packages]:graphframes:graphframes:0.5.0-spark2.1-s_2.11
>
> Using properties file: /home/my_user/spark-2.2.0-bin-
> hadoop2.7/conf/spark-defaults.conf
> Adding default property: spark.jars.packages=
> graphframes:graphframes:0.5.0-spark2.1-s_2.11
> Parsed arguments:
>   master  spark://192.168.10.207:7077
>   deployMode  null
>   executorMemory  6g
>   executorCores   2
>   totalExecutorCores  null
>   propertiesFile  /home/my_user/spark-2.2.0-bin-
> hadoop2.7/conf/spark-defaults.conf
>   driverMemory4g
>   driverCores null
>   driverExtraClassPathnull
>   driverExtraLibraryPath  null
>   driverExtraJavaOptions  null
>   supervise   false
>   queue   null
>   numExecutorsnull
>   files   null
>   pyFiles null
>   archivesnull
>   mainClass   com.my_user.MainClassWatch
>   primaryResource file:/home/my_user/cluster-testing/job.jar
>   nameorange_watch
>   childArgs   [-watchId 3199 -appName orange_watch -directory
> /u01/watch/stream/]
>   jarsnull
>   packagesgraphframes:graphframes:0.5.0-spark2.1-s_2.11
>   packagesExclusions  null
>   repositoriesnull
>   verbose true
>
> Spark properties used, including those specified through
>  --conf and those from the properties file /home/my_user/spark-2.2.0-bin-
> hadoop2.7/conf/spark-defaults.conf:
>   (spark.driver.memory,4g)
>   (spark.executor.memory,6g)
>   (spark.jars.packages,graphframes:graphframes:0.5.0-spark2.1-s_2.11)
>   (spark.app.name,orange_watch)
>   (spark.executor.cores,2)
>
>
> Ivy Default Cache set to: /home/my_user/.ivy2/cache
> The jars for the packages stored in: /home/my_user/.ivy2/jars
> :: loading settings :: url = jar:file:/home/my_user/spark-
> 2.2.0-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/
> core/settings/ivysettings.xml
> graphframes#graphframes added as a dependency
> :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
> confs: [default]
> found graphframes#graphframes;0.5.0-spark2.1-s_2.11 in spark-list
> found com.typesafe.scala-logging#scala-logging-api_2.11;2.1.2 in
> central
> found com.typesafe.scala-logging#scala-logging-slf4j_2.11;2.1.2
> in central
> found org.scala-lang#scala-reflect;2.11.0 in central
> found org.slf4j#slf4j-api;1.7.7 in spark-list
> :: resolution report :: resolve 191ms :: artifacts dl 5ms
> :: modules in use:
> com.typesafe.scala-logging#scala-logging-api_2.11;2.1.2 from
> central in [default]
> com.typesafe.scala-logging#scala-logging-slf4j_2.11;2.1.2 from
> central in [default]
> graphframes#graphframes;0.5.0-spark2.1-s_2.11 from spark-list in
> [default]
> org.scala-lang#scala-reflect;2.11.0 from central in [default]
> org.slf4j#slf4j-api;1.7.7 from spark-list in [default]
> 
> -
> |  |modules||
> artifacts   |
> |   conf   | number| search|dwnlded|evicted||
> number|dwnlded|
> 
> -
> |  default |   5   |   0   |   0   |   0   ||   5   |
> 0   |
> 
> -
> :: retrieving :: org.apache.spark#spark-submit-parent
> confs: [default]
> 0 artifacts copied, 5 already retrieved (0kB/7ms)
> Main class:
> com.my_user.MainClassWatch
> Arguments:
> -watchId
> 3199
> -appName
> 

Re: How to export the Spark SQL jobs from the HiveThriftServer2

2017-12-06 Thread wenxing zheng
The wording "[app-id] will actually be [base-app-id]/[attempt-id], where
[base-app-id] is the YARN application ID" is not quite correct: after I
changed [app-id] to [base-app-id], it works.

Maybe we need to fix the documentation?

From the information on the Spark job or the Spark stages, I can't see any
statistics on the memory part. Any hints would be appreciated.
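
For reference, a hedged sketch of the call that worked, with placeholder host and
application id (the base application id comes from YARN): while the Thrift server is
running, the REST API is served by its driver UI (port 4040 by default); for finished
applications it is served by the history server (port 18080 by default).

// Everything below is illustrative: host, port and the application id are placeholders.
val baseAppId = "application_1512345678901_0042"
val url = s"http://driver-host:4040/api/v1/applications/$baseAppId/jobs"
val jobsJson = scala.io.Source.fromURL(url).mkString // JSON array describing each job
println(jobsJson)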

On Wed, Dec 6, 2017 at 2:08 PM, wenxing zheng 
wrote:

> Dear all,
>
> I have a HiveThriftServer2 server running and most of our Spark SQLs will
> go there for calculation. From the Yarn GUI, I can see the application id
> and the attempt ID of the thrift server. But with the REST api described on
> the page (https://spark.apache.org/docs/latest/monitoring.html#rest-api),
> I still can't get the jobs for a given application with the endpoint:
> */applications/[app-id]/jobs*
>
> Can anyone kindly advise how to dump the Spark SQL jobs for audit? Just
> like the one for the MapReduce jobs (https://hadoop.apache.org/
> docs/current/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html).
>
> Thanks again,
> Wenxing
>


Re: Do I need to do .collect inside forEachRDD

2017-12-06 Thread Gerard Maas
Hi Kant,

>  but would your answer on .collect() change depending on running the
spark app in client vs cluster mode?

No, it should make no difference.

-kr, Gerard.

On Tue, Dec 5, 2017 at 11:34 PM, kant kodali  wrote:

> @Richard I don't see any error in the executor log but let me run again to
> make sure.
>
> @Gerard Thanks much!  but would your answer on .collect() change depending
> on running the spark app in client vs cluster mode?
>
> Thanks!
>
> On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas  wrote:
>
>> The general answer to your initial question is that "it depends". If the
>> operation in the rdd.foreach() closure can be parallelized, then you don't
>> need to collect first. If it needs some local context (e.g. a socket
>> connection), then you need to do rdd.collect first to bring the data
>> locally, which has a perf penalty and also is restricted to the memory size
>> of the driver process.
>>
>> Given the further clarification:
>> >Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>>
>> If it's writing to Kafka, that operation can be done in a distributed
>> form.
>>
>> You could use this lib: https://github.com/BenFradet/spark-kafka-writer
>>
>> Or, if you can upgrade to Spark 2.2 version, you can pave your way to
>> migrate to structured streaming by already adopting the 'structured' APIs
>> within Spark Streaming:
>>
>> case class KV(key: String, value: String)
>>
>> dstream.map().reduce().foreachRDD { rdd =>
>> import spark.implicits._
>> val kv = rdd.map{e => KV(extractKey(e), extractValue(e))} // needs to
>> be in a (key,value) shape
>> val dataFrame = kv.toDF()
>> dataFrame.write
>>  .format("kafka")
>>  .option("kafka.bootstrap.servers",
>> "host1:port1,host2:port2")
>>  .option("topic", "topic1")
>>  .save()
>> }
>>
>> -kr, Gerard.
>>
>>
>>
>> On Tue, Dec 5, 2017 at 10:38 PM, kant kodali  wrote:
>>
>>> Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>>>
>>> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <
>>> richard.q...@capitalone.com> wrote:
>>>
 Where do you check the output result for both cases?

 Sent from my iPhone


 > On Dec 5, 2017, at 15:36, kant kodali  wrote:
 >
 > Hi All,
 >
 > I have a simple stateless transformation using Dstreams (stuck with
 the old API for one of the Applications). The pseudo code is roughly like this
 >
 > dstream.map().reduce().forEachRdd(rdd -> {
 >  rdd.collect().forEach(); // Is this necessary? It does execute
 fine but a bit slow
 > })
 >
 > I understand collect collects the results back to the driver but is
 that necessary? can I just do something like below? I believe I tried both
 and somehow the below code didn't output any results (It can be issues with
 my env. I am not entirely sure) but I just would like some clarification on
 .collect() since it seems to slow things down for me.
 >
 > dstream.map().reduce().forEachRdd(rdd -> {
 >  rdd.forEach(() -> {} ); //
 > })
 >
 > Thanks!
 >
 >
 

 The information contained in this e-mail is confidential and/or
 proprietary to Capital One and/or its affiliates and may only be used
 solely in performance of work or services for Capital One. The information
 transmitted herewith is intended only for use by the individual or entity
 to which it is addressed. If the reader of this message is not the intended
 recipient, you are hereby notified that any review, retransmission,
 dissemination, distribution, copying or other use of, or taking of any
 action in reliance upon this information is strictly prohibited. If you
 have received this communication in error, please contact the sender and
 delete the material from your computer.


>>>
>>
>


Re: unable to connect to connect to cluster 2.2.0

2017-12-06 Thread Qiao, Richard
Are you now building your app using spark 2.2 or 2.1?

Best Regards
Richard


From: Imran Rajjad 
Date: Wednesday, December 6, 2017 at 2:45 AM
To: "user @spark" 
Subject: unable to connect to connect to cluster 2.2.0

Hi,

Recently upgraded from 2.1.1 to 2.2.0. My Streaming job seems to have broken. 
The submitted application is unable to connect to the cluster, when all is 
running.

below is my stack trace
Spark Master:spark://192.168.10.207:7077
Job Arguments:
-appName orange_watch -directory /u01/watch/stream/
Spark Configuration:
[spark.executor.memory, spark.driver.memory, 
spark.app.name, spark.executor.cores]:6g
[spark.executor.memory, spark.driver.memory, 
spark.app.name, spark.executor.cores]:4g
[spark.executor.memory, spark.driver.memory, 
spark.app.name, spark.executor.cores]:orange_watch
[spark.executor.memory, spark.driver.memory, 
spark.app.name, spark.executor.cores]:2
Spark Arguments:
[--packages]:graphframes:graphframes:0.5.0-spark2.1-s_2.11
Using properties file: 
/home/my_user/spark-2.2.0-bin-hadoop2.7/conf/spark-defaults.conf
Adding default property: 
spark.jars.packages=graphframes:graphframes:0.5.0-spark2.1-s_2.11
Parsed arguments:
  master  
spark://192.168.10.207:7077
  deployMode  null
  executorMemory  6g
  executorCores   2
  totalExecutorCores  null
  propertiesFile  
/home/my_user/spark-2.2.0-bin-hadoop2.7/conf/spark-defaults.conf
  driverMemory4g
  driverCores null
  driverExtraClassPathnull
  driverExtraLibraryPath  null
  driverExtraJavaOptions  null
  supervise   false
  queue   null
  numExecutorsnull
  files   null
  pyFiles null
  archivesnull
  mainClass   com.my_user.MainClassWatch
  primaryResource file:/home/my_user/cluster-testing/job.jar
  nameorange_watch
  childArgs   [-watchId 3199 -appName orange_watch -directory 
/u01/watch/stream/]
  jarsnull
  packagesgraphframes:graphframes:0.5.0-spark2.1-s_2.11
  packagesExclusions  null
  repositoriesnull
  verbose true
Spark properties used, including those specified through
 --conf and those from the properties file 
/home/my_user/spark-2.2.0-bin-hadoop2.7/conf/spark-defaults.conf:
  (spark.driver.memory,4g)
  (spark.executor.memory,6g)
  (spark.jars.packages,graphframes:graphframes:0.5.0-spark2.1-s_2.11)
  (spark.app.name,orange_watch)
  (spark.executor.cores,2)

Ivy Default Cache set to: /home/my_user/.ivy2/cache
The jars for the packages stored in: /home/my_user/.ivy2/jars
:: loading settings :: url = 
jar:file:/home/my_user/spark-2.2.0-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found graphframes#graphframes;0.5.0-spark2.1-s_2.11 in spark-list
found com.typesafe.scala-logging#scala-logging-api_2.11;2.1.2 in central
found com.typesafe.scala-logging#scala-logging-slf4j_2.11;2.1.2 in 
central
found org.scala-lang#scala-reflect;2.11.0 in central
found org.slf4j#slf4j-api;1.7.7 in spark-list
:: resolution report :: resolve 191ms :: artifacts dl 5ms
:: modules in use:
com.typesafe.scala-logging#scala-logging-api_2.11;2.1.2 from central in 
[default]
com.typesafe.scala-logging#scala-logging-slf4j_2.11;2.1.2 from central 
in [default]
graphframes#graphframes;0.5.0-spark2.1-s_2.11 from spark-list in 
[default]
org.scala-lang#scala-reflect;2.11.0 from central in [default]
org.slf4j#slf4j-api;1.7.7 from spark-list in [default]
-
|  |modules||   artifacts   |
|   conf   | number| search|dwnlded|evicted|| number|dwnlded|
-
|  default |   5   |   0   |   0   |   0   ||   5   |   0   |
-
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 5 already retrieved (0kB/7ms)
Main class:
com.my_user.MainClassWatch
Arguments:
-watchId
3199
-appName
orange_watch
-directory
/u01/watch/stream/
System properties:
(spark.executor.memory,6g)
(spark.driver.memory,4g)
(SPARK_SUBMIT,true)
(spark.jars.packages,graphframes:graphframes:0.5.0-spark2.1-s_2.11)
(spark.app.name,orange_watch)