Re: Getting List of Executor IDs
Answering my own question. Looks like this can be done by implementing SparkListener with the method

    def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit

since the SparkListenerExecutorAdded event carries the executor info. -- Nick
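For anyone landing here later, a rough Java sketch of that approach (the class and field names below are mine, not from the thread):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.spark.scheduler.SparkListener;
    import org.apache.spark.scheduler.SparkListenerExecutorAdded;
    import org.apache.spark.scheduler.SparkListenerExecutorRemoved;

    // Keeps a live map of executor id -> host by listening to executor lifecycle events.
    public class ExecutorTracker extends SparkListener {

        private final Map<String, String> executorHosts = new ConcurrentHashMap<>();

        @Override
        public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
            executorHosts.put(executorAdded.executorId(),
                              executorAdded.executorInfo().executorHost());
        }

        @Override
        public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
            executorHosts.remove(executorRemoved.executorId());
        }

        public Map<String, String> currentExecutors() {
            return executorHosts;
        }
    }

The listener would be registered once up front, e.g. jsc.sc().addSparkListener(new ExecutorTracker()) on the JavaSparkContext, so that executor-added events are not missed.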
Getting List of Executor IDs
Hi, I'm using Spark 2.3 and looking for a Java API to fetch the list of executors. I need the host and ID info for the executors. Thanks for any pointers, -- Nick
[ Spark Streaming & Kafka 0.10 ] Possible bug
Hi, I think I'm seeing a bug in the context of upgrading to the Kafka 0.10 streaming API. Code fragments follow. -- Nick

    JavaInputDStream<ConsumerRecord<String, byte[]>> rawStream = getDirectKafkaStream();

    JavaDStream<Tuple2<String, byte[]>> messagesTuple = rawStream.map(
        new Function<ConsumerRecord<String, byte[]>, Tuple2<String, byte[]>>() {
            @Override
            public Tuple2<String, byte[]> call(ConsumerRecord<String, byte[]> record) {
                final String hyphen = "-";
                final String topicPartition = record.partition() + hyphen + record.offset();
                return new Tuple2<>(topicPartition, record.value());
            }
        }
    );

    messagesTuple.foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, byte[]>>>() {
        @Override
        public void call(JavaRDD<Tuple2<String, byte[]>> rdd) throws Exception {
            List<Tuple2<String, byte[]>> list = rdd.take(10);
            for (Tuple2<String, byte[]> pair : list) {
                log.info("messages tuple key: " + pair._1() + " : " + pair._2());
            }
        }
    });

The above foreachRDD logs output correctly:

    17/03/22 15:57:01 INFO StreamingKafkaConsumerDriver: messages tuple key: -13-231599504 : [binary payload]

However, reading the value back in a later stage throws a ClassCastException:

    ... <Iterator<Tuple2<String, byte[]>>, String, String> {
        ...
        @Override
        public Iterator<Tuple2<String, String>> call(Iterator<Tuple2<String, byte[]>> messages) throws Exception {
            while (messages.hasNext()) {
                Tuple2<String, byte[]> record = messages.next();
                String topicPartitionOffset = record._1();
                byte[] val = record._2();   // Line 113 <<< ClassCastException
                ...
[Spark Kafka] API Doc pages for Kafka 0.10 not current
Hello, Looks like the API docs linked from the Spark Kafka 0.10 integration page are not current. For instance, on the page https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html the code examples show the new API (i.e. class ConsumerStrategies). However, following the links API Docs --> (Scala | Java) leads to API pages that do not have class ConsumerStrategies. The API doc package names also have streaming.kafka as opposed to streaming.kafka010. -- Nick
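For reference, the new-style consumer setup that the 0.10 integration page documents looks roughly like this in Java (the broker address, group id, and topic below are placeholders, and jssc is assumed to be an existing JavaStreamingContext):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    // Minimal direct-stream setup against the 0.10 consumer API.
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "broker1:9092");      // placeholder
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "example-group");               // placeholder

    JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(Arrays.asList("events"), kafkaParams));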
Error making REST call from streaming app
Hi, We got the following exception trying to initiate a REST call from the Spark app. This is running Spark 1.5.2 in AWS / YARN. It's only happened once during the course of a streaming app that has been running for months. Just curious if anyone could shed some more light on the root cause. Thanks, -- Nick

    User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 323 in stage 15154.0 failed 4 times, most recent failure: Lost task 323.3 in stage 15154.0 (TID 2010826, ip-10-247-128-182.ec2.internal): com.sun.jersey.spi.service.ServiceConfigurationError: com.sun.jersey.spi.inject.InjectableProvider: : java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/appcache/application_1452625196513_0026/container_1452625196513_0026_02_03/__app__.jar (No such file or directory)
      at com.sun.jersey.spi.service.ServiceFinder.fail(ServiceFinder.java:610)
      at com.sun.jersey.spi.service.ServiceFinder.parse(ServiceFinder.java:682)
      at com.sun.jersey.spi.service.ServiceFinder.access$500(ServiceFinder.java:159)
      at com.sun.jersey.spi.service.ServiceFinder$AbstractLazyIterator.hasNext(ServiceFinder.java:739)
      at com.sun.jersey.spi.service.ServiceFinder.toClassArray(ServiceFinder.java:595)
      at com.sun.jersey.core.spi.component.ProviderServices.getServiceClasses(ProviderServices.java:318)
      at com.sun.jersey.core.spi.component.ProviderServices.getProviderAndServiceClasses(ProviderServices.java:297)
      at com.sun.jersey.core.spi.component.ProviderServices.getProvidersAndServices(ProviderServices.java:204)
      at com.sun.jersey.core.spi.factory.InjectableProviderFactory.configure(InjectableProviderFactory.java:106)
      at com.sun.jersey.api.client.Client.init(Client.java:263)
      at com.sun.jersey.api.client.Client.access$000(Client.java:118)
      at com.sun.jersey.api.client.Client$1.f(Client.java:191)
      at com.sun.jersey.api.client.Client$1.f(Client.java:187)
      at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
      at com.sun.jersey.api.client.Client.<init>(Client.java:187)
      at com.sun.jersey.api.client.Client.<init>(Client.java:159)
      at com.sun.jersey.api.client.Client.create(Client.java:669)
      at com.wb.analytics.schemaservice.fingerprint.FingerPrintRestClient.getSchema(FingerPrintRestClient.java:48)
      at com.wb.analytics.schemaservice.fingerprint.FingerPrintService.getSchemaFromService(FingerPrintService.java:80)
Re: Writing output of key-value Pair RDD
Answering my own question. I filtered out the keys from the output file by overriding MultipleOutputFormat.generateActualKey to return the empty string. -- Nick

    class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat<String, String> {

        @Override
        protected String generateFileNameForKeyValue(String key, String value, String name) {
            return key;
        }

        @Override
        protected String generateActualKey(String key, String value) {
            return "";
        }
    }
Re: Writing output of key-value Pair RDD
Thanks, I got the example below working, though it writes both the keys and values to the output file. Is there any way to write just the values? -- Nick

    String[] strings = { "Abcd", "Azlksd", "whhd", "wasc", "aDxa" };

    sc.parallelize(Arrays.asList(strings))
      .mapToPair(pairFunction)
      .saveAsHadoopFile("s3://...", String.class, String.class, RDDMultipleTextOutputFormat.class);

From: Nicholas Chammas <nicholas.cham...@gmail.com> Sent: Wednesday, May 4, 2016 4:21:12 PM To: Afshartous, Nick; user@spark.apache.org Subject: Re: Writing output of key-value Pair RDD

You're looking for this discussion: http://stackoverflow.com/q/23995040/877069 Also, a simpler alternative with DataFrames: https://github.com/apache/spark/pull/8375#issuecomment-202458325
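The pairFunction referenced above isn't shown in the thread; a minimal sketch of what it could look like (the keying logic here is just an example):

    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    // Maps each record to a (key, value) pair; the key later becomes the output file name
    // via RDDMultipleTextOutputFormat.generateFileNameForKeyValue.
    PairFunction<String, String, String> pairFunction = new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String value) {
            // Example keying: group records by their lower-cased first character.
            return new Tuple2<>(value.substring(0, 1).toLowerCase(), value);
        }
    };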
Writing output of key-value Pair RDD
Hi, Is there any way to write out to S3 the values of a key-value pair RDD? I'd like each value of a pair to be written to its own file, where the file name corresponds to the key name. Thanks, -- Nick
Reading Back a Cached RDD
Hi, After calling RDD.persist(), is it then possible to come back later and access the persisted RDD, for instance by starting a new Spark shell session? How would one access the persisted RDD in the new shell session? Thanks, -- Nick
Using Spark SQL / Hive on AWS EMR
Hi, On AWS EMR 4.2 / Spark 1.5.2, I tried the example here https://spark.apache.org/docs/1.5.0/sql-programming-guide.html#hive-tables to load data from a file into a Hive table.

    scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    scala> sqlContext.sql("LOAD DATA LOCAL INPATH 'data.txt' INTO TABLE src")

The resulting error is below. Just wondering if I'm missing any steps in getting Hive set up on the AWS EMR Spark installation. Thanks, -- Nick

    16/03/02 14:14:04 INFO Hive: Renaming src: file:/home/hadoop/data.txt, dest: hdfs://ip-10-247-128-59.ec2.internal:8020/user/hive/warehouse/src/data_copy_2.txt, Status:true
    16/03/02 14:14:04 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect.
    org.apache.thrift.TApplicationException: Invalid method name: 'alter_table_with_cascade'
Spark Streaming : requirement failed: numRecords must not be negative
Hello, We have a streaming job that consistently fails with the trace below. This is on an AWS EMR 4.2 / Spark 1.5.2 cluster. This ticket looks related: SPARK-8112 (Received block event count through the StreamingListener can be negative), although it appears to have been fixed in 1.5. Thanks for any suggestions, -- Nick

    Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
      at scala.Predef$.require(Predef.scala:233)
      at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
      at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
      at scala.Option.orElse(Option.scala:257)
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
      at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
      at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
      at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
      at scala.util.Try$.apply(Try.scala:161)
      at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
      at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Re: Spark Streaming : requirement failed: numRecords must not be negative
This seems to be a problem with the Kafka brokers being in a bad state. We're restarting Kafka to resolve. -- Nick

From: Ted Yu <yuzhih...@gmail.com> Sent: Friday, January 22, 2016 10:38 AM To: Afshartous, Nick Cc: user@spark.apache.org Subject: Re: Spark Streaming : requirement failed: numRecords must not be negative

Is it possible to reproduce the condition below with test code? Thanks
Client versus cluster mode
Hi, In an AWS EMR / Spark 1.5 cluster we're launching a streaming job from the driver node. Would it make any sense in this case to use cluster mode? More specifically, would there be any benefit that YARN would provide when using cluster mode but not client mode? Thanks, -- Nick
Re: Consuming commands from a queue
Thanks Cody. One reason I was thinking of using Akka is that some of the copies take much longer than others (or get stuck); we've seen this with our current streaming job, and it can cause the entire streaming micro-batch to take longer. If we had a set of Akka actors, then each copy would be isolated. -- Nick

From: Cody Koeninger <c...@koeninger.org> Sent: Friday, January 15, 2016 11:46 PM To: Afshartous, Nick Cc: user@spark.apache.org Subject: Re: Consuming commands from a queue

Reading commands from Kafka and triggering a Redshift copy is sufficiently simple that it could just be a bash script. But if you've already got a Spark streaming job set up, you may as well use it for consistency's sake. There's definitely no need to mess around with Akka.
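To make the "simple consumer" option concrete, here is a rough sketch of a standalone Java consumer that reads copy commands from a Kafka topic and runs them against Redshift over JDBC. The topic name, connection details, and the assumption that each message body is a complete COPY statement are all mine, not from the thread:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CopyCommandConsumer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");                              // placeholder
            props.put("group.id", "redshift-copy");                                      // placeholder
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                 Connection redshift = DriverManager.getConnection(
                         "jdbc:redshift://example:5439/db", "user", "password")) {       // placeholders
                consumer.subscribe(Collections.singletonList("copy-commands"));          // placeholder topic
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000L);
                    for (ConsumerRecord<String, String> record : records) {
                        // Assumes the message value is a full COPY statement, e.g.
                        // COPY events FROM 's3://bucket/prefix' CREDENTIALS '...' FORMAT AS JSON 'auto';
                        try (Statement stmt = redshift.createStatement()) {
                            stmt.execute(record.value());
                        }
                    }
                }
            }
        }
    }

As Cody notes, the same loop could just as well live inside the existing streaming job instead of a separate process.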
Consuming commands from a queue
Hi, We have a streaming job that consumes from Kafka and outputs to S3. We're going to have the job also send commands (to copy from S3 to Redshift) into a different Kafka topic. What would be the best framework for consuming and processing the copy commands? We're considering creating a second streaming job or using Akka. Thanks for any suggestions, -- Nick
Configuring log4j
Hi, I'm trying to configure log4j on an AWS EMR 4.2 Spark cluster for a streaming job submitted in client mode. I changed /etc/spark/conf/log4j.properties to use a FileAppender; however, the INFO logging still goes to the console. Thanks for any suggestions, -- Nick

From the console:

    Adding default property: spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///etc/spark/conf/log4j.properties -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %p'
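For context, the FileAppender change was along these lines (a sketch only; the exact file path and pattern used in the original /etc/spark/conf/log4j.properties are not shown in the thread):

    # log4j 1.x properties (Spark 1.5 ships with log4j 1.2)
    log4j.rootLogger=INFO, file

    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.File=/var/log/spark/streaming-driver.log
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n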
Re: Configuring log4j
Found the issue: a conflict between setting Java options in both spark-defaults.conf and on the spark-submit command line. -- Nick
Spark Submit - java.lang.IllegalArgumentException: requirement failed
Hi, I'm trying to run a streaming job on a single-node EMR 4.1 / Spark 1.5 cluster. It's throwing an IllegalArgumentException right away on the submit. Attaching the full output from the console. Thanks for any insights. -- Nick

    15/12/11 16:44:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    15/12/11 16:44:43 INFO client.RMProxy: Connecting to ResourceManager at ip-10-247-129-50.ec2.internal/10.247.129.50:8032
    15/12/11 16:44:43 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers
    15/12/11 16:44:43 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (54272 MB per container)
    15/12/11 16:44:43 INFO yarn.Client: Will allocate AM container, with 11264 MB memory including 1024 MB overhead
    15/12/11 16:44:43 INFO yarn.Client: Setting up container launch context for our AM
    15/12/11 16:44:43 INFO yarn.Client: Setting up the launch environment for our AM container
    15/12/11 16:44:43 INFO yarn.Client: Preparing resources for our AM container
    15/12/11 16:44:44 INFO yarn.Client: Uploading resource file:/usr/lib/spark/lib/spark-assembly-1.5.0-hadoop2.6.0-amzn-1.jar -> hdfs://ip-10-247-129-50.ec2.internal:8020/user/hadoop/.sparkStaging/application_1447442727308_0126/spark-assembly-1.5.0-hadoop2.6.0-amzn-1.jar
    15/12/11 16:44:44 INFO metrics.MetricsSaver: MetricsConfigRecord disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500 lastModified: 1447442734295
    15/12/11 16:44:44 INFO metrics.MetricsSaver: Created MetricsSaver j-2H3BTA60FGUYO:i-f7812947:SparkSubmit:15603 period:60 /mnt/var/em/raw/i-f7812947_20151211_SparkSubmit_15603_raw.bin
    15/12/11 16:44:45 INFO metrics.MetricsSaver: 1 aggregated HDFSWriteDelay 1276 raw values into 1 aggregated values, total 1
    15/12/11 16:44:45 INFO yarn.Client: Uploading resource file:/home/hadoop/spark-pipeline-framework-1.1.6-SNAPSHOT/workflow/lib/spark-kafka-services-1.0.jar -> hdfs://ip-10-247-129-50.ec2.internal:8020/user/hadoop/.sparkStaging/application_1447442727308_0126/spark-kafka-services-1.0.jar
    15/12/11 16:44:45 INFO yarn.Client: Uploading resource file:/home/hadoop/spark-pipeline-framework-1.1.6-SNAPSHOT/conf/AwsCredentials.properties -> hdfs://ip-10-247-129-50.ec2.internal:8020/user/hadoop/.sparkStaging/application_1447442727308_0126/AwsCredentials.properties
    15/12/11 16:44:45 WARN yarn.Client: Resource file:/home/hadoop/spark-pipeline-framework-1.1.6-SNAPSHOT/conf/AwsCredentials.properties added multiple times to distributed cache.
    15/12/11 16:44:45 INFO yarn.Client: Deleting staging directory .sparkStaging/application_1447442727308_0126

    Exception in thread "main" java.lang.IllegalArgumentException: requirement failed
      at scala.Predef$.require(Predef.scala:221)
      at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$6$$anonfun$apply$2.apply(Client.scala:392)
      at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$6$$anonfun$apply$2.apply(Client.scala:390)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
      at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$6.apply(Client.scala:390)
      at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$6.apply(Client.scala:388)
      at scala.collection.immutable.List.foreach(List.scala:318)
      at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:388)
      at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:629)
      at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:119)
      at org.apache.spark.deploy.yarn.Client.run(Client.scala:907)
      at org.apache.spark.deploy.yarn.Client$.main(Client.scala:966)
      at org.apache.spark.deploy.yarn.Client.main(Client.scala)

The submit command and the start of the attached console.txt:

    ~/spark-pipeline-framework-1.1.6-SNAPSHOT > ./bin/run-event-streaming.sh conf/dev/nick-malcolm-events.properties > console.txt

    Using properties file: /usr/lib/spark/conf/spark-defaults.conf
    Adding default property: spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
    Adding default property: spark.history.fs.logDirectory=hdfs:///var/log/spark/apps
    Adding default property: spark.eventLog.enabled=true
    Adding default property: spark.shuffle.service.enabled=true
    Adding default property: spark.driver.extraLibraryPath=/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
    Adding default property: spark.yarn.historyServer.address=ip-10-247-129-50.ec2.internal:18080
    Adding default
Re: Spark Submit - java.lang.IllegalArgumentException: requirement failed
Thanks JB. I'm submitting from the AWS Spark master node, the spark-defaults.conf is pre-deployed by Amazon (attached), and there is no setting for spark.yarn.keytab. Is there any doc for setting this up, if it's required in this scenario? Also, if deploy-mode is switched from cluster to client on spark-submit then the error no longer appears. Just wondering if there's any difference between using client and cluster mode if the submit is being done on the master node. Thanks for any suggestions, -- Nick

From: Jean-Baptiste Onofré <j...@nanthrax.net> Sent: Friday, December 11, 2015 1:01 PM To: user@spark.apache.org Subject: Re: Spark Submit - java.lang.IllegalArgumentException: requirement failed

Hi Nick, the localizedPath has to be not null; that's why the requirement fails. In the SparkConf used by the spark-submit (default in conf/spark-defaults.conf), do you have all properties defined, especially spark.yarn.keytab? Thanks, Regards JB
Configuring Log4J (Spark 1.5 on EMR 4.1)
Hi, On Spark 1.5 on EMR 4.1 the message below appears in stderr in the YARN UI.

    ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.

I do see that there is /usr/lib/spark/conf/log4j.properties. Can someone please advise on how to set up log4j properly? Thanks, -- Nick
RE: Configuring Log4J (Spark 1.5 on EMR 4.1)
> log4j.properties file only exists on the master and not the slave nodes, so you are probably running into https://issues.apache.org/jira/browse/SPARK-11105, which has already been fixed in the not-yet-released Spark 1.6.0. EMR will upgrade to Spark 1.6.0 once it is released.

Thanks for the info, though this is a single-node cluster, so that can't be the cause of the error (which is in the driver log). -- Nick

From: Jonathan Kelly [jonathaka...@gmail.com] Sent: Thursday, November 19, 2015 6:45 PM To: Afshartous, Nick Cc: user@spark.apache.org Subject: Re: Configuring Log4J (Spark 1.5 on EMR 4.1)

This file only exists on the master and not the slave nodes, so you are probably running into https://issues.apache.org/jira/browse/SPARK-11105, which has already been fixed in the not-yet-released Spark 1.6.0. EMR will upgrade to Spark 1.6.0 once it is released. ~ Jonathan
Spark/Kafka Streaming Job Gets Stuck
Hi, we are load testing our Spark 1.3 streaming job (reading from Kafka) and seeing a problem. This is running in AWS/YARN, the streaming batch interval is set to 3 minutes, and this is a ten-node cluster. Testing at 30,000 events per second, we are seeing the streaming job get stuck (stack trace below) for over an hour. Thanks for any insights or suggestions. -- Nick

    org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartitionsToPair(JavaDStreamLike.scala:43)
    com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.runStream(StreamingKafkaConsumerDriver.java:125)
    com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.main(StreamingKafkaConsumerDriver.java:71)
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:606)
    org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)
RE: Using Spark SQL mapping over an RDD
> You can't do nested operations on RDDs or DataFrames (i.e. you can't create a DataFrame from within a map function). Perhaps if you explain what you are trying to accomplish someone can suggest another way.

The code below is what I had in mind. For each ID, I'd like to run a query using the ID in the where clause, and then, depending on the result, possibly run a second query. Either the result of the first or the second query will be used to construct the output of the map function. Thanks for any suggestions, -- Nick

    val result = deviceIds.map(deviceId => {
      val withAnalyticsId = sqlContext.sql(
        "select * from ad_info where deviceId = '%1s' and analyticsId <> 'null' order by messageTime asc limit 1" format (deviceId))

      if (withAnalyticsId.count() > 0) {
        withAnalyticsId.take(1)(0)
      }
      else {
        val withoutAnalyticsId = sqlContext.sql(
          "select * from ad_info where deviceId = '%1s' order by messageTime desc limit 1" format (deviceId))
        withoutAnalyticsId.take(1)(0)
      }
    })

From: Michael Armbrust [mich...@databricks.com] Sent: Thursday, October 08, 2015 1:16 PM To: Afshartous, Nick Cc: user@spark.apache.org Subject: Re: Using Spark SQL mapping over an RDD

You can't do nested operations on RDDs or DataFrames (i.e. you can't create a DataFrame from within a map function). Perhaps if you explain what you are trying to accomplish someone can suggest another way.

On Thu, Oct 8, 2015 at 10:10 AM, Afshartous, Nick <nafshart...@turbine.com> wrote:

Hi, Am using Spark 1.5 in the latest EMR 4.1. I have an RDD of String:

    scala> deviceIds
    res25: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[18] at map at <console>:28

Then, when trying to map over the RDD while attempting to run a SQL query, the result is a NullPointerException:

    scala> deviceIds.map(id => sqlContext.sql("select * from ad_info")).count()

with the stack trace below. If I run the query as a top-level expression the count is returned. There was additional code within the anonymous function that's been removed to try and isolate the problem. Thanks for any insights or advice on how to debug this.
-- Nick

    scala> deviceIds.map(id => sqlContext.sql("select * from ad_info")).count()
    15/10/08 16:12:56 INFO SparkContext: Starting job: count at <console>:40
    15/10/08 16:12:56 INFO DAGScheduler: Got job 18 (count at <console>:40) with 200 output partitions
    15/10/08 16:12:56 INFO DAGScheduler: Final stage: ResultStage 37 (count at <console>:40)
    15/10/08 16:12:56 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 36)
    15/10/08 16:12:56 INFO DAGScheduler: Missing parents: List()
    15/10/08 16:12:56 INFO DAGScheduler: Submitting ResultStage 37 (MapPartitionsRDD[37] at map at <console>:40), which has no missing parents
    15/10/08 16:12:56 INFO MemoryStore: ensureFreeSpace(17904) called with curMem=531894, maxMem=560993402
    15/10/08 16:12:56 INFO MemoryStore: Block broadcast_22 stored as values in memory (estimated size 17.5 KB, free 534.5 MB)
    15/10/08 16:12:56 INFO MemoryStore: ensureFreeSpace(7143) called with curMem=549798, maxMem=560993402
    15/10/08 16:12:56 INFO MemoryStore: Block broadcast_22_piece0 stored as bytes in memory (estimated size 7.0 KB, free 534.5 MB)
    15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in memory on 10.247.0.117:33555 (size: 7.0 KB, free: 535.0 MB)
    15/10/08 16:12:56 INFO SparkContext: Created broadcast 22 from broadcast at DAGScheduler.scala:861
    15/10/08 16:12:56 INFO DAGScheduler: Submitting 200 missing tasks from ResultStage 37 (MapPartitionsRDD[37] at map at <console>:40)
    15/10/08 16:12:56 INFO YarnScheduler: Adding task set 37.0 with 200 tasks
    15/10/08 16:12:56 INFO TaskSetManager: Starting task 0.0 in stage 37.0 (TID 649, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
    15/10/08 16:12:56 INFO TaskSetManager: Starting task 1.0 in stage 37.0 (TID 650, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
    15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in memory on ip-10-247-0-117.ec2.internal:46227 (size: 7.0 KB, free: 535.0 MB)
    15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in memory on ip-10-247-0-117.ec2.internal:32938 (size: 7.0 KB, free: 535.0 MB)
    15/10/08 16:12:56 INFO TaskSetManager: Starting task 2.0 in stage 37.0 (TID 651, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
    15/10/08 16:12:56 WARN TaskSetManager: Lost task 0.0 in stage 37.0 (TID 649, ip-10-247-0-117.ec2.internal): java.lang.NullPointerException
      at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:40)
      at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$i