Re: Programmatically get status of job (WAITING/RUNNING)

2017-12-07 Thread Qiao, Richard
Regarding your example question, the answer is yes.
“For example, if an application wanted 4 executors
(spark.executor.instances=4) but the Spark cluster can only provide 1
executor. This means that I will only receive 1 onExecutorAdded event. Will
the application state change to RUNNING (even if 1 executor was allocated)?”

Best Regards
Richard


On 12/7/17, 2:40 PM, "bsikander"  wrote:

Marcelo Vanzin wrote
> I'm not sure I follow you here. This is something that you are
> defining, not Spark.

Yes, you are right. In my code,
1) my notion of RUNNING is that both the driver and the executors are in
RUNNING state.
2) my notion of WAITING is that any one of the driver/executors is in
WAITING state.

So,
- SparkLauncher provides me the details about the "driver":
RUNNING/SUBMITTED/WAITING.
- SparkListener provides me the details about the "executors" via
onExecutorAdded/onExecutorRemoved.

I want to combine both SparkLauncher + SparkListener to achieve my view of
RUNNING/WAITING.

The only thing confusing me here is that I don't know how Spark internally
transitions applications from the WAITING to the RUNNING state.
For example, if an application wanted 4 executors
(spark.executor.instances=4) but the Spark cluster can only provide 1
executor, I will only receive 1 onExecutorAdded event. Will the application
state change to RUNNING (even if only 1 executor was allocated)?

Once I am clear on this logic, I can implement my feature.
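
For what it's worth, a minimal sketch of how I imagine combining the two signals in the launcher process. It assumes the executor count seen by the in-driver SparkListener is relayed back to the launcher by some mechanism of my own; apart from SparkAppHandle, all names below are illustrative, not Spark APIs.

import org.apache.spark.launcher.SparkAppHandle;

// Hypothetical combiner living in the launcher process.
public class CombinedStateTracker implements SparkAppHandle.Listener {
    private volatile SparkAppHandle.State driverState = SparkAppHandle.State.UNKNOWN;
    private volatile int liveExecutors = 0;      // fed from onExecutorAdded/onExecutorRemoved
    private final int requestedExecutors;        // value of spark.executor.instances

    public CombinedStateTracker(int requestedExecutors) {
        this.requestedExecutors = requestedExecutors;
    }

    @Override
    public void stateChanged(SparkAppHandle handle) {
        driverState = handle.getState();         // UNKNOWN/CONNECTED/SUBMITTED/RUNNING/...
    }

    @Override
    public void infoChanged(SparkAppHandle handle) { /* not needed for this sketch */ }

    // Called by whatever mechanism relays the SparkListener executor events
    // from the driver back to the launcher (assumed, not shown here).
    public void executorCountChanged(int count) {
        liveExecutors = count;
    }

    // "RUNNING" only when the driver runs and all requested executors are up,
    // matching my definition above; otherwise report WAITING.
    public String combinedState() {
        boolean allExecutorsUp = liveExecutors >= requestedExecutors;
        return (driverState == SparkAppHandle.State.RUNNING && allExecutorsUp)
                ? "RUNNING" : "WAITING";
    }
}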










Re: Programmatically get status of job (WAITING/RUNNING)

2017-12-07 Thread Qiao, Richard
For #2, do you mean “RUNNING” showing in the “Driver” table? If so, that is not a
problem: the driver really is running even though no executor is available, and that
combination (driver running while no executors) is itself a status you can catch.
Comparing #1 and #3, my understanding of “submitted” is “the jar is submitted to
the executors”. With this concept, you may define your own status.

Best Regards
Richard


On 12/4/17, 4:06 AM, "bsikander"  wrote:

So, I tried to use SparkAppHandle.Listener with SparkLauncher as you
suggested. The behavior of the Launcher is not what I expected.

1- If I start the job (using SparkLauncher) and my Spark cluster has enough
cores available, I receive events in my class extending
SparkAppHandle.Listener and I see the status changing from
UNKNOWN -> CONNECTED -> SUBMITTED -> RUNNING. All good here.

2- If my Spark cluster has cores only for my driver process (running in
cluster mode) but no cores for my executors, then I still receive the RUNNING
event. I was expecting something else: since my executors have no cores and
the Master UI shows WAITING for the executors, the listener should report
SUBMITTED instead of RUNNING.

3- If my Spark cluster has no cores even for the driver process, then
SparkLauncher invokes no events at all. The state stays UNKNOWN. I would
have expected it to be at least in the SUBMITTED state.

*Is there any way with which I can reliably get the WAITING state of a job?*
My desired mapping (sketched in code below) is:
Driver=RUNNING, executor=RUNNING -> overall state should be RUNNING
Driver=RUNNING, executor=WAITING -> overall state should be SUBMITTED/WAITING
Driver=WAITING, executor=WAITING -> overall state should be
CONNECTED/SUBMITTED/WAITING
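
A minimal, illustrative sketch of that mapping as a plain helper (the state names are just my own labels, not Spark's; the helper picks one label from each option listed above):

// Illustrative helper only: maps my driver/executor views onto one overall state.
public final class OverallState {
    public static String of(String driverState, String executorState) {
        if ("RUNNING".equals(driverState) && "RUNNING".equals(executorState)) {
            return "RUNNING";
        }
        if ("RUNNING".equals(driverState)) {
            return "WAITING";   // driver is up, executors still waiting for cores
        }
        return "SUBMITTED";     // driver itself not running yet
    }
}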














Re: Do I need to do .collect inside forEachRDD

2017-12-07 Thread Qiao, Richard
Kant, you're right: we cannot use the driver's producer in an executor. That's why I
mentioned the “Kafka sink” approach to solve it.
This article should be helpful about it:
https://allegro.tech/2015/08/spark-kafka-integration.html
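
For reference, here is a minimal Java sketch of that “Kafka sink” idea, assuming the producer config is passed as a serializable map (e.g. a HashMap); the class name and fields are illustrative, not the article's exact code. Broadcast one instance and call send() inside foreachPartition, so the producer is created lazily on each executor instead of being serialized from the driver:

import java.io.Serializable;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSink implements Serializable {
    private final Map<String, Object> producerConfig;          // serializable config only
    private transient KafkaProducer<String, String> producer;  // never serialized

    public KafkaSink(Map<String, Object> producerConfig) {
        this.producerConfig = producerConfig;
    }

    public void send(String topic, String value) {
        if (producer == null) {                                 // created once per executor JVM
            producer = new KafkaProducer<>(producerConfig);
        }
        producer.send(new ProducerRecord<>(topic, value));
    }
}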

Best Regards
Richard


From: kant kodali <kanth...@gmail.com>
Date: Thursday, December 7, 2017 at 12:39 PM
To: "Qiao, Richard" <richard.q...@capitalone.com>
Cc: Gerard Maas <gerard.m...@gmail.com>, "user @spark" <user@spark.apache.org>
Subject: Re: Do I need to do .collect inside forEachRDD

Hi Richard,

I have tried your sample code now, and several times in the past as well. The
problem seems to be that the kafkaProducer is not serializable, so I get a "Task not
serializable" exception. My kafkaProducer object is created using the
following dependency:

group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.1'

On Thu, Dec 7, 2017 at 2:46 AM, Qiao, Richard
<richard.q...@capitalone.com> wrote:
Thanks for sharing the code.
The 1st problem in the first snippet is that the map is allocated in the driver, but the
code tries to put data into it on the executors and then read it back in the driver to
send to Kafka. You are using this map as if it were an accumulator, but it doesn't work
that way.

The 2nd problem is that both snippets collect RDD-level data to generate a single
JSON string and then send it to Kafka, which could produce a very long JSON string
if your TPS is very high.
If possible, send smaller JSON strings at the task level, such as:
.foreachRDD(stringLongJavaPairRDD -> {
    stringLongJavaPairRDD.foreachPartition(partition -> {
        Map<String, Long> map = new HashMap<>(); // defined per task/partition
        partition.forEachRemaining(stringLongTuple2 ->
            map.put(stringLongTuple2._1(), stringLongTuple2._2()));
        // send a smaller JSON string per partition, directly from the executor
        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
    });
});
When you do this, make sure the Kafka producer (see the "Kafka sink" pattern for it) and
Gson are set up correctly on the executors.

If there is still an OOM after this, let's discuss further.

Best Regards
Richard


From: kant kodali <kanth...@gmail.com>
Date: Thursday, December 7, 2017 at 2:30 AM
To: Gerard Maas <gerard.m...@gmail.com>
Cc: "Qiao, Richard" <richard.q...@capitalone.com>, "user @spark"
<user@spark.apache.org>
Subject: Re: Do I need to do .collect inside forEachRDD

@Richard I have pasted the two versions of the code below, and I still can't
figure out why it doesn't work without .collect(). Any help would be great.


The code below doesn't work, and sometimes I also run into an OutOfMemory error.

jsonMessagesDStream
.window(new Duration(6), new Duration(1000))
.mapToPair(val -> {
  JsonParser parser = new JsonParser();
  JsonObject jsonObj = parser.parse(val).getAsJsonObject();
  if (jsonObj.has("key4")) {
return new Tuple2<>("", 0L);
  }
  String symbol = jsonObj.get("key1").getAsString();
  long quantity = jsonObj.get("key2").getAsLong();
  return new Tuple2<>(symbol, quantity);
})
.reduceByKey((v1, v2) -> v1 + v2)
.foreachRDD(stringLongJavaPairRDD -> {
Map<String, Long> map = new HashMap<>();
stringLongJavaPairRDD.foreach(stringLongTuple2 -> {

            System.out.println(stringLongTuple2._1()); // Works: I can see values getting printed out

            System.out.println(stringLongTuple2._2()); // Works: I can see values getting printed out

            map.put(stringLongTuple2._1(), stringLongTuple2._2());

});

        System.out.println(gson.toJson(map)); // Prints empty JSON doc string "{}" always.
        // But why? Especially when the map is getting filled with values, as
        // confirmed by the print statements above.

producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
});

jssc.start();

jssc.awaitTermination();



  VS

The code below works, but it is slow because of .collectAsMap().



jsonMessagesDStream
.window(new Duration(6), new Duration(1000))
.mapToPair(val -> {
  JsonParser parser = new JsonParser();
  JsonObject jsonObj = parser.parse(val).getAsJsonObject();
  if (jsonObj.has("key4")) {
return new Tuple2<>("", 0L);
  }
  String symbol = jsonObj.get("key1").getAsString();
  long quantity = jsonObj.get("key2").getAsLong();
  return new Tuple2<>(symbol, quantity);
})
.reduceByKey((v1, v2) -> v1 + v2)
.foreachRDD(stringLongJavaPairRDD -> {

LinkedHashMap<String, Long> map = new LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());

Re: Do I need to do .collect inside forEachRDD

2017-12-07 Thread Qiao, Richard
Thanks for sharing the code.
The 1st problem in the first snippet is that the map is allocated in the driver, but the
code tries to put data into it on the executors and then read it back in the driver to
send to Kafka. You are using this map as if it were an accumulator, but it doesn't work
that way.

The 2nd problem is that both snippets collect RDD-level data to generate a single
JSON string and then send it to Kafka, which could produce a very long JSON string
if your TPS is very high.
If possible, send smaller JSON strings at the task level, such as:
.foreachRDD(stringLongJavaPairRDD -> {
    stringLongJavaPairRDD.foreachPartition(partition -> {
        Map<String, Long> map = new HashMap<>(); // defined per task/partition
        partition.forEachRemaining(stringLongTuple2 ->
            map.put(stringLongTuple2._1(), stringLongTuple2._2()));
        // send a smaller JSON string per partition, directly from the executor
        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
    });
});
When you do this, make sure the Kafka producer (see the "Kafka sink" pattern for it) and
Gson are set up correctly on the executors.

If there is still an OOM after this, let's discuss further.

Best Regards
Richard


From: kant kodali <kanth...@gmail.com>
Date: Thursday, December 7, 2017 at 2:30 AM
To: Gerard Maas <gerard.m...@gmail.com>
Cc: "Qiao, Richard" <richard.q...@capitalone.com>, "user @spark" 
<user@spark.apache.org>
Subject: Re: Do I need to do .collect inside forEachRDD

@Richard I have pasted the two versions of the code below, and I still can't
figure out why it doesn't work without .collect(). Any help would be great.


The code below doesn't work, and sometimes I also run into an OutOfMemory error.

jsonMessagesDStream
.window(new Duration(6), new Duration(1000))
.mapToPair(val -> {
  JsonParser parser = new JsonParser();
  JsonObject jsonObj = parser.parse(val).getAsJsonObject();
  if (jsonObj.has("key4")) {
return new Tuple2<>("", 0L);
  }
  String symbol = jsonObj.get("key1").getAsString();
  long quantity = jsonObj.get("key2").getAsLong();
  return new Tuple2<>(symbol, quantity);
})
.reduceByKey((v1, v2) -> v1 + v2)
.foreachRDD(stringLongJavaPairRDD -> {
Map<String, Long> map = new HashMap<>();
stringLongJavaPairRDD.foreach(stringLongTuple2 -> {

            System.out.println(stringLongTuple2._1()); // Works: I can see values getting printed out

            System.out.println(stringLongTuple2._2()); // Works: I can see values getting printed out

            map.put(stringLongTuple2._1(), stringLongTuple2._2());

});

        System.out.println(gson.toJson(map)); // Prints empty JSON doc string "{}" always.
        // But why? Especially when the map is getting filled with values, as
        // confirmed by the print statements above.

producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
});

jssc.start();

jssc.awaitTermination();



  VS

The code below works, but it is slow because of .collectAsMap().



jsonMessagesDStream
.window(new Duration(6), new Duration(1000))
.mapToPair(val -> {
  JsonParser parser = new JsonParser();
  JsonObject jsonObj = parser.parse(val).getAsJsonObject();
  if (jsonObj.has("key4")) {
return new Tuple2<>("", 0L);
  }
  String symbol = jsonObj.get("key1").getAsString();
  long quantity = jsonObj.get("key2").getAsLong();
  return new Tuple2<>(symbol, quantity);
})
.reduceByKey((v1, v2) -> v1 + v2)
.foreachRDD(stringLongJavaPairRDD -> {

LinkedHashMap<String, Long> map = new 
LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());

producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));

});

jssc.start();

jssc.awaitTermination();





On Wed, Dec 6, 2017 at 1:43 AM, Gerard Maas
<gerard.m...@gmail.com> wrote:
Hi Kant,

>  but would your answer on .collect() change depending on running the spark 
> app in client vs cluster mode?

No, it should make no difference.

-kr, Gerard.

On Tue, Dec 5, 2017 at 11:34 PM, kant kodali
<kanth...@gmail.com> wrote:
@Richard I don't see any error in the executor log but let me run again to make 
sure.

@Gerard Thanks much! But would your answer on .collect() change depending on
whether the Spark app runs in client vs cluster mode?

Thanks!

On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas
<gerard.m...@gmail.com> wrote:
The general answer to your initial question is that "it depends". If the 
operation in the rdd.foreach() closure can be parallelized, then you don't need 
to collect

Re: unable to connect to connect to cluster 2.2.0

2017-12-06 Thread Qiao, Richard
Are you now building your app against Spark 2.2 or 2.1?

Best Regards
Richard


From: Imran Rajjad 
Date: Wednesday, December 6, 2017 at 2:45 AM
To: "user @spark" 
Subject: unable to connect to connect to cluster 2.2.0

Hi,

I recently upgraded from 2.1.1 to 2.2.0, and my streaming job seems to have broken.
The submitted application is unable to connect to the cluster, even though everything
is running.

Below is my stack trace:
Spark Master:spark://192.168.10.207:7077
Job Arguments:
-appName orange_watch -directory /u01/watch/stream/
Spark Configuration:
[spark.executor.memory, spark.driver.memory, spark.app.name, spark.executor.cores]:6g
[spark.executor.memory, spark.driver.memory, spark.app.name, spark.executor.cores]:4g
[spark.executor.memory, spark.driver.memory, spark.app.name, spark.executor.cores]:orange_watch
[spark.executor.memory, spark.driver.memory, spark.app.name, spark.executor.cores]:2
Spark Arguments:
[--packages]:graphframes:graphframes:0.5.0-spark2.1-s_2.11
Using properties file: 
/home/my_user/spark-2.2.0-bin-hadoop2.7/conf/spark-defaults.conf
Adding default property: 
spark.jars.packages=graphframes:graphframes:0.5.0-spark2.1-s_2.11
Parsed arguments:
  master  
spark://192.168.10.207:7077
  deployMode  null
  executorMemory  6g
  executorCores   2
  totalExecutorCores  null
  propertiesFile  
/home/my_user/spark-2.2.0-bin-hadoop2.7/conf/spark-defaults.conf
  driverMemory4g
  driverCores null
  driverExtraClassPathnull
  driverExtraLibraryPath  null
  driverExtraJavaOptions  null
  supervise   false
  queue   null
  numExecutorsnull
  files   null
  pyFiles null
  archivesnull
  mainClass   com.my_user.MainClassWatch
  primaryResource file:/home/my_user/cluster-testing/job.jar
  nameorange_watch
  childArgs   [-watchId 3199 -appName orange_watch -directory 
/u01/watch/stream/]
  jarsnull
  packagesgraphframes:graphframes:0.5.0-spark2.1-s_2.11
  packagesExclusions  null
  repositoriesnull
  verbose true
Spark properties used, including those specified through
 --conf and those from the properties file 
/home/my_user/spark-2.2.0-bin-hadoop2.7/conf/spark-defaults.conf:
  (spark.driver.memory,4g)
  (spark.executor.memory,6g)
  (spark.jars.packages,graphframes:graphframes:0.5.0-spark2.1-s_2.11)
  (spark.app.name,orange_watch)
  (spark.executor.cores,2)

Ivy Default Cache set to: /home/my_user/.ivy2/cache
The jars for the packages stored in: /home/my_user/.ivy2/jars
:: loading settings :: url = 
jar:file:/home/my_user/spark-2.2.0-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found graphframes#graphframes;0.5.0-spark2.1-s_2.11 in spark-list
found com.typesafe.scala-logging#scala-logging-api_2.11;2.1.2 in central
found com.typesafe.scala-logging#scala-logging-slf4j_2.11;2.1.2 in 
central
found org.scala-lang#scala-reflect;2.11.0 in central
found org.slf4j#slf4j-api;1.7.7 in spark-list
:: resolution report :: resolve 191ms :: artifacts dl 5ms
:: modules in use:
com.typesafe.scala-logging#scala-logging-api_2.11;2.1.2 from central in 
[default]
com.typesafe.scala-logging#scala-logging-slf4j_2.11;2.1.2 from central 
in [default]
graphframes#graphframes;0.5.0-spark2.1-s_2.11 from spark-list in 
[default]
org.scala-lang#scala-reflect;2.11.0 from central in [default]
org.slf4j#slf4j-api;1.7.7 from spark-list in [default]
-
|  |modules||   artifacts   |
|   conf   | number| search|dwnlded|evicted|| number|dwnlded|
-
|  default |   5   |   0   |   0   |   0   ||   5   |   0   |
-
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 5 already retrieved (0kB/7ms)
Main class:
com.my_user.MainClassWatch
Arguments:
-watchId
3199
-appName
orange_watch
-directory
/u01/watch/stream/
System properties:
(spark.executor.memory,6g)
(spark.driver.memory,4g)
(SPARK_SUBMIT,true)
(spark.jars.packages,graphframes:graphframes:0.5.0-spark2.1-s_2.11)
(spark.app.name,orange_watch)

Re: Do I need to do .collect inside forEachRDD

2017-12-05 Thread Qiao, Richard
In the 2nd case, is there any producer error thrown in the executor's log?

Best Regards
Richard


From: kant kodali <kanth...@gmail.com>
Date: Tuesday, December 5, 2017 at 4:38 PM
To: "Qiao, Richard" <richard.q...@capitalone.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: Do I need to do .collect inside forEachRDD

It reads from Kafka and outputs to Kafka, so I check the output from Kafka.

On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard
<richard.q...@capitalone.com> wrote:
Where do you check the output result in both cases?

Sent from my iPhone

> On Dec 5, 2017, at 15:36, kant kodali
> <kanth...@gmail.com> wrote:
>
> Hi All,
>
> I have a simple stateless transformation using DStreams (stuck with the old
> API for one of the applications). The pseudocode is roughly like this:
>
> dstream.map().reduce().forEachRdd(rdd -> {
>  rdd.collect().forEach(); // Is this necessary? It executes fine, but a
> bit slowly
> })
>
> I understand collect() brings the results back to the driver, but is that
> necessary? Can I just do something like the code below? I believe I tried both, and
> somehow the code below didn't output any results (it could be an issue with my
> env; I am not entirely sure), but I would just like some clarification on
> .collect(), since it seems to slow things down for me.
>
> dstream.map().reduce().forEachRdd(rdd -> {
>  rdd.forEach(() -> {} ); //
> })
>
> Thanks!
>
>




Re: Do I need to do .collect inside forEachRDD

2017-12-05 Thread Qiao, Richard
Where do you check the output result in both cases?

Sent from my iPhone

> On Dec 5, 2017, at 15:36, kant kodali  wrote:
> 
> Hi All,
> 
> I have a simple stateless transformation using DStreams (stuck with the old
> API for one of the applications). The pseudocode is roughly like this:
>
> dstream.map().reduce().forEachRdd(rdd -> {
>  rdd.collect().forEach(); // Is this necessary? It executes fine, but a
> bit slowly
> })
> 
> I understand collect() brings the results back to the driver, but is that
> necessary? Can I just do something like the code below? I believe I tried both, and
> somehow the code below didn't output any results (it could be an issue with my
> env; I am not entirely sure), but I would just like some clarification on
> .collect(), since it seems to slow things down for me.
> 
> dstream.map().reduce().forEachRdd(rdd -> {
>  rdd.forEach(() -> {} ); // 
> })
> 
> Thanks!
> 
> 







Re: Access to Applications metrics

2017-12-04 Thread Qiao, Richard
It works to collect job-level metrics through a Jolokia Java agent.

Best Regards
Richard


From: Nick Dimiduk 
Date: Monday, December 4, 2017 at 6:53 PM
To: "user@spark.apache.org" 
Subject: Re: Access to Applications metrics

Bump.

On Wed, Nov 15, 2017 at 2:28 PM, Nick Dimiduk wrote:
Hello,

I'm wondering if it's possible to get access to the detailed job/stage/task
level metrics via the metrics system (JMX, Graphite, ). I've enabled the
wildcard sink and I do not see them. It seems these values are only available
over HTTP/JSON and to SparkListener instances; is this the case? Has anyone
worked on a SparkListener that would bridge data from one to the other?

Thanks,
Nick





Re: Add snappy support for spark in Windows

2017-12-04 Thread Qiao, Richard
Junfeng, it's worth a try to start your local Spark with the Hadoop Windows support
package (hadoop.dll, winutils.exe, etc.) in HADOOP_HOME, if you haven't done that yet.
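
As an illustration only (the path below is an assumption, not a required location), hadoop.home.dir can also be pointed at the folder containing bin\winutils.exe from code, before the SparkContext is created:

public class LocalSparkWithWinutils {
    public static void main(String[] args) {
        // Assumed layout: C:\hadoop\bin\winutils.exe (with hadoop.dll alongside it).
        // Alternatively, set the HADOOP_HOME environment variable and add %HADOOP_HOME%\bin to PATH.
        System.setProperty("hadoop.home.dir", "C:\\hadoop");
        // ... then build the SparkConf / JavaSparkContext as in the code below ...
    }
}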

Best Regards
Richard


From: Junfeng Chen <darou...@gmail.com>
Date: Monday, December 4, 2017 at 3:53 AM
To: "Qiao, Richard" <richard.q...@capitalone.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Add snappy support for spark in Windows

But I am working on my local development machine, so it should have nothing to do
with workers/executors.

I found some documents about enabling Snappy on Hadoop. If I want to use Snappy
with Spark, do I need to configure Spark the same way as Hadoop, or is there an
easier way to get it working?


Regard,
Junfeng Chen

On Mon, Dec 4, 2017 at 4:12 PM, Qiao, Richard
<richard.q...@capitalone.com> wrote:
It seems like a common mistake: the path is not accessible by the workers/executors.

Best regards
Richard

Sent from my iPhone

On Dec 3, 2017, at 22:32, Junfeng Chen
<darou...@gmail.com> wrote:

I am working on importing a Snappy-compressed JSON file into a Spark RDD or
Dataset. However, I get this error: java.lang.UnsatisfiedLinkError:
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z

I have set the following configuration:

SparkConf conf = new SparkConf()

.setAppName("normal spark")

.setMaster("local")

.set("spark.io.compression.codec", 
"org.apache.spark.io<http://org.apache.spark.io>.SnappyCompressionCodec")


.set("spark.driver.extraLibraryPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")


.set("spark.driver.extraClassPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")


.set("spark.executor.extraLibraryPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")


.set("spark.executor.extraClassPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")

;

where D:\Downloads\spark-2.2.0-bin-hadoop2.7 is the path where I unpacked Spark, and I
can find the Snappy jar files snappy-0.2.jar and snappy-java-1.1.2.6.jar in

D:\Downloads\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\jars\

However, nothing works, and the error message does not even change.

How can I fix it?



ref of stackoverflow:
https://stackoverflow.com/questions/47626012/config-snappy-support-for-spark-in-windows


Regard,
Junfeng Chen





Re: Add snappy support for spark in Windows

2017-12-04 Thread Qiao, Richard
It seems like a common mistake: the path is not accessible by the workers/executors.

Best regards
Richard

Sent from my iPhone

On Dec 3, 2017, at 22:32, Junfeng Chen wrote:


I am working on importing a Snappy-compressed JSON file into a Spark RDD or
Dataset. However, I get this error: java.lang.UnsatisfiedLinkError:
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z

I have set the following configuration:

SparkConf conf = new SparkConf()
.setAppName("normal spark")
.setMaster("local")
.set("spark.io.compression.codec", 
"org.apache.spark.io.SnappyCompressionCodec")

.set("spark.driver.extraLibraryPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")

.set("spark.driver.extraClassPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")

.set("spark.executor.extraLibraryPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")

.set("spark.executor.extraClassPath","D:\\Downloads\\spark-2.2.0-bin-hadoop2.7\\spark-2.2.0-bin-hadoop2.7\\jars")
;

where D:\Downloads\spark-2.2.0-bin-hadoop2.7 is the path where I unpacked Spark, and I
can find the Snappy jar files snappy-0.2.jar and snappy-java-1.1.2.6.jar in

D:\Downloads\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\jars\

However, nothing works, and the error message does not even change.

How can I fix it?


ref of stackoverflow: 
https://stackoverflow.com/questions/47626012/config-snappy-support-for-spark-in-windows
 



Regard,
Junfeng Chen




Re: Dynamic Resource allocation in Spark Streaming

2017-12-03 Thread Qiao, Richard
Sourav:
I'm using Spark Streaming 2.1.0 and can confirm that setting
spark.dynamicAllocation.enabled is enough.
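
For illustration only (the min/max values below are placeholders, not recommendations), the usual knobs look like this when set programmatically:

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")       // external shuffle service must be running
    .set("spark.dynamicAllocation.minExecutors", "1")    // placeholder
    .set("spark.dynamicAllocation.maxExecutors", "10");  // placeholder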

Best Regards
Richard

From: Sourav Mazumder 
Date: Sunday, December 3, 2017 at 12:31 PM
To: user 
Subject: Dynamic Resource allocation in Spark Streaming

Hi,
I see that the following JIRA is resolved in Spark 2.0,
https://issues.apache.org/jira/browse/SPARK-12133, which is supposed to support
dynamic resource allocation in Spark Streaming.
I also see the JIRA https://issues.apache.org/jira/browse/SPARK-22008, which is
about fixing a number-of-executors-related issue in dynamic allocation in Spark
Streaming.

But when I check http://spark.apache.org/docs/2.1.1/configuration.html (or the
same for 2.2), I don't see the configuration parameter
spark.streaming.dynamicAllocation.enabled.
Is this feature there at all in Spark 2.0? Or is setting
spark.dynamicAllocation.enabled good enough?
Regards,
Sourav




Re: [Spark streaming] No assigned partition error during seek

2017-12-01 Thread Qiao, Richard
In your case, it looks like it ends up with two versions of Kafka in the same
JVM at runtime, so there is a version conflict.

Regarding "I don't find the Spark async commit useful for our needs", do you mean
code like the line below?
kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
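
For reference, a rough Java equivalent of that call, following the pattern in the Kafka 0-10 integration guide; "stream" is assumed to be the JavaInputDStream returned by KafkaUtils.createDirectStream, and the variable names are illustrative:

import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

stream.foreachRDD(rdd -> {
    // Offset ranges of this batch, taken from the underlying Kafka RDD.
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    // ... process the batch and write its outputs ...
    // Then ask the DStream to commit those offsets back to Kafka asynchronously.
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});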


Best Regards
Richard


From: venkat 
Date: Thursday, November 30, 2017 at 8:16 PM
To: Cody Koeninger 
Cc: "user@spark.apache.org" 
Subject: Re: [Spark streaming] No assigned partition error during seek

I notice the warning 'Do not manually add dependencies on org.apache.kafka artifacts
(e.g. kafka-clients). The spark-streaming-kafka-0-10 artifact has the
appropriate transitive dependencies already, and different versions may be
incompatible in hard to diagnose ways' after your query.
Does this imply that we should not be adding kafka-clients to our jars?
Thanks
Venkat

On Fri, 1 Dec 2017 at 06:45 venkat wrote:
Yes, I use the latest Kafka clients (0.11) to determine the beginning offsets without
seek, and I also commit Kafka offsets externally.
I don't find the Spark async commit useful for our needs.
Thanks
Venkat

On Fri, 1 Dec 2017 at 02:39 Cody Koeninger wrote:
You mentioned version 0.11; the latest version of the org.apache.kafka
kafka-clients artifact supported by DStreams is 0.10.0.1, for which it
has an appropriate dependency.

Are you manually depending on a different version of the kafka-clients artifact?

On Fri, Nov 24, 2017 at 7:39 PM, venks61176 wrote:
> Version: 2.2 with Kafka010
>
> Hi,
>
> We are running spark streaming on AWS and trying to process incoming
> messages on Kafka topics. All was well.
> Recently we wanted to migrate from the 0.8 to the 0.11 version of the Spark library
> and the Kafka 0.11 version of the server.
>
> With this new version of the software we are facing issues with regard to 'No
> assignment to partition for a topic', and it happens intermittently. I
> construct four DStreams with different group.ids as suggested.
>
> The main piece of code that's causing the issue is this one:
>
> if (!toSeek.isEmpty) {
>   // work around KAFKA-3370 when reset is none
>   // poll will throw if no position, i.e. auto offset reset none and no
> explicit position
>   // but cant seek to a position before poll, because poll is what gets
> subscription partitions
>   // So, poll, suppress the first exception, then seek
>   val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>   val shouldSuppress = aor != null &&
> aor.asInstanceOf[String].toUpperCase == "NONE"
>   try {
> consumer.poll(0)
>   } catch {
> case x: NoOffsetForPartitionException if shouldSuppress =>
>   logWarning("Catching NoOffsetForPartitionException since " +
> ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See
> KAFKA-3370")
>   }
>   toSeek.asScala.foreach { case (topicPartition, offset) =>
>   *consumer.seek(topicPartition, offset)*
>   }
> }
>
> At the start of the job, I also ensure we are supplying all required offsets
> correctly
>
> private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
> Map<TopicPartition, Long> offsets = new HashMap<>();
> List<TopicPartition> topicPartitions =
> consumer.partitionsFor(topic).stream().map(partitionInfo ->
> new TopicPartition(partitionInfo.topic(),
> partitionInfo.partition()))
> .collect(Collectors.toList());
> Map<TopicPartition, Long> earliestOffsets =
> consumer.beginningOffsets(topicPartitions);
> // pick committed offsets
> for (TopicPartition topicAndPartition : topicPartitions) {
>   final OffsetAndMetadata committed =
> consumer.committed(topicAndPartition);
>   Long earliestOffset = earliestOffsets.get(topicAndPartition);
>   if (committed != null && committed.offset() > earliestOffset) {
> logger
> .warn(
> "Committed offset found for: {} offset:{} -> Hence adding
> committed offset",
> topicAndPartition, committed.offset());
> offsets.put(topicAndPartition, committed.offset());
>   } else {
> logger
> .warn(
> "New partition/stale offset found for: {} offset:{} -> Hence
> adding earliest offset",
> topicAndPartition, earliestOffset);
> offsets.put(topicAndPartition, earliestOffset);
>   }
> }
> return offsets;
>   }
>
> The actual stack trace:
>
> Caused by: java.lang.IllegalStateException: No current assignment for
> partition genericEvents-1
> 2017-11-23 10:35:24,677 -at
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
> 2017-11-23 10:35:24,677 -at
>