Re: Getting List of Executor Id's

2019-05-13 Thread Afshartous, Nick


Answering my own question.  Looks like this can be done by implementing a 
SparkListener with the method

  def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit

as the SparkListenerExecutorAdded object carries the executor id and host.
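
For reference, a minimal Java sketch along these lines (the listener class name
and the println are illustrative; it assumes the standard Spark 2.3 scheduler
listener API):

    import org.apache.spark.scheduler.SparkListener;
    import org.apache.spark.scheduler.SparkListenerExecutorAdded;

    // Logs each executor's id and host as the executor registers.
    public class ExecutorAddedListener extends SparkListener {
        @Override
        public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
            String executorId = executorAdded.executorId();
            String host = executorAdded.executorInfo().executorHost();
            System.out.println("Executor added: id=" + executorId + ", host=" + host);
        }
    }

The listener can be registered with sparkContext.addSparkListener(new ExecutorAddedListener()),
or by naming the class in the spark.extraListeners configuration property.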
--
Nick




Am using Spark 2.3 and looking for an API in Java to fetch the list of 
executors.  Need host and Id info for the executors.

Thanks for any pointers,
--
Nick




Getting List of Executor Id's

2019-05-13 Thread Afshartous, Nick


Hi,

Am using Spark 2.3 and looking for an API in Java to fetch the list of 
executors.  Need host and Id info for the executors.  

Thanks for any pointers,
--
Nick




[ Spark Streaming & Kafka 0.10 ] Possible bug

2017-03-22 Thread Afshartous, Nick

Hi,

I think I'm seeing a bug while upgrading to the Kafka 0.10 streaming API.  Code 
fragments follow.
--
   Nick

JavaInputDStream<ConsumerRecord<String, String>> rawStream = getDirectKafkaStream();

JavaDStream<Tuple2<String, String>> messagesTuple = rawStream.map(
        new Function<ConsumerRecord<String, String>, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                final String hyphen = "-";
                final String topicPartition = record.partition() + hyphen + record.offset();

                return new Tuple2<>(topicPartition, record.value());
            }
        }
);

messagesTuple.foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, String>>>() {
        @Override
        public void call(JavaRDD<Tuple2<String, String>> rdd) throws Exception {
            List<Tuple2<String, String>> list = rdd.take(10);

            for (Tuple2<String, String> pair : list) {
                log.info("messages tuple key: " + pair._1() + " : " + pair._2());
            }
        }
    }
);


The above foreachRDD logs output correctly.

17/03/22 15:57:01 INFO StreamingKafkaConsumerDriver: messages tuple key: -13-231599504 : �2017-03-22 15:54:05.568628$�g� ClientDev_Perf0585965449a1d3524b9e68396X@6eda8a884567b3442be68282b35aeeafMaterialReviewSinglePlayer`?��@�Vwin��@1.0.1703.0Unlabeled Stable�8���Not ApplicableNot ApplicableNot ApplicabledayMR_Day01Empty ...

The ClassCastException is thrown in a second function that iterates over these tuples:

...

@Override
public Iterator<Tuple2<String, String>> call(Iterator<Tuple2<String, byte[]>> messages)
        throws Exception {

    while (messages.hasNext()) {
        Tuple2<String, byte[]> record = messages.next();
        String topicPartitionOffset = record._1();
        byte[] val = record._2();  // Line 113 <<< ClassCastException

        ...



[Spark Kafka] API Doc pages for Kafka 0.10 not current

2017-02-27 Thread Afshartous, Nick

Hello,


Looks like the API docs linked from the Spark Kafka 0.10 Integration page are 
not current.


For instance, on the page


   https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html


the code examples show the new API (i.e. class ConsumerStrategies).  However, 
following the links


API Docs --> (Scala | Java)


leads to API pages that do not have the class ConsumerStrategies.  The API doc 
package names also have streaming.kafka as opposed to streaming.kafka10.
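
For reference, the new API shown on the integration page looks roughly like this
(a sketch; the broker address, group id, and topic name are placeholders, and
jssc is an existing JavaStreamingContext):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    // Placeholder Kafka consumer configuration.
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "broker1:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "example-group");

    // Direct stream created with the new ConsumerStrategies / LocationStrategies API.
    JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(
                Arrays.asList("exampleTopic"), kafkaParams));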


--

Nick


Error making REST call from streaming app

2016-05-23 Thread Afshartous, Nick

Hi,


We got the following exception trying to initiate a REST call from the Spark 
app.


This is running Spark 1.5.2 in AWS / Yarn.  It's only happened one time during 
the course of a streaming app that has been running for months.


Just curious if anyone could shed some more light on the root cause.


Thanks,

--

Nick


> User class threw exception: org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 323 in stage 15154.0 failed 4 times, most recent 
> failure: Lost task 323.3 in stage 15154.0 (TID 2010826, 
> ip-10-247-128-182.ec2.internal): 
> com.sun.jersey.spi.service.ServiceConfigurationError: 
> com.sun.jersey.spi.inject.InjectableProvider: : 
> java.io.FileNotFoundException: 
> /mnt/yarn/usercache/hadoop/appcache/application_1452625196513_0026/container_1452625196513_0026_02_03/__app__.jar
>  (No such file or directory)
> at com.sun.jersey.spi.service.ServiceFinder.fail(ServiceFinder.java:610)
> at com.sun.jersey.spi.service.ServiceFinder.parse(ServiceFinder.java:682)
> at com.sun.jersey.spi.service.ServiceFinder.access$500(ServiceFinder.java:159)
> at 
> com.sun.jersey.spi.service.ServiceFinder$AbstractLazyIterator.hasNext(ServiceFinder.java:739)
> at 
> com.sun.jersey.spi.service.ServiceFinder.toClassArray(ServiceFinder.java:595)
> at 
> com.sun.jersey.core.spi.component.ProviderServices.getServiceClasses(ProviderServices.java:318)
> at 
> com.sun.jersey.core.spi.component.ProviderServices.getProviderAndServiceClasses(ProviderServices.java:297)
> at 
> com.sun.jersey.core.spi.component.ProviderServices.getProvidersAndServices(ProviderServices.java:204)
> at 
> com.sun.jersey.core.spi.factory.InjectableProviderFactory.configure(InjectableProviderFactory.java:106)
> at com.sun.jersey.api.client.Client.init(Client.java:263)
> at com.sun.jersey.api.client.Client.access$000(Client.java:118)
> at com.sun.jersey.api.client.Client$1.f(Client.java:191)
> at com.sun.jersey.api.client.Client$1.f(Client.java:187)
> at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
> at com.sun.jersey.api.client.Client.<init>(Client.java:187)
> at com.sun.jersey.api.client.Client.<init>(Client.java:159)
> at com.sun.jersey.api.client.Client.create(Client.java:669)
> at 
> com.wb.analytics.schemaservice.fingerprint.FingerPrintRestClient.getSchema(FingerPrintRestClient.java:48)
> at 
> com.wb.analytics.schemaservice.fingerprint.FingerPrintService.getSchemaFromService(FingerPrintService.java:80)




Re: Writing output of key-value Pair RDD

2016-05-05 Thread Afshartous, Nick

Answering my own question.


I filtered out the keys from the output file by overriding


  MultipleOutputFormat.generateActualKey


to return the empty string.

--

Nick


class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat<String, String> {

    @Override
    protected String generateFileNameForKeyValue(String key, String value, String name) {
        return key;
    }

    @Override
    protected String generateActualKey(String key, String value) {
        return "";
    }

}

____
From: Afshartous, Nick <nafshart...@turbine.com>
Sent: Thursday, May 5, 2016 3:35:17 PM
To: Nicholas Chammas; user@spark.apache.org
Subject: Re: Writing output of key-value Pair RDD



Thanks, I got the example below working, though it writes both the keys and 
the values to the output file.

Is there any way to write just the values ?

--

Nick


String[] strings = { "Abcd", "Azlksd", "whhd", "wasc", "aDxa" };

sc.parallelize(Arrays.asList(strings))
    .mapToPair(pairFunction)
    .saveAsHadoopFile("s3://...", String.class, String.class,
        RDDMultipleTextOutputFormat.class);



From: Nicholas Chammas <nicholas.cham...@gmail.com>
Sent: Wednesday, May 4, 2016 4:21:12 PM
To: Afshartous, Nick; user@spark.apache.org
Subject: Re: Writing output of key-value Pair RDD

You're looking for this discussion: http://stackoverflow.com/q/23995040/877069

Also, a simpler alternative with DataFrames: 
https://github.com/apache/spark/pull/8375#issuecomment-202458325

On Wed, May 4, 2016 at 4:09 PM Afshartous, Nick 
<nafshart...@turbine.com> wrote:

Hi,


Is there any way to write out to S3 the values of a key-value Pair RDD ?


I'd like each value of a pair to be written to its own file where the file name 
corresponds to the key name.


Thanks,

--

Nick


Re: Writing output of key-value Pair RDD

2016-05-05 Thread Afshartous, Nick

Thanks, I got the example below working, though it writes both the keys and 
the values to the output file.

Is there any way to write just the values ?

--

Nick


String[] strings = { "Abcd", "Azlksd", "whhd", "wasc", "aDxa" };

sc.parallelize(Arrays.asList(strings))
    .mapToPair(pairFunction)
    .saveAsHadoopFile("s3://...", String.class, String.class,
        RDDMultipleTextOutputFormat.class);



From: Nicholas Chammas <nicholas.cham...@gmail.com>
Sent: Wednesday, May 4, 2016 4:21:12 PM
To: Afshartous, Nick; user@spark.apache.org
Subject: Re: Writing output of key-value Pair RDD

You're looking for this discussion: http://stackoverflow.com/q/23995040/877069

Also, a simpler alternative with DataFrames: 
https://github.com/apache/spark/pull/8375#issuecomment-202458325

On Wed, May 4, 2016 at 4:09 PM Afshartous, Nick 
<nafshart...@turbine.com> wrote:

Hi,


Is there any way to write out to S3 the values of a key-value Pair RDD ?


I'd like each value of a pair to be written to its own file where the file name 
corresponds to the key name.


Thanks,

--

Nick


Writing output of key-value Pair RDD

2016-05-04 Thread Afshartous, Nick
Hi,


Is there any way to write out to S3 the values of a key-value Pair RDD ?


I'd like each value of a pair to be written to its own file where the file name 
corresponds to the key name.


Thanks,

--

Nick


Reading Back a Cached RDD

2016-03-24 Thread Afshartous, Nick

Hi,


After calling RDD.persist(), is it then possible to come back later and access the 
persisted RDD?

Let's say, for instance, that one comes back and starts a new Spark shell session.  How 
would one access the persisted RDD in the new shell session?


Thanks,

--

   Nick


Using Spark SQL / Hive on AWS EMR

2016-03-03 Thread Afshartous, Nick

Hi,


On AWS EMR 4.2 / Spark 1.5.2, I tried the example here


  https://spark.apache.org/docs/1.5.0/sql-programming-guide.html#hive-tables


to load data from a file into a Hive table.


  scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

  scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

  scala> sqlContext.sql("LOAD DATA LOCAL INPATH 'data.txt' INTO TABLE src")



The resultant error is below.  Just wondering if I'm missing any steps in 
getting Hive set up on the AWS EMR Spark cluster.


Thanks,

--

Nick



16/03/02 14:14:04 INFO Hive: Renaming src: file:/home/hadoop/data.txt, dest: 
hdfs://ip-10-247-128-59.ec2.internal:8020/user/hive/warehouse/src/data_copy_2.txt,
 Status:true
16/03/02 14:14:04 WARN RetryingMetaStoreClient: MetaStoreClient lost 
connection. Attempting to reconnect.
org.apache.thrift.TApplicationException: Invalid method name: 
'alter_table_with_cascade'


Spark Streaming : requirement failed: numRecords must not be negative

2016-01-22 Thread Afshartous, Nick

Hello,


We have a streaming job that consistently fails with the trace below.  This is 
on an AWS EMR 4.2/Spark 1.5.2 cluster.


This ticket looks related


SPARK-8112 Received block event count through the StreamingListener can be 
negative


although it appears to have been fixed in 1.5.


Thanks for any suggestions,


--

Nick



Exception in thread "main" java.lang.IllegalArgumentException: requirement 
failed: numRecords must not be negative
at scala.Predef$.require(Predef.scala:233)
at 
org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
at 
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at 
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
at scala.util.Try$.apply(Try.scala:161)
at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)



Re: Spark Streaming : requirement failed: numRecords must not be negative

2016-01-22 Thread Afshartous, Nick

This seems to be a problem with the Kafka brokers being in a bad state.  We're 
restarting Kafka to resolve it.

--

Nick



From: Ted Yu <yuzhih...@gmail.com>
Sent: Friday, January 22, 2016 10:38 AM
To: Afshartous, Nick
Cc: user@spark.apache.org
Subject: Re: Spark Streaming : requirement failed: numRecords must not be 
negative

Is it possible to reproduce the condition below with test code ?

Thanks

On Fri, Jan 22, 2016 at 7:31 AM, Afshartous, Nick 
<nafshart...@turbine.com> wrote:


Hello,


We have a streaming job that consistently fails with the trace below.  This is 
on an AWS EMR 4.2/Spark 1.5.2 cluster.


This ticket looks related


SPARK-8112 Received block event count through the StreamingListener can be 
negative


although it appears to have been fixed in 1.5.


Thanks for any suggestions,


--

Nick



Exception in thread "main" java.lang.IllegalArgumentException: requirement 
failed: numRecords must not be negative
at scala.Predef$.require(Predef.scala:233)
at 
org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
at 
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at 
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
at scala.util.Try$.apply(Try.scala:161)
at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)




Client versus cluster mode

2016-01-21 Thread Afshartous, Nick

Hi,


In an AWS EMR/Spark 1.5 cluster we're launching a streaming job from the driver 
node.  Would it make any sense in this case to use cluster mode ?  More 
specifically, would there be any benefit that YARN would provide in cluster mode 
but not in client mode ?


Thanks,

--

Nick


Re: Consuming commands from a queue

2016-01-16 Thread Afshartous, Nick

Thanks Cody.


One reason I was thinking of using Akka is that some of the copies take much 
longer than others (or get stuck).  We've seen this with our current streaming 
job.  This can cause the entire streaming micro-batch to take longer.


If we had a set of Akka actors then each copy would be isolated.

--

Nick



From: Cody Koeninger <c...@koeninger.org>
Sent: Friday, January 15, 2016 11:46 PM
To: Afshartous, Nick
Cc: user@spark.apache.org
Subject: Re: Consuming commands from a queue

Reading commands from Kafka and triggering a Redshift copy is sufficiently 
simple that it could just be a bash script.  But if you've already got a Spark 
Streaming job set up, may as well use it for consistency's sake.  There's 
definitely no need to mess around with Akka.

On Fri, Jan 15, 2016 at 6:25 PM, Afshartous, Nick 
<nafshart...@turbine.com> wrote:


Hi,


We have a streaming job that consumes from Kafka and outputs to S3.  We're 
going to have the job also send commands (to copy from S3 to Redshift) into a 
different Kafka topic.


What would be the best framework for consuming and processing the copy commands 
?  We're considering creating a second streaming job or using Akka.


Thanks for any suggestions,

--

Nick



Consuming commands from a queue

2016-01-15 Thread Afshartous, Nick

Hi,


We have a streaming job that consumes from Kafka and outputs to S3.  We're 
going to have the job also send commands (to copy from S3 to Redshift) into a 
different Kafka topic.


What would be the best framework for consuming and processing the copy commands 
?  We're considering creating a second streaming job or using Akka.


Thanks for any suggestions,

--

Nick


Configuring log4j

2015-12-18 Thread Afshartous, Nick

Hi,


Am trying to configure log4j on an AWS EMR 4.2 Spark cluster for a streaming 
job set in client mode.


I changed


   /etc/spark/conf/log4j.properties


to use a FileAppender.  However the INFO logging still goes to console.
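
The change was roughly along these lines (a sketch; the appender name and log
file path are illustrative, not the exact values used):

    log4j.rootCategory=INFO, file

    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.File=/var/log/spark/streaming-driver.log
    log4j.appender.file.append=true
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n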


Thanks for any suggestions,

--

Nick


From the console:

Adding default property: 
spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///etc/spark/conf/log4j.properties
 -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 
-XX:MaxHeapFreeRatio=70 -XX:+CM\
SClassUnloadingEnabled -XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %p'



Re: Configuring log4j

2015-12-18 Thread Afshartous, Nick

Found the issue: a conflict between setting Java options in both 
spark-defaults.conf and on the spark-submit command line.

--

Nick



From: Afshartous, Nick <nafshart...@turbine.com>
Sent: Friday, December 18, 2015 11:46 AM
To: user@spark.apache.org
Subject: Configuring log4j



Hi,


Am trying to configure log4j on an AWS EMR 4.2 Spark cluster for a streaming 
job set in client mode.


I changed


   /etc/spark/conf/log4j.properties


to use a FileAppender.  However the INFO logging still goes to console.


Thanks for any suggestions,

--

Nick


From the console:

Adding default property: 
spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///etc/spark/conf/log4j.properties
 -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 
-XX:MaxHeapFreeRatio=70 -XX:+CM\
SClassUnloadingEnabled -XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %p'



Spark Submit - java.lang.IllegalArgumentException: requirement failed

2015-12-11 Thread Afshartous, Nick

Hi,


I'm trying to run a streaming job on a single-node EMR 4.1/Spark 1.5 cluster.  
It's throwing an IllegalArgumentException right away on the submit.

Attaching full output from console.


Thanks for any insights.

--

Nick



15/12/11 16:44:43 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
15/12/11 16:44:43 INFO client.RMProxy: Connecting to ResourceManager at 
ip-10-247-129-50.ec2.internal/10.247.129.50:8032
15/12/11 16:44:43 INFO yarn.Client: Requesting a new application from cluster 
with 1 NodeManagers
15/12/11 16:44:43 INFO yarn.Client: Verifying our application has not requested 
more than the maximum memory capability of the cluster (54272 MB per container)
15/12/11 16:44:43 INFO yarn.Client: Will allocate AM container, with 11264 MB 
memory including 1024 MB overhead
15/12/11 16:44:43 INFO yarn.Client: Setting up container launch context for our 
AM
15/12/11 16:44:43 INFO yarn.Client: Setting up the launch environment for our 
AM container
15/12/11 16:44:43 INFO yarn.Client: Preparing resources for our AM container
15/12/11 16:44:44 INFO yarn.Client: Uploading resource 
file:/usr/lib/spark/lib/spark-assembly-1.5.0-hadoop2.6.0-amzn-1.jar -> 
hdfs://ip-10-247-129-50.ec2.internal:8020/user/hadoop/.sparkStaging/application_1447\
442727308_0126/spark-assembly-1.5.0-hadoop2.6.0-amzn-1.jar
15/12/11 16:44:44 INFO metrics.MetricsSaver: MetricsConfigRecord 
disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 
disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500\
 lastModified: 1447442734295
15/12/11 16:44:44 INFO metrics.MetricsSaver: Created MetricsSaver 
j-2H3BTA60FGUYO:i-f7812947:SparkSubmit:15603 period:60 
/mnt/var/em/raw/i-f7812947_20151211_SparkSubmit_15603_raw.bin
15/12/11 16:44:45 INFO metrics.MetricsSaver: 1 aggregated HDFSWriteDelay 1276 
raw values into 1 aggregated values, total 1
15/12/11 16:44:45 INFO yarn.Client: Uploading resource 
file:/home/hadoop/spark-pipeline-framework-1.1.6-SNAPSHOT/workflow/lib/spark-kafka-services-1.0.jar
 -> hdfs://ip-10-247-129-50.ec2.internal:8020/user/hadoo\
p/.sparkStaging/application_1447442727308_0126/spark-kafka-services-1.0.jar
15/12/11 16:44:45 INFO yarn.Client: Uploading resource 
file:/home/hadoop/spark-pipeline-framework-1.1.6-SNAPSHOT/conf/AwsCredentials.properties
 -> hdfs://ip-10-247-129-50.ec2.internal:8020/user/hadoop/.sparkSta\
ging/application_1447442727308_0126/AwsCredentials.properties
15/12/11 16:44:45 WARN yarn.Client: Resource 
file:/home/hadoop/spark-pipeline-framework-1.1.6-SNAPSHOT/conf/AwsCredentials.properties
 added multiple times to distributed cache.
15/12/11 16:44:45 INFO yarn.Client: Deleting staging directory 
.sparkStaging/application_1447442727308_0126
Exception in thread "main" java.lang.IllegalArgumentException: requirement 
failed
at scala.Predef$.require(Predef.scala:221)
at 
org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$6$$anonfun$apply$2.apply(Client.scala:392)
at 
org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$6$$anonfun$apply$2.apply(Client.scala:390)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$6.apply(Client.scala:390)
at 
org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$6.apply(Client.scala:388)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:388)
at 
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:629)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:119)
at org.apache.spark.deploy.yarn.Client.run(Client.scala:907)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:966)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)


adjust ~/spark-pipeline-framework-1.1.6-SNAPSHOT > 
adjust ~/spark-pipeline-framework-1.1.6-SNAPSHOT > ./bin/run-event-streaming.sh 
conf/dev/nick-malcolm-events.properties  > console.txt
Using properties file: /usr/lib/spark/conf/spark-defaults.conf
Adding default property: spark.executor.extraJavaOptions=-verbose:gc 
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
-XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
Adding default property: 
spark.history.fs.logDirectory=hdfs:///var/log/spark/apps
Adding default property: spark.eventLog.enabled=true
Adding default property: spark.shuffle.service.enabled=true
Adding default property: 
spark.driver.extraLibraryPath=/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
Adding default property: 
spark.yarn.historyServer.address=ip-10-247-129-50.ec2.internal:18080
Adding default 

Re: Spark Submit - java.lang.IllegalArgumentException: requirement failed

2015-12-11 Thread Afshartous, Nick

Thanks JB.

I'm submitting from the AWS Spark master node; the spark-defaults.conf is 
pre-deployed by Amazon (attached) and there is no setting 
for spark.yarn.keytab.  Is there any doc for setting this up if it's required in 
this scenario ?

Also, if deploy-mode is switched from cluster to client on spark-submit then 
the error no longer appears.  Just wondering if there's any difference between 
using client and cluster mode if the submit is being done on the master 
node.

Thanks for any suggestions,
--
Nick


From: Jean-Baptiste Onofré <j...@nanthrax.net>
Sent: Friday, December 11, 2015 1:01 PM
To: user@spark.apache.org
Subject: Re: Spark Submit - java.lang.IllegalArgumentException: requirement 
failed

Hi Nick,

The localizedPath has to be non-null; that's why the requirement fails.

In the SparkConf used by the spark-submit (default in
conf/spark-defaults.conf), do you have all properties defined, especially
spark.yarn.keytab ?

Thanks,
Regards
JB

On 12/11/2015 05:49 PM, Afshartous, Nick wrote:
>
> Hi,
>
>
> I'm trying to run a streaming job on a single node EMR 4.1/Spark 1.5
> cluster.  Its throwing an IllegalArgumentException right away on the submit.
>
> Attaching full output from console.
>
>
> Thanks for any insights.
>
> --
>
>  Nick
>
>
>
> 15/12/11 16:44:43 WARN util.NativeCodeLoader: Unable to load
> native-hadoop library for your platform... using builtin-java classes
> where applicable
> 15/12/11 16:44:43 INFO client.RMProxy: Connecting to ResourceManager at
> ip-10-247-129-50.ec2.internal/10.247.129.50:8032
> 15/12/11 16:44:43 INFO yarn.Client: Requesting a new application from
> cluster with 1 NodeManagers
> 15/12/11 16:44:43 INFO yarn.Client: Verifying our application has not
> requested more than the maximum memory capability of the cluster (54272
> MB per container)
> 15/12/11 16:44:43 INFO yarn.Client: Will allocate AM container, with
> 11264 MB memory including 1024 MB overhead
> 15/12/11 16:44:43 INFO yarn.Client: Setting up container launch context
> for our AM
> 15/12/11 16:44:43 INFO yarn.Client: Setting up the launch environment
> for our AM container
> 15/12/11 16:44:43 INFO yarn.Client: Preparing resources for our AM container
> 15/12/11 16:44:44 INFO yarn.Client: Uploading resource
> file:/usr/lib/spark/lib/spark-assembly-1.5.0-hadoop2.6.0-amzn-1.jar ->
> hdfs://ip-10-247-129-50.ec2.internal:8020/user/hadoop/.sparkStaging/application_1447\
> 442727308_0126/spark-assembly-1.5.0-hadoop2.6.0-amzn-1.jar
> 15/12/11 16:44:44 INFO metrics.MetricsSaver: MetricsConfigRecord
> disabledInCluster: false instanceEngineCycleSec: 60
> clusterEngineCycleSec: 60 disableClusterEngine: false maxMemoryMb: 3072
> maxInstanceCount: 500\
>   lastModified: 1447442734295
> 15/12/11 16:44:44 INFO metrics.MetricsSaver: Created MetricsSaver
> j-2H3BTA60FGUYO:i-f7812947:SparkSubmit:15603 period:60
> /mnt/var/em/raw/i-f7812947_20151211_SparkSubmit_15603_raw.bin
> 15/12/11 16:44:45 INFO metrics.MetricsSaver: 1 aggregated HDFSWriteDelay
> 1276 raw values into 1 aggregated values, total 1
> 15/12/11 16:44:45 INFO yarn.Client: Uploading resource
> file:/home/hadoop/spark-pipeline-framework-1.1.6-SNAPSHOT/workflow/lib/spark-kafka-services-1.0.jar
> -> hdfs://ip-10-247-129-50.ec2.internal:8020/user/hadoo\
> p/.sparkStaging/application_1447442727308_0126/spark-kafka-services-1.0.jar
> 15/12/11 16:44:45 INFO yarn.Client: Uploading resource
> file:/home/hadoop/spark-pipeline-framework-1.1.6-SNAPSHOT/conf/AwsCredentials.properties
> -> hdfs://ip-10-247-129-50.ec2.internal:8020/user/hadoop/.sparkSta\
> ging/application_1447442727308_0126/AwsCredentials.properties
> 15/12/11 16:44:45 WARN yarn.Client: Resource
> file:/home/hadoop/spark-pipeline-framework-1.1.6-SNAPSHOT/conf/AwsCredentials.properties
> added multiple times to distributed cache.
> 15/12/11 16:44:45 INFO yarn.Client: Deleting staging directory
> .sparkStaging/application_1447442727308_0126
> Exception in thread "main" java.lang.IllegalArgumentException:
> requirement failed
>  at scala.Predef$.require(Predef.scala:221)
>  at
> org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$6$$anonfun$apply$2.apply(Client.scala:392)
>  at
> org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$6$$anonfun$apply$2.apply(Client.scala:390)
>  at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>  at
> org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$6.apply(Client.scala:390)
>  at
> org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$6.apply(Client.scala

Configuring Log4J (Spark 1.5 on EMR 4.1)

2015-11-19 Thread Afshartous, Nick

Hi,

On Spark 1.5 on EMR 4.1 the message below appears in stderr in the Yarn UI.

  ERROR StatusLogger No log4j2 configuration file found. Using default 
configuration: logging only errors to the console.

I do see that there is

   /usr/lib/spark/conf/log4j.properties

Can someone please advise on how to set up log4j properly.

Thanks,
--
  Nick




RE: Configuring Log4J (Spark 1.5 on EMR 4.1)

2015-11-19 Thread Afshartous, Nick

< log4j.properties file only exists on the master and not the slave nodes, so 
you are probably running into 
https://issues.apache.org/jira/browse/SPARK-11105, which has already been fixed 
in the not-yet-released Spark 1.6.0. EMR will upgrade to Spark 1.6.0 once it is 
released.

Thanks for the info, though this is a single-node cluster so that can't be the 
cause of the error (which is in the driver log).
--
  Nick

From: Jonathan Kelly [jonathaka...@gmail.com]
Sent: Thursday, November 19, 2015 6:45 PM
To: Afshartous, Nick
Cc: user@spark.apache.org
Subject: Re: Configuring Log4J (Spark 1.5 on EMR 4.1)

This file only exists on the master and not the slave nodes, so you are 
probably running into https://issues.apache.org/jira/browse/SPARK-11105, which 
has already been fixed in the not-yet-released Spark 1.6.0. EMR will upgrade to 
Spark 1.6.0 once it is released.

~ Jonathan

On Thu, Nov 19, 2015 at 1:30 PM, Afshartous, Nick 
<nafshart...@turbine.com> wrote:

Hi,

On Spark 1.5 on EMR 4.1 the message below appears in stderr in the Yarn UI.

  ERROR StatusLogger No log4j2 configuration file found. Using default 
configuration: logging only errors to the console.

I do see that there is

   /usr/lib/spark/conf/log4j.properties

Can someone please advise on how to set up log4j properly.

Thanks,
--
  Nick




Spark/Kafka Streaming Job Gets Stuck

2015-10-28 Thread Afshartous, Nick

Hi, we are load testing our Spark 1.3 streaming job (reading from Kafka) and 
seeing a problem.  This is running in AWS/Yarn on a ten-node cluster, and the 
streaming batch interval is set to 3 minutes.

Testing at 30,000 events per second, we are seeing the streaming job get stuck 
(stack trace below) for over an hour.

Thanks for any insights or suggestions.
--
  Nick

org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartitionsToPair(JavaDStreamLike.scala:43)
com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.runStream(StreamingKafkaConsumerDriver.java:125)
com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.main(StreamingKafkaConsumerDriver.java:71)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)




RE: Using Spark SQL mapping over an RDD

2015-10-08 Thread Afshartous, Nick

> You can't do nested operations on RDDs or DataFrames (i.e. you can't create a 
> DataFrame from within a map function).  Perhaps if you explain what you are 
> trying to accomplish someone can suggest another way.

The code below is what I had in mind.  For each Id, I'd like to run a query using 
the Id in the where clause, and then, depending on the result, possibly run a 
second query.  Either the result of the first or the second query
will be used to construct the output of the map function.

Thanks for any suggestions,
--
  Nick


val result = deviceIds.map(deviceId => {
    val withAnalyticsId = sqlContext.sql(
        "select * from ad_info where deviceId = '%1s' and analyticsId <> 'null' order by messageTime asc limit 1" format (deviceId))

    if (withAnalyticsId.count() > 0) {
        withAnalyticsId.take(1)(0)
    }
    else {
        val withoutAnalyticsId = sqlContext.sql(
            "select * from ad_info where deviceId = '%1s' order by messageTime desc limit 1" format (deviceId))

        withoutAnalyticsId.take(1)(0)
    }
})





From: Michael Armbrust [mich...@databricks.com]
Sent: Thursday, October 08, 2015 1:16 PM
To: Afshartous, Nick
Cc: user@spark.apache.org
Subject: Re: Using Spark SQL mapping over an RDD

You can't do nested operations on RDDs or DataFrames (i.e. you can't create a 
DataFrame from within a map function).  Perhaps if you explain what you are 
trying to accomplish someone can suggest another way.

On Thu, Oct 8, 2015 at 10:10 AM, Afshartous, Nick 
<nafshart...@turbine.com> wrote:

Hi,

Am using Spark 1.5 in the latest EMR 4.1.

I have an RDD of String

   scala> deviceIds
  res25: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[18] at map at <console>:28

and then, when trying to map over the RDD while attempting to run a SQL query, 
the result is a NullPointerException

  scala> deviceIds.map(id => sqlContext.sql("select * from ad_info")).count()

with the stack trace below.  If I run the query as a top-level expression the 
count is returned.  There was additional code within
the anonymous function that has been removed to try to isolate the problem.

Thanks for any insights or advice on how to debug this.
--
  Nick


scala> deviceIds.map(id => sqlContext.sql("select * from ad_info")).count()
deviceIds.map(id => sqlContext.sql("select * from ad_info")).count()
15/10/08 16:12:56 INFO SparkContext: Starting job: count at <console>:40
15/10/08 16:12:56 INFO DAGScheduler: Got job 18 (count at <console>:40) with 200 output partitions
15/10/08 16:12:56 INFO DAGScheduler: Final stage: ResultStage 37(count at <console>:40)
15/10/08 16:12:56 INFO DAGScheduler: Parents of final stage: 
List(ShuffleMapStage 36)
15/10/08 16:12:56 INFO DAGScheduler: Missing parents: List()
15/10/08 16:12:56 INFO DAGScheduler: Submitting ResultStage 37 (MapPartitionsRDD[37] at map at <console>:40), which has no missing parents
15/10/08 16:12:56 INFO MemoryStore: ensureFreeSpace(17904) called with 
curMem=531894, maxMem=560993402
15/10/08 16:12:56 INFO MemoryStore: Block broadcast_22 stored as values in 
memory (estimated size 17.5 KB, free 534.5 MB)
15/10/08 16:12:56 INFO MemoryStore: ensureFreeSpace(7143) called with 
curMem=549798, maxMem=560993402
15/10/08 16:12:56 INFO MemoryStore: Block broadcast_22_piece0 stored as bytes 
in memory (estimated size 7.0 KB, free 534.5 MB)
15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in memory on 
10.247.0.117:33555 (size: 7.0 KB, free: 535.0 MB)
15/10/08 16:12:56 INFO SparkContext: Created broadcast 22 from broadcast at 
DAGScheduler.scala:861
15/10/08 16:12:56 INFO DAGScheduler: Submitting 200 missing tasks from 
ResultStage 37 (MapPartitionsRDD[37] at map at <console>:40)
15/10/08 16:12:56 INFO YarnScheduler: Adding task set 37.0 with 200 tasks
15/10/08 16:12:56 INFO TaskSetManager: Starting task 0.0 in stage 37.0 (TID 
649, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
15/10/08 16:12:56 INFO TaskSetManager: Starting task 1.0 in stage 37.0 (TID 
650, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in memory on 
ip-10-247-0-117.ec2.internal:46227 (size: 7.0 KB, free: 535.0 MB)
15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in memory on 
ip-10-247-0-117.ec2.internal:32938 (size: 7.0 KB, free: 535.0 MB)
15/10/08 16:12:56 INFO TaskSetManager: Starting task 2.0 in stage 37.0 (TID 
651, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
15/10/08 16:12:56 WARN TaskSetManager: Lost task 0.0 in stage 37.0 (TID 649, 
ip-10-247-0-117.ec2.internal): java.lang.NullPointerException
at 
$line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:40)
at 
$line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$i