Re: Estimate size of Dataframe programmatically
SizeEstimator.estimate(df) will not give the size of the DataFrame, right? I think it will give the size of the df object. With an RDD, I sample(), collect(), and sum the size of each row. If I do the same with a DataFrame, it will no longer be the size when represented in columnar format.

I'd also like to know how spark.sql.autoBroadcastJoinThreshold estimates the size of a DataFrame. Is it going to broadcast when the columnar storage size is less than 10 MB?

Srikanth

On Fri, Aug 7, 2015 at 2:51 PM, Ted Yu yuzhih...@gmail.com wrote:

Have you tried calling SizeEstimator.estimate() on a DataFrame? I did the following in the REPL:

scala> SizeEstimator.estimate(df)
res1: Long = 17769680

FYI

On Fri, Aug 7, 2015 at 6:48 AM, Srikanth srikanth...@gmail.com wrote:

Hello,

Is there a way to estimate the approximate size of a DataFrame? I know we can cache it and look at the size in the UI, but I'm trying to do this programmatically. With an RDD, I can sample, sum up sizes using SizeEstimator, and extrapolate to the entire RDD. That gives me the approximate size of the RDD. With DataFrames, it's tricky due to the columnar storage. How do we do it?

On a related note, I see the size of the RDD object to be ~60MB. Is that the footprint of the RDD in the driver JVM?

scala> val temp = sc.parallelize(Array(1,2,3,4,5,6))
scala> SizeEstimator.estimate(temp)
res13: Long = 69507320

Srikanth
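The sample-and-extrapolate approach described above comes down to simple arithmetic; here is a minimal sketch of that step (the object and function names are mine for illustration, not part of any Spark API — `sampleBytes` stands for whatever SizeEstimator.estimate returned over a collected sample taken with a known fraction):

```scala
object SizeExtrapolation {
  // If SizeEstimator.estimate over a sample() taken with the given
  // fraction returned `sampleBytes`, scale back up to the full dataset.
  def extrapolate(sampleBytes: Long, fraction: Double): Long = {
    require(fraction > 0.0 && fraction <= 1.0, "fraction must be in (0, 1]")
    math.round(sampleBytes / fraction)
  }
}
```

Note this only extrapolates row-format sizes; as discussed above, it will not reflect the columnar in-memory representation of a DataFrame.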
Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing
That looks like it's during recovery from a checkpoint, so it'd be driver memory, not executor memory. How big is the checkpoint directory that you're trying to restore from?

On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote:

We're getting the below error. Tried increasing spark.executor.memory, e.g. from 1g to 2g, but the below error still happens. Any recommendations? Something to do with specifying -Xmx in the submit job scripts? Thanks.

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
  at java.util.Arrays.copyOf(Arrays.java:3332)
  at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
  [...]
  at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
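Cody's question ("how big is the checkpoint directory?") can be answered for a local-filesystem checkpoint with a short sketch — plain JVM code, no Spark needed; the path is simply whatever was passed to checkpoint() (names here are mine for illustration):

```scala
import java.nio.file.{Files, Path}

object CheckpointSize {
  // Sum the sizes of all regular files under a directory tree,
  // e.g. a Spark Streaming checkpoint directory on local disk.
  def dirBytes(root: Path): Long = {
    val stream = Files.walk(root)
    try stream
      .filter(p => Files.isRegularFile(p))
      .mapToLong(p => Files.size(p))
      .sum()
    finally stream.close()
  }
}
```

For HDFS-backed checkpoints, `hadoop fs -du -s <dir>` gives the same answer from the command line.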
ClosureCleaner does not work for java code
Consider two code snippets as the following:

// Java code:
abstract class Ops implements Serializable {
    public abstract Integer apply(Integer x);

    public void doSomething(JavaRDD<Integer> rdd) {
        rdd.map(x -> x + apply(x))
           .collect()
           .forEach(System.out::println);
    }
}

public class AbstractTest {
    public static void main(String[] args) {
        new AbstractTest().job();
    }

    public void job() {
        SparkConf conf = new SparkConf()
            .setAppName(AbstractTest.class.getName())
            .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
        Ops ops = new Ops() {
            @Override
            public Integer apply(Integer x) {
                return x + 1;
            }
        };
        ops.doSomething(rdd);
    }
}

// Scala code:
abstract class Ops extends Serializable {
  def apply(x: Int): Int

  def doSomething(rdd: RDD[Int]): Unit = {
    rdd.map(x => apply(x)).collect foreach println
  }
}

class AbstractTest {
  def job(): Unit = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7))
    val ops = new Ops() {
      override def apply(x: Int): Int = x + 1
    }
    ops.doSomething(rdd)
  }
}

object AbstractTest {
  def main(args: Array[String]): Unit = {
    new AbstractTest().job()
  }
}

They are actually very similar, doing the same thing, yet the Scala one works fine and the Java one does not.
A Task not serializable exception is encountered when the Java code is executed; here is the stack trace:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
  at org.apache.spark.rdd.RDD.map(RDD.scala:293)
  at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:90)
  at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:47)
  at fr.leboncoin.etl.jobs.test.Ops.doSomething(AbstractTest.java:24)
  at fr.leboncoin.etl.jobs.test.AbstractTest.job(AbstractTest.java:52)
  at fr.leboncoin.etl.jobs.test.AbstractTest.main(AbstractTest.java:33)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:497)
  at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: fr.leboncoin.etl.jobs.test.AbstractTest
Serialization stack:
  - object not serializable (class: test.AbstractTest, value: test.AbstractTest@61d84e08)
  - field (class: test.AbstractTest$1, name: this$0, type: class test.AbstractTest)
  - object (class test.AbstractTest$1, test.AbstractTest$1@476e8796)
  - element of array (index: 0)
  - array (class [Ljava.lang.Object;, size 1)
  - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
  - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class fr.leboncoin.etl.jobs.test.Ops, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial fr/leboncoin/etl/jobs/test/Ops.lambda$doSomething$6d6228b6$1:(Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
  - writeReplace data (class: java.lang.invoke.SerializedLambda)
  - object (class fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/476868388, fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/476868388@65753040)
  - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
  - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
  ... 19 more
Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing
I wonder, during recovery from a checkpoint, whether we can estimate the size of the checkpoint and compare it with Runtime.getRuntime().freeMemory(). If the size of the checkpoint is much bigger than free memory, log a warning, etc.

Cheers

On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote:

Thanks, Cody, will try that. Unfortunately, due to a reinstall, I don't have the original checkpointing directory :( Thanks for the clarification on spark.driver.memory; I'll keep testing (at 2g things seem OK for now).

On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger c...@koeninger.org wrote:

That looks like it's during recovery from a checkpoint, so it'd be driver memory, not executor memory. How big is the checkpoint directory that you're trying to restore from?

On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote:

We're getting the below error. Tried increasing spark.executor.memory, e.g. from 1g to 2g, but the below error still happens. Any recommendations? Something to do with specifying -Xmx in the submit job scripts? Thanks.

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
  at java.util.Arrays.copyOf(Arrays.java:3332)
  [...]
  at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
Re: ClosureCleaner does not work for java code
The difference is really that Java and Scala work differently. In Java, your anonymous subclass of Ops, defined in (a method of) AbstractTest, captures a reference to it. That much is 'correct' in that it's how Java is supposed to work, and AbstractTest is indeed not serializable since you didn't declare it so.

However, the reference isn't actually used, and Spark tries to remove such references for you where possible. It can't always do it, IIRC (e.g. nulling some fields would mutate objects in unpredictable ways), and I think that's what happens here.

In the first place, you want to avoid having this hidden reference, by making, for instance, a static nested class. There are probably a lot of ways to rewrite this. Scala just works differently in the code that's generated.

On Mon, Aug 10, 2015 at 4:32 PM, Hao Ren inv...@gmail.com wrote:

Consider two code snippets as the following: [...]
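The static-nested-class suggestion above can be illustrated without Spark at all. The sketch below (class names are mine, not from the original thread) round-trips the operation through plain Java serialization, which is essentially what Spark's closure serializer does and what fails in the original code: a static nested class carries no hidden this$0 field, so nothing non-serializable gets dragged along.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticOpsDemo {
    // A static nested class holds no hidden this$0 reference to the
    // enclosing class, so serializing it does not drag StaticOpsDemo along.
    static class AddOne implements Serializable {
        public Integer apply(Integer x) {
            return x + 1;
        }
    }

    // Round-trip through Java serialization, as Spark's closure
    // serializer would, to show that AddOne serializes cleanly.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("AddOne serializes to " + serialize(new AddOne()).length + " bytes");
    }
}
```

By contrast, an anonymous subclass declared inside an instance method of a non-serializable class would fail this round-trip with exactly the NotSerializableException shown in the stack trace above.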
Re: multiple dependency jars using pyspark
I figured out the issue - it had to do with the Cassandra jar I had compiled; I had tested a previous version. Using --jars (comma separated) and --driver-class-path (colon separated) is working.

On Mon, Aug 10, 2015 at 1:08 AM, ayan guha guha.a...@gmail.com wrote:

The easiest way should be to add both jars to SPARK_CLASSPATH as a colon-separated string.

On 10 Aug 2015 06:20, Jonathan Haddad j...@jonhaddad.com wrote:

I'm trying to write a simple job for Pyspark 1.4 migrating data from MySQL to Cassandra. I can work with either the MySQL JDBC jar or the Cassandra jar separately without issue, but when I try to reference both of them it throws an exception:

Py4JJavaError: An error occurred while calling o32.save.
: java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;

I'm not sure if I'm including the jars correctly, as --jars says it's comma separated and --driver-class-path seems to take a colon-delimited classpath. If I separate the list in --driver-class-path with a comma, I get a class-not-found exception, so I'm thinking colon is right. The job, params for submission, and exception are here; help getting this going would be deeply appreciated.

https://gist.github.com/rustyrazorblade/9a38a9499a7531eefd1e
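For reference, the two flags really do use different separators for the same list of jars; a hedged example of the submit command (jar file names and versions here are purely illustrative):

```shell
# --jars takes a comma-separated list; --driver-class-path takes a
# colon-separated (classpath-style) list of the same jars.
spark-submit \
  --jars mysql-connector-java.jar,spark-cassandra-connector.jar \
  --driver-class-path mysql-connector-java.jar:spark-cassandra-connector.jar \
  migrate.py
```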
How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing
We're getting the below error. Tried increasing spark.executor.memory, e.g. from 1g to 2g, but the below error still happens. Any recommendations? Something to do with specifying -Xmx in the submit job scripts? Thanks.

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
  at java.util.Arrays.copyOf(Arrays.java:3332)
  at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
  at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
  at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
  at java.lang.StringBuilder.append(StringBuilder.java:136)
  at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
  at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
  at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
  at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
  at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
  at org.apache.spark.rdd.RDD.<init>(RDD.scala:1365)
  at org.apache.spark.streaming.kafka.KafkaRDD.<init>(KafkaRDD.scala:46)
  at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
  at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
  at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
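The trace is from the driver's main thread during checkpoint restore, so the relevant knob is the driver heap (which spark-submit turns into the driver JVM's -Xmx), not spark.executor.memory. A hedged example of the submit invocation (class and jar names are hypothetical):

```shell
# --driver-memory sets the driver JVM's -Xmx; spark.executor.memory
# only affects executors and will not help a driver-side OOM.
spark-submit \
  --class com.example.StreamingJob \
  --driver-memory 2g \
  --conf spark.executor.memory=2g \
  streaming-job.jar
```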
Re: Estimate size of Dataframe programmatically
From a quick glance at SparkStrategies.scala, when statistics.sizeInBytes of the LogicalPlan is <= autoBroadcastJoinThreshold, the plan's output would be used in a broadcast join as the 'build' relation.

FYI

On Mon, Aug 10, 2015 at 8:04 AM, Srikanth srikanth...@gmail.com wrote:

SizeEstimator.estimate(df) will not give the size of the DataFrame, right? [...]
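The threshold being compared against is itself configurable; for example, in spark-defaults.conf or via --conf (the value shown is the 10 MB default discussed in this thread, in bytes):

```
# Plans whose statistics.sizeInBytes is at or below this value are
# candidates for the broadcast side of a join; -1 disables broadcasting.
spark.sql.autoBroadcastJoinThreshold  10485760
```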
Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing
Would there be a way to chunk up/batch up the contents of the checkpointing directories as they're being processed by Spark Streaming? Is it mandatory to load the whole thing in one go?

On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu yuzhih...@gmail.com wrote:

I wonder, during recovery from a checkpoint, whether we can estimate the size of the checkpoint and compare it with Runtime.getRuntime().freeMemory(). If the size of the checkpoint is much bigger than free memory, log a warning, etc.

Cheers

On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: [...]
Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing
You need to keep a certain number of rdds around for checkpointing, based on e.g. the window size. Those would all need to be loaded at once. On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Would there be a way to chunk up/batch up the contents of the checkpointing directories as they're being processed by Spark Streaming? Is it mandatory to load the whole thing in one go? On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu yuzhih...@gmail.com wrote: I wonder during recovery from a checkpoint whether we can estimate the size of the checkpoint and compare with Runtime.getRuntime().freeMemory (). If the size of checkpoint is much bigger than free memory, log warning, etc Cheers On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thanks, Cody, will try that. Unfortunately due to a reinstall I don't have the original checkpointing directory :( Thanks for the clarification on spark.driver.memory, I'll keep testing (at 2g things seem OK for now). On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger c...@koeninger.org wrote: That looks like it's during recovery from a checkpoint, so it'd be driver memory not executor memory. How big is the checkpoint directory that you're trying to restore from? On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: We're getting the below error. Tried increasing spark.executor.memory e.g. from 1g to 2g but the below error still happens. Any recommendations? Something to do with specifying -Xmx in the submit job scripts? Thanks. 
Exception in thread main java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOf(Arrays.java:3332) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421) at java.lang.StringBuilder.append(StringBuilder.java:136) at java.lang.StackTraceElement.toString(StackTraceElement.java:173) at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212) at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190) at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441) at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441) at org.apache.spark.rdd.RDD.init(RDD.scala:1365) at org.apache.spark.streaming.kafka.KafkaRDD.init(KafkaRDD.scala:46) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153) at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402) at 
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403) at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403) at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
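Ted's heuristic above (compare the checkpoint's on-disk size with the driver's free memory before attempting recovery) can be sketched outside Spark in a few lines of Python. This is only a local sketch: the threshold factor is an arbitrary assumption, and a real implementation would read the size from HDFS (e.g. via FileSystem.getContentSummary) rather than walking a local path.

```python
import os

def dir_size_bytes(path):
    """Total bytes of all files under path; a local stand-in for
    asking HDFS for the size of the checkpoint directory."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total

def checkpoint_warning(path, free_bytes, factor=2):
    """Return a warning string when the checkpoint is within `factor`
    of free memory (mirroring Runtime.getRuntime().freeMemory()),
    else None.  The factor of 2 is an assumed safety margin."""
    size = dir_size_bytes(path)
    if size * factor > free_bytes:
        return ("checkpoint %s is %d bytes; only %d bytes of free "
                "driver memory" % (path, size, free_bytes))
    return None
```

Logging such a warning before restore starts would at least make the GC-overhead failure above predictable instead of surfacing as an OutOfMemoryError mid-recovery.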
Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing
Looks like the workaround is to reduce the *window length*. Cheers On Mon, Aug 10, 2015 at 10:07 AM, Cody Koeninger c...@koeninger.org wrote: You need to keep a certain number of RDDs around for checkpointing, based on e.g. the window size. Those would all need to be loaded at once.
Re: Problems getting expected results from hbase_inputformat.py
Thank you Gen, the changes to HBaseConverters.scala now appear to return all column qualifiers, as follows:
(u'row1', {u'qualifier': u'a', u'timestamp': u'1438716994027', u'value': u'value1', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row1'})
(u'row1', {u'qualifier': u'b', u'timestamp': u'1438717004248', u'value': u'value2', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row1'})
(u'row2', {u'qualifier': u'', u'timestamp': u'1438717014529', u'value': u'value3', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row2'})
(u'row3', {u'qualifier': u'', u'timestamp': u'1438717022756', u'value': u'value4', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row3'})
Just to be clear, you wrote "Spark update these two scripts recently"; which two scripts were you referring to? On Friday, August 7, 2015 7:59 PM, gen tang gen.tan...@gmail.com wrote: Hi, In fact, PySpark uses org.apache.spark.examples.pythonconverters (./examples/src/main/scala/org/apache/spark/pythonconverters/) to transform HBase Result objects into Python strings. Spark updated these two scripts recently. However, they are not included in the official release of Spark, so you are trying to use the new Python script with an old jar. You can clone the newest code of Spark from GitHub and build the examples jar; then you will get the correct result. Cheers, Gen On Sat, Aug 8, 2015 at 5:03 AM, Eric Bless eric.bl...@yahoo.com.invalid wrote: I'm having some difficulty getting the desired results from the Spark Python example hbase_inputformat.py. I'm running with CDH 5.4, HBase version 1.0.0, Spark v 1.3.0, using Python version 2.6.6. I followed the example to create a test HBase table.
Here's the data from the table I created:
hbase(main):001:0> scan 'dev_wx_test'
ROW      COLUMN+CELL
 row1    column=f1:a, timestamp=1438716994027, value=value1
 row1    column=f1:b, timestamp=1438717004248, value=value2
 row2    column=f1:, timestamp=1438717014529, value=value3
 row3    column=f1:, timestamp=1438717022756, value=value4
3 row(s) in 0.2620 seconds
When either of these statements is included -
"hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n"))" or
"hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).countByValue().items()"
we only get the following printed; (row1, value2) is not printed:
((u'row1', u'value1'), 1)
((u'row2', u'value3'), 1)
((u'row3', u'value4'), 1)
This looks similar to the results in the following post I found - http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-td18613.html#a18650 - but it appears the python converter HBaseResultToStringConverter has been updated since then, so this problem should be resolved too.
When the statement "hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)" is included, the result is -
ValueError: No JSON object could be decoded
** Here is more info on this from the log -
Traceback (most recent call last):
  File "hbase_inputformat.py", line 87, in <module>
    output = hbase_rdd.collect()
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py", line 701, in collect
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/java_gateway.py", line 538, in __call__
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o44.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, stluhdpddev27.monsanto.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py", line 101, in main
    process()
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/serializers.py", line 236, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py", line 1807, in <lambda>
  File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
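The failing step can be reproduced without HBase or Spark: the updated converter emits one JSON document per cell, joined by newlines, so the value must be split before json.loads is applied to each piece. The sample value below is fabricated to mirror the converter output shown earlier in the thread.

```python
import json

def parse_cells(value):
    """Split the newline-joined converter output and decode each
    piece separately, as flatMapValues + json.loads does."""
    return [json.loads(piece) for piece in value.split("\n")]

raw = ('{"qualifier": "a", "columnFamily": "f1", "value": "value1"}\n'
       '{"qualifier": "b", "columnFamily": "f1", "value": "value2"}')
cells = parse_cells(raw)  # two dicts, one per cell
```

Passing the whole two-line string straight to json.loads is exactly what raises the ValueError in the traceback, since a single JSON document is expected.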
Re: subscribe
https://www.youtube.com/watch?v=H07zYvkNYL8 On Mon, Aug 10, 2015 at 10:55 AM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at the first section of https://spark.apache.org/community Cheers On Mon, Aug 10, 2015 at 10:54 AM, Phil Kallos phil.kal...@gmail.com wrote: please
Re: Spark inserting into parquet files with different schema
Older versions of Spark (i.e. when it was still called SchemaRDD instead of DataFrame) did not support merging different Parquet schemas. However, Spark 1.4+ should. On Sat, Aug 8, 2015 at 8:58 PM, sim s...@swoop.com wrote: Adam, did you find a solution for this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-inserting-into-parquet-files-with-different-schema-tp20706p24181.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark inserting into parquet files with different schema
Michael, is there an example anywhere that demonstrates how this works with the schema changing over time? Must the Hive tables be set up as external tables outside of saveAsTable? In my experience, in 1.4.1, writing to a table with SaveMode.Append fails if the schemas don't match. Thanks, Sim From: Michael Armbrust mich...@databricks.com Date: Monday, August 10, 2015 at 2:36 PM To: Simeon Simeonov s...@swoop.com Cc: user user@spark.apache.org Subject: Re: Spark inserting into parquet files with different schema Older versions of Spark (i.e. when it was still called SchemaRDD instead of DataFrame) did not support merging different Parquet schemas. However, Spark 1.4+ should. On Sat, Aug 8, 2015 at 8:58 PM, sim s...@swoop.com wrote: Adam, did you find a solution for this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-inserting-into-parquet-files-with-different-schema-tp20706p24181.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
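As a toy model of what Parquet schema merging in 1.4+ amounts to (this deliberately ignores nested structs and type widening, which the real resolver also handles): merging is a union of the column sets that rejects columns whose types conflict.

```python
def merge_schemas(a, b):
    """Union of two flat schemas given as {column: type} dicts;
    a column appearing in both must have the same type."""
    merged = dict(a)
    for col, typ in b.items():
        if col in merged and merged[col] != typ:
            raise ValueError("conflicting type for column %r" % col)
        merged[col] = typ
    return merged

old = {"id": "int", "name": "string"}
new = {"id": "int", "name": "string", "score": "double"}
evolved = merge_schemas(old, new)  # gains the 'score' column
```

Under this model, Sim's SaveMode.Append failure would correspond to the conflicting-type branch rather than the merge branch, though the actual write-path behavior in 1.4.1 may differ.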
Kafka direct approach: blockInterval and topic partitions
Hi everyone, I recently started using the new Kafka direct approach. Now, as far as I understood, each Kafka partition /is/ an RDD partition that will be processed by a single core. What I don't understand is the relation between those partitions and the blocks generated every blockInterval. For example, assume: 1000ms batch interval 16 topic partitions (total of 16 cores available) Moreover, we have that the blockInterval is set to 200ms. What am I actually dividing by the blockInterval value in such a scenario? I'd like to tune this value but I cannot understand what it stands for. I hope I made myself clear, thank you all! :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-direct-approach-blockInterval-and-topic-partitions-tp24197.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to use custom Hadoop InputFormat in DataFrame?
You can't create a DataFrame from an arbitrary object since we don't know how to figure out the schema. You can either create a JavaBean https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema or manually create a row + specify the schema https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema . On Mon, Aug 10, 2015 at 11:22 AM, unk1102 umesh.ka...@gmail.com wrote: Hi I have my own Hadoop custom InputFormat which I want to use in DataFrame. How do we do that? I know I can use sc.hadoopFile(..) but then how do I convert it into a DataFrame?
JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd = jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
DataFrame myFormatAsDataframe = sqlContext.createDataFrame(myformatRdd, ??);
In the above code what should I put in place of ?? I tried to put MyRecordWritable.class but it does not work, as it is not a schema, it is a Record Writable. Please guide. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-custom-Hadoop-InputFormat-in-DataFrame-tp24198.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
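Michael's second option (map each record to a row, then supply the schema explicitly) reduces to the pattern below. MyRecord and its fields are invented stand-ins for MyRecordWritable, and the actual Spark calls appear only in comments so the sketch runs without a cluster.

```python
class MyRecord(object):
    """Hypothetical stand-in for a custom Hadoop Writable."""
    def __init__(self, key, count):
        self.key = key
        self.count = count

def to_row(rec):
    """Flatten the record into a plain tuple; positions must line up
    with the schema declared alongside it."""
    return (rec.key, rec.count)

# With Spark this would be roughly:
#   schema = StructType([StructField("key", StringType(), True),
#                        StructField("count", IntegerType(), True)])
#   df = sqlContext.createDataFrame(myformatRdd.map(to_row), schema)
rows = [to_row(r) for r in (MyRecord("a", 1), MyRecord("b", 2))]
```

The point is that the schema question is answered by you, not inferred from the Writable: the map function decides the column order and the StructType names and types them.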
Re: Streaming of WordCount example
I am running as a yarn-client which probably means that the program that submitted the job is where the listening is also occurring? I thought that the yarn is only used to negotiate resources in yarn-client master mode. On Mon, Aug 10, 2015 at 11:34 AM, Tathagata Das t...@databricks.com wrote: If you are running on a cluster, the listening is occurring on one of the executors, not in the driver. On Mon, Aug 10, 2015 at 10:29 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to run this program as a yarn-client. The job seems to be submitting successfully however I don't see any process listening on this host on port https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java Active Jobs (2)Job IdDescriptionSubmittedDurationStages: Succeeded/TotalTasks (for all stages): Succeeded/Total1foreachRDD at JavaRecoverableNetworkWordCount.java:112 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=12015/08/10 13:27:3651 s0/2 0/2 0start at JavaRecoverableNetworkWordCount.java:152 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=02015/08/10 13:27:3551 s0/2 0/70
Re: Problem with take vs. takeSample in PySpark
I tested this in master (1.5 release), it worked as expected (changed spark.driver.maxResultSize to 10m):
>>> len(sc.range(10).map(lambda i: '*' * (1 << 23)).take(1))
1
>>> len(sc.range(10).map(lambda i: '*' * (1 << 24)).take(1))
15/08/10 10:45:55 ERROR TaskSetManager: Total size of serialized results of 1 tasks (16.1 MB) is bigger than spark.driver.maxResultSize (10.0 MB)
>>> len(sc.range(10).map(lambda i: '*' * (1 << 23)).take(2))
15/08/10 10:46:04 ERROR TaskSetManager: Total size of serialized results of 1 tasks (16.1 MB) is bigger than spark.driver.maxResultSize (10.0 MB)
Could you reproduce this in 1.2? We didn't change take() much since 1.2 (unable to build the 1.2 branch right now, because of dependency changes) On Mon, Aug 10, 2015 at 9:49 AM, David Montague davwm...@gmail.com wrote: Hi all, I am getting some strange behavior with the RDD take function in PySpark while doing some interactive coding in an IPython notebook. I am running PySpark on Spark 1.2.0 in yarn-client mode if that is relevant. I am using sc.wholeTextFiles and pandas to load a collection of .csv files into an RDD of pandas dataframes. I create an RDD called train_rdd for which each row of the RDD contains a label and pandas dataframe pair:
import pandas as pd
from StringIO import StringIO
rdd = sc.wholeTextFiles(data_path, 1000)
train_rdd = rdd.map(lambda x: (x[0], pd.read_csv(StringIO(x[1]))))
In order to test out the next steps I want to take, I am trying to use take to select one of the dataframes and apply the desired modifications before writing out the Spark code to apply it to all of the dataframes in parallel. However, when I try to use take like this: label, df = train_rdd.take(1)[0] I get a spark.driver.maxResultSize error (stack trace included at the end of this message). Now, each of these dataframes is only about 100MB in size, so it should easily fit on the driver and not go over the maxResultSize limit of 1024MB.
If I instead use takeSample, though, there is no problem: label, df = train_rdd.takeSample(False, 1, seed=50)[0] (Here, I have set the seed so that the RDD that is selected is the same one that the take function is trying to load (i.e., the first one), just to ensure that it is not because the specific dataframe take is getting is too large.) Does calling take result in a collect operation being performed before outputting the first item? That would explain to me why this error is occurring, but that seems like poor behavior for the take function. Clearly takeSample is behaving the way I want it to, but it would be nice if I could get this behavior with the take function, or at least without needing to choose an element randomly. I was able to get the behavior I wanted above by just changing the seed until I got the dataframe I wanted, but I don't think that is a good approach in general. Any insight is appreciated. Best, David Montague
---
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-38-7eca647cba46> in <module>()
      1 label_s, df_s = train_rdd.takeSample(False, 1, seed=50)[0]
----> 2 label, df = train_rdd.take(1)[0]
/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/rdd.py in take(self, num)
   1109
   1110             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1111             res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1112
   1113             items += res
/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    816         # SparkContext#runJob.
    817         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 818         it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
    819         return list(mappedRDD._collect_iterator_through_file(it))
    820
/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538             self.target_id, self.name)
    539
    540         for temp_arg in temp_args:
/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 177
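For reference, the scan loop inside take works roughly like the simplified model below (the real rdd.py version grows the number of partitions to try adaptively). It shows why take always begins with the first partition, and why everything a scanned task ships back counts against spark.driver.maxResultSize, whereas takeSample runs a full job and returns only the sampled rows.

```python
def take_model(partitions, num):
    """Simplified RDD.take: scan partitions from the front and stop
    as soon as `num` rows have been gathered; each scanned task's
    result travels back to the driver in full."""
    items, scanned = [], 0
    for part in partitions:
        for row in part:
            items.append(row)
            if len(items) == num:
                return items, scanned + 1
        scanned += 1
    return items, scanned

parts = [["label0"], ["label1"], ["label2"]]
got, scanned = take_model(parts, 1)  # only the first partition is touched
```

So take is not a collect, but it is also not free: the contents of every partition it scans are serialized to the driver, which is where the maxResultSize accounting happens.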
Re: Kafka direct approach: blockInterval and topic partitions
Thank you! :) 2015-08-10 19:58 GMT+02:00 Cody Koeninger c...@koeninger.org: There's no long-running receiver pushing blocks of messages, so blockInterval isn't relevant. Batch interval is what matters. On Mon, Aug 10, 2015 at 12:52 PM, allonsy luke1...@gmail.com wrote: Hi everyone, I recently started using the new Kafka direct approach. Now, as far as I understood, each Kafka partition /is/ an RDD partition that will be processed by a single core. What I don't understand is the relation between those partitions and the blocks generated every blockInterval. For example, assume: 1000ms batch interval 16 topic partitions (total of 16 cores available) Moreover, we have that the blockInterval is set to 200ms. What am I actually dividing by the blockInterval value in such a scenario? I'd like to tune this value but I cannot understand what it stands for. I hope I made myself clear, thank you all! :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-direct-approach-blockInterval-and-topic-partitions-tp24197.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Streaming of WordCount example
If you are running on a cluster, the listening is occurring on one of the executors, not in the driver. On Mon, Aug 10, 2015 at 10:29 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to run this program as a yarn-client. The job seems to be submitting successfully however I don't see any process listening on this host on port https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java Active Jobs (2)Job IdDescriptionSubmittedDurationStages: Succeeded/TotalTasks (for all stages): Succeeded/Total1foreachRDD at JavaRecoverableNetworkWordCount.java:112 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=12015/08/10 13:27:3651 s0/2 0/2 0start at JavaRecoverableNetworkWordCount.java:152 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=02015/08/10 13:27:3551 s0/2 0/70
Re: Streaming of WordCount example
In yarn-client mode, the driver is on the machine where you ran the spark-submit. The executors are running in the YARN cluster nodes, and the socket receiver listening on port is running in one of the executors. On Mon, Aug 10, 2015 at 11:43 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am running as a yarn-client which probably means that the program that submitted the job is where the listening is also occurring? I thought that the yarn is only used to negotiate resources in yarn-client master mode. On Mon, Aug 10, 2015 at 11:34 AM, Tathagata Das t...@databricks.com wrote: If you are running on a cluster, the listening is occurring on one of the executors, not in the driver. On Mon, Aug 10, 2015 at 10:29 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to run this program as a yarn-client. The job seems to be submitting successfully however I don't see any process listening on this host on port https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java Active Jobs (2)Job IdDescriptionSubmittedDurationStages: Succeeded/TotalTasks (for all stages): Succeeded/Total1foreachRDD at JavaRecoverableNetworkWordCount.java:112 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=12015/08/10 13:27:3651 s0/2 0/2 0start at JavaRecoverableNetworkWordCount.java:152 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=02015/08/10 13:27:3551 s0/2 0/70
Re: How to create DataFrame from a binary file?
Umesh: Please take a look at the classes under: sql/core/src/main/scala/org/apache/spark/sql/parquet FYI On Mon, Aug 10, 2015 at 10:35 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi Bo thanks much, let me explain, please see the following code:
JavaPairRDD<String, PortableDataStream> pairRdd = javaSparkContext.binaryFiles("/hdfs/path/to/binfile");
JavaRDD<PortableDataStream> javardd = pairRdd.values();
DataFrame binDataFrame = sqlContext.createDataFrame(javaBinRdd, PortableDataStream.class);
binDataFrame.show(); // shows just one row with the above file path /hdfs/path/to/binfile
I want the binary data from the above file in a DataFrame so that I can directly do analytics on it. My data is binary, so I can't use StructType with primitive data types, right, since everything is binary/byte. My custom binary data format is similar to Parquet, and I did not find any good example of where/how Parquet is read into a DataFrame. Please guide. On Sun, Aug 9, 2015 at 11:52 PM, bo yang bobyan...@gmail.com wrote: Well, my post uses a raw text json file to show how to create a data frame with a custom data schema. The key idea is to show the flexibility to deal with any format of data by using your own schema. Sorry if I did not make you fully understand. Anyway, let us know once you figure out your problem. On Sun, Aug 9, 2015 at 11:10 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi Bo I know how to create a DataFrame; my question is how to create a DataFrame for binary files, and in your blog it is raw text json files, please read my question properly, thanks. On Sun, Aug 9, 2015 at 11:21 PM, bo yang bobyan...@gmail.com wrote: You can create your own data schema (StructType in Spark), and use the following method to create a data frame with your own data schema: sqlContext.createDataFrame(yourRDD, structType); I wrote a post on how to do it.
You can also get the sample code there: Light-Weight Self-Service Data Query through Spark SQL: https://www.linkedin.com/pulse/light-weight-self-service-data-query-through-spark-sql-bo-yang Take a look and feel free to let me know for any question. Best, Bo On Sat, Aug 8, 2015 at 1:42 PM, unk1102 umesh.ka...@gmail.com wrote: Hi how do we create a DataFrame from a binary file stored in HDFS? I was thinking to use
JavaPairRDD<String, PortableDataStream> pairRdd = javaSparkContext.binaryFiles("/hdfs/path/to/binfile");
JavaRDD<PortableDataStream> javardd = pairRdd.values();
I can see that PortableDataStream has a method called toArray which can convert into a byte array. I was thinking, if I have JavaRDD<byte[]>, can I call the following and get a DataFrame?
DataFrame binDataFrame = sqlContext.createDataFrame(javaBinRdd, Byte.class);
Please guide, I am new to Spark. I have my own custom binary format and I was thinking, if I can convert my custom format into a DataFrame using binary operations, then I don't need to create my own custom Hadoop format; am I on the right track? Will reading binary data into a DataFrame scale? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-DataFrame-from-a-binary-file-tp24179.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
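One workable route for a custom fixed-width binary format is to decode each record into a plain tuple yourself and hand the tuples plus column names to createDataFrame. The 12-byte layout below (an int32 id plus an 8-byte null-padded name) is invented for illustration, and the Spark call is left as a comment so the sketch runs standalone.

```python
import struct

RECORD = struct.Struct("<i8s")  # hypothetical layout: int32 id + 8-byte name

def decode_records(blob):
    """Unpack a byte blob of fixed-width records into (id, name)
    tuples, the shape createDataFrame accepts with a column list."""
    rows = []
    for offset in range(0, len(blob), RECORD.size):
        rec_id, raw_name = RECORD.unpack_from(blob, offset)
        rows.append((rec_id, raw_name.rstrip(b"\x00").decode("ascii")))
    return rows

blob = RECORD.pack(1, b"alpha") + RECORD.pack(2, b"beta")
rows = decode_records(blob)
# With Spark, one would decode each binary file's bytes and build the
# DataFrame from the resulting tuples plus a schema or column list.
```

This scales as well as the decode function does: the unpacking happens per-file on the executors, and only structured rows reach the DataFrame layer.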
Re: Pagination on big table, splitting joins
I am thinking of using the *toLocalIterator* method, or something like that, but I have doubts about memory and parallelism, and surely there is a better way to do it. It will still run all earlier parts of the job in parallel. Only the actual retrieving of the final partitions will be serial. This is how we do pagination in the Spark SQL JDBC server.
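The behavior Michael describes can be modeled with a generator: the upstream computation can run fully in parallel, but the driver pulls results one partition at a time, which is what bounds driver memory during pagination.

```python
def local_iterator(partitions):
    """Model of toLocalIterator: rows are yielded partition by
    partition, so at most one partition's results are resident on
    the 'driver' at any moment."""
    for part in partitions:
        for row in part:
            yield row

pages = local_iterator([[1, 2], [3, 4], [5]])
first_page = [next(pages) for _ in range(2)]  # rows 1 and 2
```

Partition size thus becomes the effective page size, so repartitioning before iterating is the knob for how much lands on the driver at once.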
Re: stopping spark stream app
Any recommendations for gracefully shutting down a Spark Streaming application? I am running it on YARN and need a way to stop it externally, either via the yarn application -kill command or some other way, but the current batch needs to be processed completely and the checkpoint saved before shutting down. Runtime.getRuntime().addShutdownHook does not seem to be working: YARN kills the application immediately and does not call the shutdown hook callback. On Sun, Aug 9, 2015 at 12:45 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi How to ensure in spark streaming 1.3 with kafka that when an application is killed, the last running batch is fully processed and offsets are written to the checkpointing dir. On Fri, Aug 7, 2015 at 8:56 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi I am using spark stream 1.3 and using a custom checkpoint to save kafka offsets. 1. Is doing
Runtime.getRuntime().addShutdownHook(new Thread() {
  @Override
  public void run() {
    jssc.stop(true, true);
    System.out.println("Inside Add Shutdown Hook");
  }
});
to handle stop safe? 2. And do I need to handle saving the checkpoint in the shutdown hook also, or will the driver handle it automatically, since it gracefully stops the stream and handles completion of the foreachRDD function on the stream?
directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() { }
Thanks
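Since a YARN kill bypasses the shutdown hook, a common workaround is to poll for an external stop marker between batches instead of relying on the hook. Sketched here in plain Python: the marker path is a placeholder for a flag file (in practice checked in HDFS), and the loop body stands in for the per-batch work plus the graceful jssc.stop(true, true) call.

```python
import os

def stop_requested(marker_path):
    """An operator creates this file instead of running
    'yarn application -kill' outright."""
    return os.path.exists(marker_path)

def run_stream(batches, marker_path):
    """Process batches until the marker appears.  The check happens
    only between batches, so the batch in flight always completes
    and its offsets are recorded before the loop exits."""
    checkpointed = []
    for batch in batches:
        checkpointed.append(batch)      # batch processed + offsets saved
        if stop_requested(marker_path):
            break                       # graceful: nothing half-done
    return checkpointed
```

Once the loop exits, the driver can stop the context normally and only then should the operator issue the YARN kill, if the application has not already exited on its own.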
Re: Kafka direct approach: blockInterval and topic partitions
There's no long-running receiver pushing blocks of messages, so blockInterval isn't relevant. Batch interval is what matters. On Mon, Aug 10, 2015 at 12:52 PM, allonsy luke1...@gmail.com wrote: Hi everyone, I recently started using the new Kafka direct approach. Now, as far as I understood, each Kafka partition /is/ an RDD partition that will be processed by a single core. What I don't understand is the relation between those partitions and the blocks generated every blockInterval. For example, assume: 1000ms batch interval 16 topic partitions (total of 16 cores available) Moreover, we have that the blockInterval is set to 200ms. What am I actually dividing by the blockInterval value in such a scenario? I'd like to tune this value but I cannot understand what it stands for. I hope I made myself clear, thank you all! :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-direct-approach-blockInterval-and-topic-partitions-tp24197.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
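With no receiver in the picture, the knobs that bound a direct-stream batch are the batch interval and, optionally, spark.streaming.kafka.maxRatePerPartition; the per-batch upper bound works out as below (the rate numbers are illustrative, not defaults).

```python
def max_records_per_batch(num_partitions, max_rate_per_partition, batch_interval_s):
    """Upper bound on records in one direct-stream batch: each RDD
    partition (one per Kafka partition) reads at most
    rate * interval records."""
    return int(num_partitions * max_rate_per_partition * batch_interval_s)

cap = max_records_per_batch(16, 1000, 1.0)  # the 16-partition, 1 s scenario above
```

In the 16-partition scenario from the question, each of the 16 cores processes exactly one Kafka partition per batch, and blockInterval never enters the calculation.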
Re: How to use custom Hadoop InputFormat in DataFrame?
Hi Michael thanks for the reply. I know that I can create a DataFrame using a JavaBean or StructType; I want to know how I can create a DataFrame from the above code, which is a custom Hadoop format. On Tue, Aug 11, 2015 at 12:04 AM, Michael Armbrust mich...@databricks.com wrote: You can't create a DataFrame from an arbitrary object since we don't know how to figure out the schema. You can either create a JavaBean https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema or manually create a row + specify the schema https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema . On Mon, Aug 10, 2015 at 11:22 AM, unk1102 umesh.ka...@gmail.com wrote: Hi I have my own Hadoop custom InputFormat which I want to use in DataFrame. How do we do that? I know I can use sc.hadoopFile(..) but then how do I convert it into a DataFrame?
JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd = jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
DataFrame myFormatAsDataframe = sqlContext.createDataFrame(myformatRdd, ??);
In the above code what should I put in place of ?? I tried to put MyRecordWritable.class but it does not work, as it is not a schema, it is a Record Writable. Please guide. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-custom-Hadoop-InputFormat-in-DataFrame-tp24198.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to create DataFrame from a binary file?
Hi Bo, thanks much. Let me explain; please see the following code: JavaPairRDD<String,PortableDataStream> pairRdd = javaSparkContext.binaryFiles("/hdfs/path/to/binfile"); JavaRDD<PortableDataStream> javardd = pairRdd.values(); DataFrame binDataFrame = sqlContext.createDataFrame(javardd, PortableDataStream.class); binDataFrame.show(); //shows just one row with the above file path /hdfs/path/to/binfile I want the binary data from the above file in a DataFrame so that I can do analytics on it directly. Since my data is binary, I can't use a StructType with primitive data types, right? Everything is binary/bytes. My custom binary data format is similar to Parquet, and I did not find any good example of where/how Parquet is read into a DataFrame. Please guide. On Sun, Aug 9, 2015 at 11:52 PM, bo yang bobyan...@gmail.com wrote: Well, my post uses a raw text JSON file to show how to create a data frame with a custom data schema. The key idea is to show the flexibility to deal with any format of data by using your own schema. Sorry if I did not make you fully understand. Anyway, let us know once you figure out your problem. On Sun, Aug 9, 2015 at 11:10 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi Bo, I know how to create a DataFrame; my question is how to create a DataFrame from binary files, and your blog uses raw text JSON files. Please read my question properly, thanks. On Sun, Aug 9, 2015 at 11:21 PM, bo yang bobyan...@gmail.com wrote: You can create your own data schema (StructType in Spark), and use the following method to create a data frame with your own data schema: sqlContext.createDataFrame(yourRDD, structType); I wrote a post on how to do it. You can also get the sample code there: Light-Weight Self-Service Data Query through Spark SQL: https://www.linkedin.com/pulse/light-weight-self-service-data-query-through-spark-sql-bo-yang Take a look and feel free to let me know for any question. 
Best, Bo On Sat, Aug 8, 2015 at 1:42 PM, unk1102 umesh.ka...@gmail.com wrote: Hi, how do we create a DataFrame from a binary file stored in HDFS? I was thinking to use: JavaPairRDD<String,PortableDataStream> pairRdd = javaSparkContext.binaryFiles("/hdfs/path/to/binfile"); JavaRDD<PortableDataStream> javardd = pairRdd.values(); I can see that PortableDataStream has a method called toArray which can convert it into a byte array. If I have a JavaRDD<byte[]>, can I call the following and get a DataFrame? DataFrame binDataFrame = sqlContext.createDataFrame(javaBinRdd, Byte.class); Please guide, I am new to Spark. I have my own custom binary format, and I was thinking that if I can convert it into a DataFrame using binary operations then I don't need to create my own custom Hadoop format. Am I on the right track? Will reading binary data into a DataFrame scale? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-DataFrame-from-a-binary-file-tp24179.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
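One way to bridge the gap in this thread: raw bytes carry no schema, so before a StructType helps you must decode the bytes into typed fields. A minimal sketch, assuming a made-up fixed-width record layout (nothing here is the poster's actual format):

```python
import struct

# Sketch: decode a binary blob into typed rows before attaching a schema.
# RECORD is a hypothetical layout: a 4-byte big-endian int followed by an
# 8-byte double, 12 bytes per record.

RECORD = struct.Struct(">id")

def decode(blob):
    """Split a binary blob into (int, double) rows, one per fixed-width record."""
    return [RECORD.unpack_from(blob, off) for off in range(0, len(blob), RECORD.size)]

blob = RECORD.pack(1, 2.5) + RECORD.pack(7, -1.0)
print(decode(blob))  # [(1, 2.5), (7, -1.0)]
```

In Spark the same decode step would run inside a map over the PortableDataStream bytes, and only the resulting typed rows would be handed to createDataFrame with a StructType.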
How to use custom Hadoop InputFormat in DataFrame?
Hi, I have my own custom Hadoop InputFormat which I want to use with DataFrame. How do we do that? I know I can use sc.hadoopFile(..), but then how do I convert the result into a DataFrame? JavaPairRDD<Void,MyRecordWritable> myFormatAsPairRdd = jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class); JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values(); DataFrame myFormatAsDataframe = sqlContext.createDataFrame(myformatRdd, ??); In the above code, what should I put in place of ?? I tried MyRecordWritable.class but it does not work, as it is a RecordWritable, not a schema. Please guide. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-custom-Hadoop-InputFormat-in-DataFrame-tp24198.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing
You need to keep a certain number of rdds around for checkpointing -- that seems like a hefty expense to pay in order to achieve fault tolerance. Why does Spark persist whole RDD's of data? Shouldn't it be sufficient to just persist the offsets, to know where to resume from? Thanks. On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger c...@koeninger.org wrote: You need to keep a certain number of rdds around for checkpointing, based on e.g. the window size. Those would all need to be loaded at once. On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Would there be a way to chunk up/batch up the contents of the checkpointing directories as they're being processed by Spark Streaming? Is it mandatory to load the whole thing in one go? On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu yuzhih...@gmail.com wrote: I wonder during recovery from a checkpoint whether we can estimate the size of the checkpoint and compare with Runtime.getRuntime().freeMemory (). If the size of checkpoint is much bigger than free memory, log warning, etc Cheers On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thanks, Cody, will try that. Unfortunately due to a reinstall I don't have the original checkpointing directory :( Thanks for the clarification on spark.driver.memory, I'll keep testing (at 2g things seem OK for now). On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger c...@koeninger.org wrote: That looks like it's during recovery from a checkpoint, so it'd be driver memory not executor memory. How big is the checkpoint directory that you're trying to restore from? On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: We're getting the below error. Tried increasing spark.executor.memory e.g. from 1g to 2g but the below error still happens. Any recommendations? Something to do with specifying -Xmx in the submit job scripts? Thanks. 
Exception in thread main java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOf(Arrays.java:3332) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421) at java.lang.StringBuilder.append(StringBuilder.java:136) at java.lang.StackTraceElement.toString(StackTraceElement.java:173) at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212) at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190) at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441) at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441) at org.apache.spark.rdd.RDD.init(RDD.scala:1365) at org.apache.spark.streaming.kafka.KafkaRDD.init(KafkaRDD.scala:46) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153) at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402) at 
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403) at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403) at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
Re: Questions about SparkSQL join on not equality conditions
Hi, I am sorry to bother again. When I do a join as follows: df = sqlContext.sql("select a.someItem, b.someItem from a full outer join b on condition1 *or* condition2") df.first() the program fails because the result size is bigger than spark.driver.maxResultSize. It is really strange, as one record is in no way bigger than 1G. When I join on just one condition, or on an equality condition, there is no problem. Could anyone help me, please? Thanks a lot in advance. Cheers Gen On Sun, Aug 9, 2015 at 9:08 PM, gen tang gen.tan...@gmail.com wrote: Hi, I might have a stupid question about Spark SQL's implementation of joins on non-equality conditions, for instance condition1 or condition2. In fact, Hive doesn't support such joins, as it is very difficult to express such conditions as a map/reduce job. However, Spark SQL supports this operation, so I would like to know how Spark implements it. As I observe that such joins run very slowly, I guess that Spark implements them by filtering on top of a cartesian product. Is that true? Thanks in advance for your help. Cheers Gen
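Gen's guess can be made concrete with a Spark-free sketch: an OR of non-equality conditions cannot be reduced to a join key, so a straightforward implementation is "cartesian product, then filter", which is both why the join is slow and why intermediate results can blow up. The conditions below are made-up examples:

```python
from itertools import product

# Non-equi join sketch: "a join b on condition1 or condition2" evaluated as
# a filter over the full cartesian product of the two inputs.

a = [(1, 10), (2, 20)]
b = [(1, 15), (3, 5)]

def or_join(a, b):
    # condition1: equal first fields; condition2: a's second field < b's second
    return [(x, y) for x, y in product(a, b) if x[0] == y[0] or x[1] < y[1]]

print(or_join(a, b))  # [((1, 10), (1, 15))]
```

Note the filter examines len(a) * len(b) candidate pairs, so cost grows quadratically, unlike a keyed equi-join.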
Re: How to programmatically create, submit and report on Spark jobs?
I found SPARK-3733, which was marked a duplicate of SPARK-4924, which went into 1.4.0. FYI On Mon, Aug 10, 2015 at 5:12 AM, mark manwoodv...@googlemail.com wrote: Hi All, I need to be able to create, submit and report on Spark jobs programmatically in response to events arriving on a Kafka bus. I also need end-users to be able to create data queries that launch Spark jobs 'behind the scenes'. I would expect to use the same API for both, and to be able to provide a user-friendly view (i.e. *not* the Spark web UI) of all jobs (user and system) that are currently running, have completed, failed, etc. Are there any tools / add-ons for this? Or is there a suggested approach? Thanks
Re: ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
I think this may be expected. When the streaming context is stopped without stopping the SparkContext, the receivers are stopped by the driver. The receiver sends back the message that it has been stopped, and this is being (probably incorrectly) logged at ERROR level. On Sun, Aug 9, 2015 at 12:52 AM, Sadaf sa...@platalytics.com wrote: Hi, When I tried to stop Spark Streaming using ssc.stop(false,true), it gave the following error: ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver 15/08/07 13:41:11 WARN ReceiverSupervisorImpl: Stopped executor without error 15/08/07 13:41:20 WARN WriteAheadLogManager : Failed to write to write ahead log I've implemented a StreamingListener and a custom receiver. Does anyone have an idea about this? Thanks :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-ReceiverTracker-Deregistered-receiver-for-stream-0-Stopped-by-driver-tp24183.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Streaming of WordCount example
Is it receiving any data? If so, then it must be listening. Alternatively, to test these theories, you can run a local Spark standalone cluster (a one-node standalone cluster on your local machine) and submit your app to it in client mode, to see whether the process is listening or not. On Mon, Aug 10, 2015 at 12:43 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I've verified all the executors and I don't see a process listening on the port. However, the application seems to show as running in the YARN UI. On Mon, Aug 10, 2015 at 11:56 AM, Tathagata Das t...@databricks.com wrote: In yarn-client mode, the driver is on the machine where you ran the spark-submit. The executors are running on the YARN cluster nodes, and the socket receiver listening on the port is running in one of the executors. On Mon, Aug 10, 2015 at 11:43 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am running as yarn-client, which probably means that the program that submitted the job is where the listening is also occurring? I thought that YARN is only used to negotiate resources in yarn-client master mode. On Mon, Aug 10, 2015 at 11:34 AM, Tathagata Das t...@databricks.com wrote: If you are running on a cluster, the listening is occurring on one of the executors, not in the driver. On Mon, Aug 10, 2015 at 10:29 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to run this program as a yarn-client. 
The job seems to be submitting successfully; however, I don't see any process listening on this host on the port. https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java The web UI shows: Active Jobs (2)
Job 1: foreachRDD at JavaRecoverableNetworkWordCount.java:112 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=1 submitted 2015/08/10 13:27:36, duration 51 s, stages 0/2, tasks 0/2
Job 0: start at JavaRecoverableNetworkWordCount.java:152 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=0 submitted 2015/08/10 13:27:35, duration 51 s, stages 0/2, tasks 0/70
Re: Graceful shutdown for Spark Streaming
From the logs, it seems that Spark Streaming does handle *kill -SIGINT* with a graceful shutdown. Could you please confirm? Thanks! On 30 July 2015 at 08:19, anshu shukla anshushuk...@gmail.com wrote: Yes, I was doing the same. If you mean that this is the correct way to do it, then I will verify it once more in my case. On Thu, Jul 30, 2015 at 1:02 PM, Tathagata Das t...@databricks.com wrote: How is sleep not working? Are you doing streamingContext.start() Thread.sleep(xxx) streamingContext.stop() On Wed, Jul 29, 2015 at 6:55 PM, anshu shukla anshushuk...@gmail.com wrote: If we want to stop the application after a fixed time period, how will that work? (How to give the duration in the logic? In my case sleep(t.s.) is not working.) So I used to kill the CoarseGrained job at each slave by script. Please suggest something. On Thu, Jul 30, 2015 at 5:14 AM, Tathagata Das t...@databricks.com wrote: StreamingContext.stop(stopGracefully = true) stops the streaming context gracefully. Then you can safely terminate the Spark cluster. They are two different steps and need to be done separately, ensuring that the driver process has completely terminated before the Spark cluster is terminated. On Wed, Jul 29, 2015 at 6:43 AM, Michal Čizmazia mici...@gmail.com wrote: How can I initiate a graceful shutdown from outside of the Spark Streaming driver process, both for the local and cluster modes of Spark Standalone as well as EMR? Does sbin/stop-all.sh stop the context gracefully? How is it done? Is there a signal sent to the driver process? For EMR, is there a way to terminate an EMR cluster with a graceful Spark Streaming shutdown? Thanks! -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
Fw: Your Application has been Received
Bar123 On Monday, 10 August 2015, 20:20, Resourcing Team barclayscare...@invalidemail.com wrote: Dear Shing Hing, Thank you for applying to Barclays. We have received your application and are currently reviewing your details. Updates on your progress will be emailed to you and can be accessed through your profile on our career site. If you have any special requirements for interview please ensure that you highlight these to the Recruitment Advisor if you are invited for interview. We thank you for your interest in Barclays. Kind Regards, Barclays Resourcing Replies to this message are undeliverable and will not reach the Resourcing Team. Please do not reply.
Java Streaming Context - File Stream use
Please help, as I am not sure what is incorrect with the below code; it gives me a compilation error in Eclipse: SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("JavaDirectKafkaWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); *jssc.fileStream("/home/", String.class, String.class, TextInputFormat.class);*
Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing
Well, RDDs also contain data, don't they? The question is, what can be so hefty in the checkpointing directory to cause Spark driver to run out of memory? It seems that it makes checkpointing expensive, in terms of I/O and memory consumption. Two network hops -- to driver, then to workers. Hefty file system usage, hefty memory consumption... What can we do to offset some of these costs? On Mon, Aug 10, 2015 at 4:27 PM, Cody Koeninger c...@koeninger.org wrote: The rdd is indeed defined by mostly just the offsets / topic partitions. On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: You need to keep a certain number of rdds around for checkpointing -- that seems like a hefty expense to pay in order to achieve fault tolerance. Why does Spark persist whole RDD's of data? Shouldn't it be sufficient to just persist the offsets, to know where to resume from? Thanks. On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger c...@koeninger.org wrote: You need to keep a certain number of rdds around for checkpointing, based on e.g. the window size. Those would all need to be loaded at once. On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Would there be a way to chunk up/batch up the contents of the checkpointing directories as they're being processed by Spark Streaming? Is it mandatory to load the whole thing in one go? On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu yuzhih...@gmail.com wrote: I wonder during recovery from a checkpoint whether we can estimate the size of the checkpoint and compare with Runtime.getRuntime(). freeMemory(). If the size of checkpoint is much bigger than free memory, log warning, etc Cheers On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thanks, Cody, will try that. Unfortunately due to a reinstall I don't have the original checkpointing directory :( Thanks for the clarification on spark.driver.memory, I'll keep testing (at 2g things seem OK for now). 
On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger c...@koeninger.org wrote: That looks like it's during recovery from a checkpoint, so it'd be driver memory not executor memory. How big is the checkpoint directory that you're trying to restore from? On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: We're getting the below error. Tried increasing spark.executor.memory e.g. from 1g to 2g but the below error still happens. Any recommendations? Something to do with specifying -Xmx in the submit job scripts? Thanks. Exception in thread main java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOf(Arrays.java:3332) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421) at java.lang.StringBuilder.append(StringBuilder.java:136) at java.lang.StackTraceElement.toString(StackTraceElement.java:173) at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212) at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190) at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441) at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441) at org.apache.spark.rdd.RDD.init(RDD.scala:1365) at org.apache.spark.streaming.kafka.KafkaRDD.init(KafkaRDD.scala:46) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155) at 
org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153) at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402) at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at scala.collection.immutable.List.foreach(List.scala:318) at
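Per Cody's reply above, a direct-stream KafkaRDD is defined mostly by its offset ranges. A Spark-free sketch of what that per-batch checkpoint state conceptually holds (the file layout and field names here are made up, not Spark's actual checkpoint format):

```python
import json, os, tempfile

# Sketch: persist only (topic, partition, fromOffset, untilOffset) ranges
# per batch; on recovery, each KafkaRDD can be re-created from these ranges
# without storing any message payloads.

def save_offsets(path, ranges):
    with open(path, "w") as f:
        json.dump(ranges, f)

def restore_offsets(path):
    with open(path) as f:
        return json.load(f)

ranges = [{"topic": "events", "partition": 0, "from": 100, "until": 150}]
path = os.path.join(tempfile.mkdtemp(), "offsets.json")
save_offsets(path, ranges)
print(restore_offsets(path) == ranges)  # True
```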
collect() works, take() returns ImportError: No module named iter
I'm running Spark 1.3 on CDH 5.4.4, and trying to set up Spark to run via IPython Notebook. I'm getting collect() to work just fine, but take() errors. (I'm having issues with collect() on other datasets ... but take() seems to break every time I run it.) My code is below. Any thoughts? sc <pyspark.context.SparkContext at 0x7ffbfa310f10> sys.version '2.7.10 |Anaconda 2.3.0 (64-bit)| (default, May 28 2015, 17:02:03) \n[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]' hourly = sc.textFile('tester') hourly.collect() [u'a man', u'a plan', u'a canal', u'panama'] hourly = sc.textFile('tester') hourly.take(2) --------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) <ipython-input-15-1feecba5868b> in <module>() 1 hourly = sc.textFile('tester') ----> 2 hourly.take(2) /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py in take(self, num) 1223 1224 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) ---> 1225 res = self.context.runJob(self, takeUpToNumLeft, p, True) 1226 1227 items += res /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal) 841 # SparkContext#runJob. 842 mappedRDD = rdd.mapPartitions(partitionFunc) ---> 843 it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) 844 return list(mappedRDD._collect_iterator_through_file(it)) 845 /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, ---> 538 self.target_id, self.name) 539 540 for temp_arg in temp_args: /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. 
---> 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10.0 (TID 47, dhd490101.autotrader.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/worker.py", line 101, in main process() File "/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/worker.py", line 96, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/serializers.py", line 236, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py", line 1220, in takeUpToNumLeft while taken < left: ImportError: No module named iter at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191) at
Optimal way to implement a small lookup table for identifiers in an RDD
Hi All, I have an RDD of case class objects. scala> case class Entity( | value: String, | identifier: String | ) defined class Entity scala> Entity("hello", "id1") res25: Entity = Entity(hello,id1) During a map operation, I'd like to return a new RDD that contains all of the data of the original RDD, with the addition of new data looked up based on the identifiers provided. The lookup table in Cassandra looks something like:

 id  | type
-----+--------
 id1 | action
 id2 | view

The end result would be an RDD of EntityExtended: case class EntityExtended( value: String, identifier: String, type: String ) I believe that it would make sense to use a broadcast variable. However, I'm not sure of the best way to incorporate it during a map operation. rdd.map(MyObject.extendEntity) object MyObject { def extendEntity(entity: Entity): EntityExtended = { val id = entity.identifier // lookup identifier in broadcast variable? } } Thanks, Mike.
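The broadcast pattern Mike describes can be sketched without Spark: ship the small id-to-type table to every worker once, then close over it in the map function. In Spark this would be sc.broadcast(lookup) on the driver and lookupBc.value inside extendEntity; the data below mirrors the question's example:

```python
# Spark-free sketch of a broadcast lookup join: the small table travels to
# the map function, and each record is extended with its looked-up field.

lookup = {"id1": "action", "id2": "view"}  # the small Cassandra table

def extend_entity(entity, lookup):
    """Entity(value, identifier) -> EntityExtended(value, identifier, type)."""
    value, identifier = entity
    return (value, identifier, lookup.get(identifier, "unknown"))

entities = [("hello", "id1"), ("world", "id2")]
extended = [extend_entity(e, lookup) for e in entities]
print(extended)  # [('hello', 'id1', 'action'), ('world', 'id2', 'view')]
```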
Re: avoid duplicate due to executor failure in spark stream
http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations https://www.youtube.com/watch?v=fXnNEq1v3VA On Mon, Aug 10, 2015 at 4:32 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi, How can I avoid duplicate processing of Kafka messages in Spark Streaming 1.3 in the event of an executor failure? 1. Can I somehow access the accumulators of a failed task in the retry task, so as to skip the events already processed by the failed task on this partition? 2. Or will I have to persist each message processed, check before processing each message whether it was already processed by the failed task, and delete this persisted information at the end of each batch?
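The linked semantics docs recommend making outputs idempotent rather than tracking processed messages by hand. A minimal sketch of that recipe (the in-memory store stands in for whatever sink you use):

```python
# Idempotent-output sketch: key each write by (partition, offset) so that
# when a failed task is re-run, replayed messages overwrite themselves
# instead of creating duplicates.

store = {}

def process(partition, offset, payload):
    store[(partition, offset)] = payload.upper()  # upsert: replay-safe

for args in [(0, 42, "msg"), (0, 43, "other"), (0, 42, "msg")]:  # offset 42 replayed
    process(*args)

print(len(store))  # 2: the replayed message did not create a duplicate
```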
When will window ....
When will window functions be integrated into Spark (without HiveContext)? Sent with AquaMail for Android http://www.aqua-mail.com On 10 August 2015 at 23:04:22, Michael Armbrust mich...@databricks.com wrote: You will need to use a HiveContext for window functions to work. On Mon, Aug 10, 2015 at 1:26 PM, Jerry jerry.c...@gmail.com wrote: Hello, Using Apache Spark 1.4.1, I'm unable to use lag or lead when making queries to a data frame, and I'm trying to figure out whether I just have a bad setup or this is a bug. As for the exceptions I get: when using selectExpr() with a string as an argument, I get "NoSuchElementException: key not found: lag", and when using the select method and ...spark.sql.functions.lag I get an AnalysisException. If I replace lag with abs in the first case, Spark runs without exception, so none of the other syntax is incorrect. As for how I'm running it: the code is written in Java with a static method that takes the SparkContext as an argument, which is used to create a JavaSparkContext, which in turn is used to create an SQLContext that loads a JSON file from the local disk and runs those queries on that data frame object. FYI: the Java code is compiled, jarred, and then pointed to with -cp when starting the spark shell, so all I do is Test.run(sc) in the shell. Let me know what to look for to debug this problem. I'm not sure where to look to solve it. Thanks, Jerry
Re: stopping spark stream app
Thanks! On Tue, Aug 11, 2015 at 1:34 AM, Tathagata Das t...@databricks.com wrote: 1. RPC can be done in many ways, and a web service is one of them. An even more hacky version can be the app polling a file in a file system: if the file exists, start shutting down. 2. No need to set a flag. When you get the signal from RPC, you can just call context.stop(stopGracefully = true). Though note that this is blocking, so you have to be careful about doing blocking calls on the RPC thread. On Mon, Aug 10, 2015 at 12:24 PM, Shushant Arora shushantaror...@gmail.com wrote: By RPC you mean a web service exposed on the driver which listens and sets some flag, and the driver checks that flag at the end of each batch and, if set, gracefully stops the application? On Tue, Aug 11, 2015 at 12:43 AM, Tathagata Das t...@databricks.com wrote: In general, it is a little risky to put long-running stuff in a shutdown hook as it may delay shutdown of the process, which may delay other things. That said, you could try it out. A better way to explicitly shut down gracefully is to use an RPC to signal the driver process to start shutting down; the process will then gracefully stop the context and terminate. This is more robust than leveraging shutdown hooks. On Mon, Aug 10, 2015 at 11:56 AM, Shushant Arora shushantaror...@gmail.com wrote: Any help on the best recommendation for gracefully shutting down a Spark Streaming application? I am running it on YARN and need a way to trigger the shutdown externally (either the yarn application -kill command or some other way), but the current batch must be processed completely and the checkpoint saved before shutting down. Runtime.getRuntime().addShutdownHook does not seem to be working. YARN kills the application immediately and does not invoke the shutdown hook callback. 
On Sun, Aug 9, 2015 at 12:45 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi, How do I ensure in Spark Streaming 1.3 with Kafka that when an application is killed, the last running batch is fully processed and offsets are written to the checkpointing dir? On Fri, Aug 7, 2015 at 8:56 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi, I am using Spark Streaming 1.3 and using custom checkpointing to save Kafka offsets. 1. Is doing Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { jssc.stop(true, true); System.out.println("Inside Add Shutdown Hook"); } }); to handle stop safe? 2. And do I need to handle saving the checkpoint in the shutdown hook also, or will the driver handle it automatically, since it gracefully stops the stream and handles completion of the foreachRDD function on the stream? directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() { } Thanks
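TD's "even more hacky" file-polling alternative from this thread can be sketched in a few lines (the marker path is made up; in the real app the True branch would call ssc.stop(stopGracefully=True)):

```python
import os, tempfile

# Sketch: the driver polls for a marker file between batches and starts a
# graceful stop once it appears, so an external script can request shutdown
# simply by touching the file.

marker = os.path.join(tempfile.mkdtemp(), "STOP")

def should_stop():
    return os.path.exists(marker)

print(should_stop())        # False: keep processing batches
open(marker, "w").close()   # external script requests shutdown
print(should_stop())        # True: now call ssc.stop(stopGracefully=True)
```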
Re: Is there any external dependencies for lag() and lead() when using data frames?
You will need to use a HiveContext for window functions to work. On Mon, Aug 10, 2015 at 1:26 PM, Jerry jerry.c...@gmail.com wrote: Hello, Using Apache Spark 1.4.1 I'm unable to use lag or lead when making queries to a data frame and I'm trying to figure out if I just have a bad setup or if this is a bug. As for the exceptions I get: when using selectExpr() with a string as an argument, I get NoSuchElementException: key not found: lag and when using the select method and ...spark.sql.functions.lag I get an AnalysisException. If I replace lag with abs in the first case, Spark runs without exception, so none of the other syntax is incorrect. As for how I'm running it: the code is written in Java with a static method that takes the SparkContext as an argument; that is used to create a JavaSparkContext, which is then used to create an SQLContext, which loads a json file from the local disk and runs those queries on that data frame object. FYI: the java code is compiled, jarred and then pointed to with -cp when starting the spark shell, so all I do is Test.run(sc) in the shell. Let me know what to look for to debug this problem; I'm not sure where to look to solve it. Thanks, Jerry
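For anyone unsure what lag()/lead() should return once a HiveContext is in place, here is a plain-Python sketch of the window-function semantics over an ordered partition. This is an illustration only, not the Spark API.

```python
def lag(values, offset=1, default=None):
    """lag(col, n): the value n rows before the current row, else default."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]

def lead(values, offset=1, default=None):
    """lead(col, n): the value n rows after the current row, else default."""
    return [values[i + offset] if i + offset < len(values) else default
            for i in range(len(values))]

prices = [10, 12, 11, 15]
print(lag(prices))   # [None, 10, 12, 11]
print(lead(prices))  # [12, 11, 15, None]
```

In Spark SQL the same computation is written as lag(col, 1) OVER (PARTITION BY ... ORDER BY ...), and rows past the edge of the partition get the default (null unless specified).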
avoid duplicate due to executor failure in spark stream
Hi How can I avoid duplicate processing of kafka messages in spark streaming 1.3 because of executor failure? 1. Can I somehow access the accumulators of the failed task in the retry task, to skip the events on this partition that were already processed by the failed task? 2. Or will I have to persist each message as it is processed, check before processing each message whether it was already processed by the failed task, and delete this persisted information at the end of each batch?
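Option 2 from the question can be sketched as offset-keyed idempotent processing: record each processed (partition, offset) key in a durable store, and skip keys already seen when a task is retried. Below, an in-memory dict stands in for the durable store, and all names are hypothetical.

```python
processed_keys = {}  # stand-in for a durable store (e.g. a database table)

def process_message(partition, offset, payload, sink):
    """Apply the side effect at most once per (partition, offset) key."""
    key = (partition, offset)
    if key in processed_keys:
        return False               # already handled by the failed attempt
    sink.append(payload)           # the side effect (e.g. an external write)
    processed_keys[key] = True     # persist the key *after* the side effect
    return True

sink = []
batch = [(0, 100, "a"), (0, 101, "b")]
for p, o, m in batch:
    process_message(p, o, m, sink)
# A retried task replays the same offsets but writes nothing new:
for p, o, m in batch:
    process_message(p, o, m, sink)
print(sink)  # ['a', 'b']
```

The per-batch cleanup the question mentions would then delete the batch's keys from the store once the batch's offsets are checkpointed.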
Re: collect() works, take() returns ImportError: No module named iter
There was a similar problem reported before on this list. Weird python errors like this generally mean you have different versions of python in the nodes of your cluster. Can you check that? From the error stack you use 2.7.10 |Anaconda 2.3.0, while the OS/CDH version of Python is probably 2.6. -- Ruslan Dautkhanov On Mon, Aug 10, 2015 at 3:53 PM, YaoPau jonrgr...@gmail.com wrote: I'm running Spark 1.3 on CDH 5.4.4, and trying to set up Spark to run via iPython Notebook. I'm getting collect() to work just fine, but take() errors. (I'm having issues with collect() on other datasets ... but take() seems to break every time I run it.) My code is below. Any thoughts? sc pyspark.context.SparkContext at 0x7ffbfa310f10 sys.version '2.7.10 |Anaconda 2.3.0 (64-bit)| (default, May 28 2015, 17:02:03) \n[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]' hourly = sc.textFile('tester') hourly.collect() [u'a man', u'a plan', u'a canal', u'panama'] hourly = sc.textFile('tester') hourly.take(2) --- Py4JJavaError Traceback (most recent call last) ipython-input-15-1feecba5868b in module() 1 hourly = sc.textFile('tester') 2 hourly.take(2) /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py in take(self, num) 1223 1224 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) - 1225 res = self.context.runJob(self, takeUpToNumLeft, p, True) 1226 1227 items += res /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal) 841 # SparkContext#runJob. 
842 mappedRDD = rdd.mapPartitions(partitionFunc) -- 843 it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) 844 return list(mappedRDD._collect_iterator_through_file(it)) 845 /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, -- 538 self.target_id, self.name) 539 540 for temp_arg in temp_args: /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10.0 (TID 47, dhd490101.autotrader.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/worker.py, line 101, in main process() File /opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/worker.py, line 96, in process serializer.dump_stream(func(split_index, iterator), outfile) File /opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/serializers.py, line 236, in dump_stream vs = list(itertools.islice(iterator, batch)) File /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py, line 1220, in takeUpToNumLeft while taken left: ImportError: No module named iter at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135) at 
org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) at
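As Ruslan suggests above, the usual culprit for this kind of ImportError is a Python version mismatch between the driver and the executors. One way to check is to collect sys.version_info from every executor, e.g. sc.parallelize(range(4), 4).map(lambda _: sys.version_info[:2]).collect(), and compare it with the driver's. The comparison logic alone is shown below, with made-up host names and versions.

```python
def find_mismatches(driver_version, worker_versions):
    """Return workers whose (major, minor) Python differs from the driver's."""
    return [(host, v) for host, v in worker_versions if v != driver_version]

driver = (2, 7)                  # e.g. Anaconda 2.7.10 on the driver
workers = [("node1", (2, 6)),    # e.g. OS/CDH Python 2.6 on the nodes
           ("node2", (2, 6)),
           ("node3", (2, 7))]
print(find_mismatches(driver, workers))
# [('node1', (2, 6)), ('node2', (2, 6))]
```

Setting PYSPARK_PYTHON to the same interpreter on every node (or in spark-env.sh) is the usual fix once a mismatch is confirmed.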
Re: Streaming of WordCount example
I am using the same exact code: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java Submitting like this: yarn: /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/bin/spark-submit --class org.sony.spark.stream.test.JavaRecoverableNetworkWordCount --master yarn-client --total-executor-cores 3 ./spark-streaming-test-0.0.1-SNAPSHOT-jar-with-dependencies.jar localhost /user/ec2-user/checkpoint/ /user/ec2-user/out local: /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/bin/spark-submit --class org.sony.spark.stream.test.JavaRecoverableNetworkWordCount --master spark://localhost:9966 --total-executor-cores 3 ./spark-streaming-test-0.0.1-SNAPSHOT-jar-with-dependencies.jar localhost /user/ec2-user/checkpoint/ /user/ec2-user/out Even though I am running as local I see it being scheduled and managed by yarn. On Mon, Aug 10, 2015 at 12:56 PM, Tathagata Das t...@databricks.com wrote: Is it receiving any data? If so, then it must be listening. Alternatively, to test these theories, you can run a local Spark standalone cluster (a one-node standalone cluster on your local machine) and submit your app in client mode on that, to see whether the process is listening or not. On Mon, Aug 10, 2015 at 12:43 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I've verified all the executors and I don't see a process listening on the port. However, the application seems to show as running in the yarn UI On Mon, Aug 10, 2015 at 11:56 AM, Tathagata Das t...@databricks.com wrote: In yarn-client mode, the driver is on the machine where you ran the spark-submit. 
I thought that YARN is only used to negotiate resources in yarn-client master mode. On Mon, Aug 10, 2015 at 11:34 AM, Tathagata Das t...@databricks.com wrote: If you are running on a cluster, the listening is occurring on one of the executors, not in the driver. On Mon, Aug 10, 2015 at 10:29 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to run this program as a yarn-client. The job seems to be submitting successfully however I don't see any process listening on this host on port https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java Active Jobs (2): Job 1, foreachRDD at JavaRecoverableNetworkWordCount.java:112 (http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=1), submitted 2015/08/10 13:27:36, duration 51 s, stages succeeded/total 0/2, tasks 0/2; Job 0, start at JavaRecoverableNetworkWordCount.java:152 (http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=0), submitted 2015/08/10 13:27:35, duration 51 s, stages 0/2, tasks 0/70.
Re: TFIDF Transformation
If you want to convert the hash back to the word, the very idea defeats the purpose of hashing: hash functions are not invertible. You could keep a separate mapping from words to their hashed indices, but that wouldn't be ideal, since multiple words can collide into the same index. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/TFIDF-Transformation-tp24086p24203.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
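To make the point concrete, here is a tiny pure-Python stand-in for feature hashing (not Spark's HashingTF) with the side mapping the reply describes. The deliberately weak hash and small feature count are assumptions for the demo; the collision shows why the reverse mapping is lossy.

```python
def term_index(word, num_features=16):
    """Deterministic hash bucket for a word (mod the feature count)."""
    return sum(ord(c) for c in word) % num_features

def build_reverse_index(vocabulary, num_features=16):
    """Map each hash bucket back to the set of words that hash into it."""
    reverse = {}
    for word in vocabulary:
        reverse.setdefault(term_index(word, num_features), set()).add(word)
    return reverse

vocab = ["spark", "kraps", "tfidf"]
rev = build_reverse_index(vocab)
# "spark" and "kraps" are anagrams, so this character-sum hash puts them
# in the same bucket; the bucket alone cannot tell you which word it was.
print(rev[term_index("spark")])
```

A real pipeline that needs word-level interpretability typically keeps an explicit vocabulary-to-index dictionary (as CountVectorizer-style approaches do) instead of hashing.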
Re: collect() works, take() returns ImportError: No module named iter
Is it possible that you have Python 2.7 on the driver, but Python 2.6 on the workers? PySpark requires that you have the same minor version of Python in both driver and worker. In PySpark 1.4+, it will do this check before running any tasks. On Mon, Aug 10, 2015 at 2:53 PM, YaoPau jonrgr...@gmail.com wrote: I'm running Spark 1.3 on CDH 5.4.4, and trying to set up Spark to run via iPython Notebook. I'm getting collect() to work just fine, but take() errors. (I'm having issues with collect() on other datasets ... but take() seems to break every time I run it.) My code is below. Any thoughts?
Re: Do I really need to build Spark for Hive/Thrift Server support?
Hi All, Any explanation for this? As Reece said, I can do operations with Hive, but val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) gives an error. I have already created a spark ec2 cluster with the spark-ec2 script. How can I build it again? Thanks _Roni On Tue, Jul 28, 2015 at 2:46 PM, ReeceRobinson re...@therobinsons.gen.nz wrote: I am building an analytics environment based on Spark and want to use HIVE in multi-user mode, i.e. not use the embedded derby database but use Postgres and HDFS instead. I am using the included Spark Thrift Server to process queries using Spark SQL. The documentation gives me the impression that I need to create a custom build of Spark 1.4.1. However I don't think this is either accurate now, or it is for some different context I'm not aware of. I used the pre-built Spark 1.4.1 distribution today with my hive-site.xml for Postgres and HDFS and it worked! I see the warehouse files turn up in HDFS and I see the metadata inserted into Postgres when I created a test table. I can connect to the Thrift Server using beeline and perform queries on my data. I also verified using the Spark UI that the SQL is being processed by Spark SQL. So I guess I'm asking: is the document out of date, or am I missing something? Cheers, Reece -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Do-I-really-need-to-build-Spark-for-Hive-Thrift-Server-support-tp24013p24039.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Json parsing library for Spark Streaming?
I use Play json; it may be quite well known. If you would like to try it, below is the sbt dependency: "com.typesafe.play" % "play-json_2.10" % "2.2.1" -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Json-parsing-library-for-Spark-Streaming-tp24016p24204.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Random Forest and StringIndexer in pyspark ML Pipeline
Hi, If I understand the RandomForest model in the ML Pipeline implementation in the ml package correctly, I have to first run my outcome label variable through the StringIndexer, even if my labels are numeric. The StringIndexer then converts the labels into numeric indices based on frequency of the label. This could create situations where if I'm classifying binary outcomes where my original labels are simply 0 and 1, the StringIndexer may actually flip my labels such that 0s become 1s and 1s become 0s if my original 1s were more frequent. This transformation would then extend itself to the predictions. In the old mllib implementation, the RF does not require the labels to be changed and I could use 0/1 labels without worrying about them being transformed. I was wondering: 1. Why is this the default implementation for the Pipeline RF? This seems like it could cause a lot of confusion in cases like the one I outlined above. 2. Is there a way to avoid this by either controlling how the indices are created in StringIndexer or bypassing StringIndexer altogether? 3. If 2 is not possible, is there an easy way to see how my original labels mapped onto the indices so that I can revert the predictions back to the original labels rather than the transformed labels? I suppose I could do this by counting the original labels and mapping by frequency, but it seems like there should be a more straightforward way to get it out of the StringIndexer. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Random-Forest-and-StringIndexer-in-pyspark-ML-Pipeline-tp24200.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
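The frequency-based indexing described above is easy to reproduce outside Spark. Below is a pure-Python sketch (not the actual StringIndexer implementation, and ignoring its tie-breaking details) showing the binary-label flip and how to build the inverse mapping asked about in question 3.

```python
from collections import Counter

def string_indexer_mapping(labels):
    """Return {label: index}, index 0 going to the most frequent label."""
    ordered = [lbl for lbl, _ in Counter(labels).most_common()]
    return {lbl: idx for idx, lbl in enumerate(ordered)}

labels = [1, 1, 1, 0, 0]            # original 1s are the majority class
mapping = string_indexer_mapping(labels)
print(mapping)                       # {1: 0, 0: 1}: the labels are flipped

# Inverting the mapping recovers original labels from indexed predictions:
inverse = {idx: lbl for lbl, idx in mapping.items()}
predictions = [0, 1, 0]              # indexed predictions from the model
print([inverse[p] for p in predictions])  # [1, 0, 1] in original labels
```

In newer Spark releases the fitted StringIndexerModel exposes the ordered label array directly (and IndexToString applies the inverse mapping), so counting by hand as above should only be needed as a fallback.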
Re: Streaming of WordCount example
I do see this message: 15/08/10 19:19:12 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources On Mon, Aug 10, 2015 at 4:15 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am using the same exact code: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
Re: Streaming of WordCount example
1. When you are running locally, make sure the master in the SparkConf reflects that and is not somehow set to yarn-client. 2. You may not be getting any resources from YARN at all, so no executors, so no receiver running. That is why I asked the most basic question: is it receiving data? That will eliminate a lot of uncertainties if it is indeed receiving data. TD On Mon, Aug 10, 2015 at 4:21 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I do see this message: 15/08/10 19:19:12 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
Spark Kinesis Checkpointing/Processing Delay
Hi! Sorry if this is a repost. I'm using Spark + Kinesis ASL to process and persist stream data to ElasticSearch. For the most part it works nicely. There is a subtle issue I'm running into about how failures are handled. For example's sake, let's say I am processing a Kinesis stream that produces 400 records per second, continuously. Kinesis provides a 24hr buffer of data, and I'm setting my Kinesis DStream consumer to use TRIM_HORIZON, to mean go as far back as possible and start processing the stream data as quickly as possible, until you catch up to the tip of the stream. This means that for some period of time, Spark will suck in data from Kinesis as quickly as it can, let's say at 5000 records per second. In my specific case, ElasticSearch can gracefully handle 400 writes per second, but is NOT happy to process 5000 writes per second. Let's say it only handles 2000 wps. This means that the processing time will exceed the batch time, scheduling delay in the stream will rise consistently, and batches of data will get backlogged for some period of time. In normal circumstances, this is fine. When the Spark consumers catch up to real-time, the data input rate slows to 400rps and the backlogged batches eventually get flushed to ES. The system stabilizes. However! It appears to me that the Kinesis consumer actively submits checkpoints, even though the records may not have been processed yet (since they are backlogged). If for some reason there is processing delay and the Spark process crashes, the checkpoint will have advanced too far. If I resume the Spark Streaming process, there is essentially a gap in my ElasticSearch data. In principle I understand the reason for this, but is there a way to adjust this behavior? Or is there another way to handle this specific problem? Ideally I would be able to configure the process to submit Kinesis checkpoints only after my data is successfully written to ES. Thanks, Phil
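A minimal sketch of the desired behavior, advancing the checkpoint only after a successful sink write, in plain Python. The class and method names here are invented for illustration; the real Kinesis ASL checkpoints on an interval regardless of downstream progress, which is exactly the gap being described.

```python
class Consumer:
    def __init__(self):
        self.checkpoint = 0   # highest sequence number safely persisted

    def process_batch(self, batch, write_to_sink):
        """Write the batch, then advance the checkpoint only on success."""
        try:
            write_to_sink(batch)
        except IOError:
            return self.checkpoint      # write failed: do not advance
        self.checkpoint = batch[-1]     # advance only after the write lands
        return self.checkpoint

sink = []
c = Consumer()
c.process_batch([1, 2, 3], sink.extend)     # succeeds, checkpoint -> 3

def failing_write(batch):
    raise IOError("ES rejected the bulk request")

c.process_batch([4, 5, 6], failing_write)   # fails, checkpoint stays at 3
print(c.checkpoint)  # 3: records [4, 5, 6] will be replayed on restart
```

With this ordering a crash can cause duplicates (replays) but never a gap, which is usually the preferable failure mode when the sink writes are idempotent.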
Re: Spark inserting into parquet files with different schema
What is the error you are getting? It would also be awesome if you could try with Spark 1.5 when the first preview comes out (hopefully early next week). On Mon, Aug 10, 2015 at 11:41 AM, Simeon Simeonov s...@swoop.com wrote: Michael, is there an example anywhere that demonstrates how this works with the schema changing over time? Must the Hive tables be set up as external tables outside of saveAsTable? In my experience, in 1.4.1, writing to a table with SaveMode.Append fails if the schemas don't match. Thanks, Sim From: Michael Armbrust mich...@databricks.com Date: Monday, August 10, 2015 at 2:36 PM To: Simeon Simeonov s...@swoop.com Cc: user user@spark.apache.org Subject: Re: Spark inserting into parquet files with different schema Older versions of Spark (i.e. when it was still called SchemaRDD instead of DataFrame) did not support merging different parquet schemas. However, Spark 1.4+ should. On Sat, Aug 8, 2015 at 8:58 PM, sim s...@swoop.com wrote: Adam, did you find a solution for this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-inserting-into-parquet-files-with-different-schema-tp20706p24181.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark inserting into parquet files with different schema
Michael, please, see http://apache-spark-user-list.1001560.n3.nabble.com/Schema-evolution-in-tables-tt23999.html The exception is java.lang.RuntimeException: Relation[ ... ] org.apache.spark.sql.parquet.ParquetRelation2@83a73a05 requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE statement generates the same number of columns as its schema. Is this behavior expected? Shall I create a JIRA issue if it is not? From: Michael Armbrust mich...@databricks.com Date: Monday, August 10, 2015 at 3:44 PM To: Simeon Simeonov s...@swoop.com Cc: user user@spark.apache.org Subject: Re: Spark inserting into parquet files with different schema What is the error you are getting? It would also be awesome if you could try with Spark 1.5 when the first preview comes out (hopefully early next week). On Mon, Aug 10, 2015 at 11:41 AM, Simeon Simeonov s...@swoop.com wrote: Michael, is there an example anywhere that demonstrates how this works with the schema changing over time? Must the Hive tables be set up as external tables outside of saveAsTable? In my experience, in 1.4.1, writing to a table with SaveMode.Append fails if the schemas don't match. Thanks, Sim From: Michael Armbrust mich...@databricks.com Date: Monday, August 10, 2015 at 2:36 PM To: Simeon Simeonov s...@swoop.com Cc: user user@spark.apache.org Subject: Re: Spark inserting into parquet files with different schema Older versions of Spark (i.e. when it was still called SchemaRDD instead of DataFrame) did not support merging different parquet schemas. However, Spark 1.4+ should. On Sat, Aug 8, 2015 at 8:58 PM, sim s...@swoop.com wrote: Adam, did you find a solution for this? 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-inserting-into-parquet-files-with-different-schema-tp20706p24181.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
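For reference, schema merging at read time amounts to taking the union of the per-file schemas and null-filling the columns a given file lacks. Here is a pure-Python illustration of that rule (not Spark's parquet code, and it ignores type reconciliation between files).

```python
def merge_schemas(schemas):
    """Union the column names of several schemas, preserving first-seen order."""
    merged = []
    for schema in schemas:
        for col in schema:
            if col not in merged:
                merged.append(col)
    return merged

def conform_row(row, merged_schema):
    """Project a row dict onto the merged schema, filling gaps with None."""
    return {col: row.get(col) for col in merged_schema}

old = ["id", "name"]             # schema of the older parquet files
new = ["id", "name", "email"]    # schema after evolution added a column
merged = merge_schemas([old, new])
print(merged)                              # ['id', 'name', 'email']
print(conform_row({"id": 1, "name": "a"}, merged))
```

The INSERT INTO error quoted above is the separate write-side constraint: the insert query must produce exactly the columns of the existing table schema, so schema evolution via append requires the table schema to be updated first.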
Is there any external dependencies for lag() and lead() when using data frames?
Hello, Using Apache Spark 1.4.1 I'm unable to use lag or lead when making queries to a data frame and I'm trying to figure out if I just have a bad setup or if this is a bug. As for the exceptions I get: when using selectExpr() with a string as an argument, I get NoSuchElementException: key not found: lag and when using the select method and ...spark.sql.functions.lag I get an AnalysisException. If I replace lag with abs in the first case, Spark runs without exception, so none of the other syntax is incorrect. As for how I'm running it: the code is written in Java with a static method that takes the SparkContext as an argument; that is used to create a JavaSparkContext, which is then used to create an SQLContext, which loads a json file from the local disk and runs those queries on that data frame object. FYI: the java code is compiled, jarred and then pointed to with -cp when starting the spark shell, so all I do is Test.run(sc) in the shell. Let me know what to look for to debug this problem; I'm not sure where to look to solve it. Thanks, Jerry
Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing
The rdd is indeed defined by mostly just the offsets / topic partitions. On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: You need to keep a certain number of rdds around for checkpointing -- that seems like a hefty expense to pay in order to achieve fault tolerance. Why does Spark persist whole RDD's of data? Shouldn't it be sufficient to just persist the offsets, to know where to resume from? Thanks. On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger c...@koeninger.org wrote: You need to keep a certain number of rdds around for checkpointing, based on e.g. the window size. Those would all need to be loaded at once. On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Would there be a way to chunk up/batch up the contents of the checkpointing directories as they're being processed by Spark Streaming? Is it mandatory to load the whole thing in one go? On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu yuzhih...@gmail.com wrote: I wonder during recovery from a checkpoint whether we can estimate the size of the checkpoint and compare with Runtime.getRuntime().freeMemory (). If the size of checkpoint is much bigger than free memory, log warning, etc Cheers On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thanks, Cody, will try that. Unfortunately due to a reinstall I don't have the original checkpointing directory :( Thanks for the clarification on spark.driver.memory, I'll keep testing (at 2g things seem OK for now). On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger c...@koeninger.org wrote: That looks like it's during recovery from a checkpoint, so it'd be driver memory not executor memory. How big is the checkpoint directory that you're trying to restore from? On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: We're getting the below error. Tried increasing spark.executor.memory e.g. from 1g to 2g but the below error still happens. 
Any recommendations? Something to do with specifying -Xmx in the submit job scripts? Thanks. Exception in thread main java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOf(Arrays.java:3332) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421) at java.lang.StringBuilder.append(StringBuilder.java:136) at java.lang.StackTraceElement.toString(StackTraceElement.java:173) at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212) at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190) at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441) at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441) at org.apache.spark.rdd.RDD.init(RDD.scala:1365) at org.apache.spark.streaming.kafka.KafkaRDD.init(KafkaRDD.scala:46) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153) at 
org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402) at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403) at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403) at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
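Ted's idea earlier in this thread, comparing the size of the checkpoint directory against free driver memory before attempting recovery, can be sketched in plain JVM code. The warning format and the simple size-versus-free-heap threshold below are assumptions for illustration:

```scala
import java.io.File

// Hedged sketch of the size check suggested above: walk the checkpoint
// directory, sum file sizes, and compare against the driver JVM's free heap.
def directorySize(dir: File): Long = {
  val children = Option(dir.listFiles).getOrElse(Array.empty[File])
  children.map(f => if (f.isDirectory) directorySize(f) else f.length).sum
}

def checkpointLargerThanFreeMemory(checkpointDir: String): Boolean = {
  val size = directorySize(new File(checkpointDir))
  val free = Runtime.getRuntime.freeMemory()
  if (size > free)
    System.err.println(s"Checkpoint dir is $size bytes but only $free bytes of heap are free")
  size > free
}
```

This would only log a warning; it cannot prevent the OOM, but it gives an early signal that spark.driver.memory needs raising before recovery starts.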
Inquiry about contributing code
Dear Sir / Madam, I have a plan to contribute some code for passing filters to a data source during physical planning. In more detail, I understand that when we want to push filter operations down to data sources like Parquet (actually filtering HDFS blocks at read time rather than filtering in memory with Spark operations), we need to implement PrunedFilteredScan, PrunedScan or CatalystScan in package org.apache.spark.sql.sources. PrunedFilteredScan and PrunedScan receive the filter objects in package org.apache.spark.sql.sources, which do not come directly from the query parser but are built by selectFilters() in org.apache.spark.sql.sources.DataSourceStrategy. It appears that not all filters (as raw expressions) are passed to the function below in PrunedFilteredScan and PrunedScan: def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] The filters passed here are those defined in package org.apache.spark.sql.sources. On the other hand, the EqualNullSafe filter in package org.apache.spark.sql.catalyst.expressions is not passed down, even though it looks possible to push it to data sources such as Parquet and JSON. I understand that CatalystScan can take all the raw expressions from the query planner. However, it is experimental, needs different interfaces, and is unstable for reasons such as binary compatibility. As far as I know, Parquet also does not use it. In general, this can be an issue when a user sends a query to the data such as 1. SELECT * FROM table WHERE field = 1; 2. SELECT * FROM table WHERE field = 1; The second query can be hugely slow because of the large network traffic from unfiltered data in the source RDD. Also, I could not find an existing issue for this (except for https://issues.apache.org/jira/browse/SPARK-8747, which says it now supports binary compatibility). Accordingly, I want to open an issue and make a pull request with my code. Could you please make any comments on this? Thanks.
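To make the pushdown idea concrete, here is a self-contained miniature. The Filter case classes below only mimic org.apache.spark.sql.sources.Filter; they are illustrative stand-ins, not the real Spark classes:

```scala
// A self-contained miniature of filter pushdown. These case classes only
// mimic org.apache.spark.sql.sources.Filter; they are illustrative
// stand-ins, not the real Spark classes.
sealed trait Filter
case class EqualTo(attribute: String, value: Any) extends Filter
case class GreaterThan(attribute: String, value: Int) extends Filter

// A source implementing PrunedFilteredScan receives filters like these in
// buildScan and can evaluate them at the storage layer instead of in Spark.
def evaluate(filter: Filter, row: Map[String, Any]): Boolean = filter match {
  case EqualTo(attr, v)     => row.get(attr).contains(v)
  case GreaterThan(attr, v) => row.get(attr).exists(_.asInstanceOf[Int] > v)
}

def scanWithPushdown(rows: Seq[Map[String, Any]],
                     filters: Seq[Filter]): Seq[Map[String, Any]] =
  rows.filter(r => filters.forall(evaluate(_, r)))
```

Filters a real source cannot translate into this restricted vocabulary (such as the EqualNullSafe case discussed above) never reach buildScan, which is exactly the gap the proposal addresses.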
Re: Why use spark.history.fs.logDirectory instead of spark.eventLog.dir
Anyone know this? Thanks On Fri, Aug 7, 2015 at 4:20 PM, canan chen ccn...@gmail.com wrote: Is there any reason that the history server uses another property for the event log dir? Thanks
Re: Accessing S3 files with s3n://
Hi Jerry, Akhil, Thanks for your help. With s3n, the entire file is downloaded even while just creating the RDD with sqlContext.read.parquet(). It seems like even just opening and closing the InputStream causes the entire data to get fetched. As it turned out, I was able to use s3a and avoid this problem. I was under the impression that s3a was only meant for using EMRFS, where the metadata of the FS is kept separately. This is not true; s3a maps object keys directly to file names and directories. On Sun, Aug 9, 2015 at 6:01 AM, Jerry Lam chiling...@gmail.com wrote: Hi Akshat, Is there a particular reason you don't use s3a? From my experience, s3a performs much better than the rest. I believe the inefficiency is from the implementation of the s3 interface. Best Regards, Jerry Sent from my iPhone On 9 Aug, 2015, at 5:48 am, Akhil Das ak...@sigmoidanalytics.com wrote: Depends on which operation you are doing. If you are doing a .count() on a parquet file, it might not download the entire file, I think, but if you do a .count() on a normal text file it might pull the entire file. Thanks Best Regards On Sat, Aug 8, 2015 at 3:12 AM, Akshat Aranya aara...@gmail.com wrote: Hi, I've been trying to track down some problems with Spark reads being very slow with s3n:// URIs (NativeS3FileSystem). After some digging around, I realized that this file system implementation fetches the entire file, which isn't really a Spark problem, but it really slows things down when trying to just read headers from a Parquet file or just creating partitions in the RDD. Is this something that others have observed before, or am I doing something wrong? Thanks, Akshat
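For reference, switching a job from s3n:// to s3a:// generally only requires the s3a credential properties in the Hadoop configuration; a sketch assuming Hadoop 2.6+ property names, with placeholder values:

```xml
<!-- core-site.xml fragment; hedged sketch, values are placeholders -->
<property><name>fs.s3a.access.key</name><value>YOUR_ACCESS_KEY</value></property>
<property><name>fs.s3a.secret.key</name><value>YOUR_SECRET_KEY</value></property>
```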
Differences in loading data using spark datasource api and using jdbc
Hi, everyone. I have a question about loading data using the spark datasource api versus using jdbc: which way is more efficient?
Re: Spark Cassandra Connector issue
HI All, I have tried the commands below but it still errors out: dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars /home/missingmerch/ postgresql-9.4-1201.jdbc41.jar,/home/missingmerch/dse.jar,/home/missingmerch/spark- cassandra-connector-java_2.10-1.1.1.jar /home/missingmerch/etl-0.0. 1-SNAPSHOT.jar dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars /home/missingmerch/ postgresql-9.4-1201.jdbc41.jar,/home/missingmerch/dse.jar,/home/missingmerch/spark- cassandra-connector-java_2.10-1.1.1.jar,/home/missingmerch/etl-0.0. 1-SNAPSHOT.jar I understand the only problem is with the way I provide the list of jar files in the command. If anybody using Datastax Enterprise could please provide their input to get this issue resolved. Thanks for your support Satish Chandra On Mon, Aug 10, 2015 at 7:16 PM, Dean Wampler deanwamp...@gmail.com wrote: I don't know if DSE changed spark-submit, but you have to use a comma-separated list of jars to --jars. It probably looked for HelloWorld in the second one, the dse.jar file. Do this: dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars /home/missingmerch/ postgresql-9.4-1201.jdbc41.jar,/home/missingmerch/dse.jar,/home/missingmerch/spark- cassandra-connector-java_2.10-1.1.1.jar /home/missingmerch/etl-0.0. 1-SNAPSHOT.jar I also removed the extra //. Or put file: in front of them so they are proper URLs. Note the snapshot jar isn't in the --jars list. I assume that's where HelloWorld is found. Confusing, yes it is... dean Dean Wampler, Ph.D. 
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 8:23 AM, satish chandra j jsatishchan...@gmail.com wrote: Hi, Thanks for quick input, now I am getting class not found error *Command:* dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/dse.jar ///home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar *Error:* java.lang.ClassNotFoundException: HelloWorld at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:342) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Previously I could fix the issue by changing the order of arguments passing in DSE command line interface but now I am not sure why the issue again Please let me know if still I am missing anything in my Command as mentioned above(as insisted I have added dse.jar and spark-cassandra-connector-java_2.10.1.1.1.jar) Thanks for support Satish Chandra On Mon, Aug 10, 2015 at 6:19 PM, Dean Wampler deanwamp...@gmail.com wrote: Add the other Cassandra dependencies (dse.jar, spark-cassandra-connect-java_2.10) to your --jars argument on the command line. Dean Wampler, Ph.D. 
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 7:44 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, Please help me to fix Spark Cassandra Connector issue, find the details below *Command:* dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar *Error:* WARN 2015-08-10 06:33:35 org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. Exception in thread main java.lang.NoSuchMethodError: com.datastax.spark.connector.package$.toRDDFunctions(Lorg/apache/spark/rdd/RDD;Lscala/reflect/ClassTag;)Lcom/datastax/spark/connector/RDDFunctions; at HelloWorld$.main(HelloWorld.scala:29) at HelloWorld.main(HelloWorld.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606)
Re: Differences in loading data using spark datasource api and using jdbc
Hi, As I understand, JDBC is meant for moderate volumes of data, but the Datasource API is a better option when the volume of data is larger. The Datasource API is not available in lower versions of Spark such as 1.2.0. Regards, Satish On Tue, Aug 11, 2015 at 8:53 AM, 李铖 lidali...@gmail.com wrote: Hi, everyone. I have a question about loading data using the spark datasource api versus using jdbc: which way is more efficient?
Writing a DataFrame as compressed JSON
DataFrameReader.json() can handle gzipped JSONlines files automatically but there doesn't seem to be a way to get DataFrameWriter.json() to write compressed JSONlines files. Uncompressed JSONlines is very expensive from an I/O standpoint because field names are included in every record. Is there a way around this problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-a-DataFrame-as-compressed-JSON-tp24206.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
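One commonly suggested workaround, sketched here assuming Spark 1.4.x APIs: DataFrame.toJSON yields an RDD[String], and RDD.saveAsTextFile accepts a compression codec, so the writer-side gap can be bypassed. The output path and variable name are illustrative:

```scala
import org.apache.hadoop.io.compress.GzipCodec

// Hedged workaround: serialize the DataFrame to JSON lines yourself, then
// save the resulting RDD[String] with a codec (here gzip).
df.toJSON.saveAsTextFile("hdfs:///out/events-json", classOf[GzipCodec])
```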
Re: How to programmatically create, submit and report on Spark jobs?
For monitoring, please take a look at http://spark.apache.org/docs/latest/monitoring.html Especially REST API section. Cheers On Mon, Aug 10, 2015 at 8:33 AM, Ted Yu yuzhih...@gmail.com wrote: I found SPARK-3733 which was marked dup of SPARK-4924 which went to 1.4.0 FYI On Mon, Aug 10, 2015 at 5:12 AM, mark manwoodv...@googlemail.com wrote: Hi All I need to be able to create, submit and report on Spark jobs programmatically in response to events arriving on a Kafka bus. I also need end-users to be able to create data queries that launch Spark jobs 'behind the scenes'. I would expect to use the same API for both, and be able to provide a user friendly view (ie. *not *the Spark web UI) of all jobs (user and system) that are currently running, have completed, failed etc. Are there any tools / add-ons for this? Or is there a suggested approach? Thanks
Re: stopping spark stream app
In general, it is a little risky to put long-running stuff in a shutdown hook as it may delay shutdown of the process, which may delay other things. That said, you could try it out. A better way to explicitly shut down gracefully is to use an RPC to signal the driver process to start shutting down, and then the process will gracefully stop the context and terminate. This is more robust than leveraging shutdown hooks. On Mon, Aug 10, 2015 at 11:56 AM, Shushant Arora shushantaror...@gmail.com wrote: Any help in best recommendation for gracefully shutting down a spark stream application? I am running it on yarn and need a way to tell it externally, either via the yarn application -kill command or some other way, but the current batch needs to be processed completely and the checkpoint saved before shutting down. Runtime.getRuntime().addShutdownHook does not seem to be working. Yarn kills the application immediately and does not call the shutdown hook callback. On Sun, Aug 9, 2015 at 12:45 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi How to ensure in spark streaming 1.3 with kafka that when an application is killed, the last running batch is fully processed and offsets are written to the checkpointing dir. On Fri, Aug 7, 2015 at 8:56 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi I am using spark stream 1.3 and using a custom checkpoint to save kafka offsets. 1. Is doing Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { jssc.stop(true, true); System.out.println(Inside Add Shutdown Hook); } }); to handle stop safe? 2. And do I need to handle saving the checkpoint in the shutdown hook also, or will the driver handle it automatically since it gracefully stops the stream and handles completion of the foreachRDD function on the stream? directKafkaStream.foreachRDD(new FunctionJavaRDDbyte[][], Void() { } Thanks
Re: Graceful shutdown for Spark Streaming
Note that this is true only from Spark 1.4, where the shutdown hooks were added. On Mon, Aug 10, 2015 at 12:12 PM, Michal Čizmazia mici...@gmail.com wrote: From logs, it seems that Spark Streaming does handle *kill -SIGINT* with graceful shutdown. Please could you confirm? Thanks! On 30 July 2015 at 08:19, anshu shukla anshushuk...@gmail.com wrote: Yes, I was doing the same; if you mean that this is the correct way to do it, then I will verify it once more in my case. On Thu, Jul 30, 2015 at 1:02 PM, Tathagata Das t...@databricks.com wrote: How is sleep not working? Are you doing streamingContext.start() Thread.sleep(xxx) streamingContext.stop() On Wed, Jul 29, 2015 at 6:55 PM, anshu shukla anshushuk...@gmail.com wrote: If we want to stop the application after a fixed time period, how will it work? (How to give the duration in logic; in my case sleep(t.s.) is not working.) So I used to kill the coarseGrained job at each slave by script. Please suggest something. On Thu, Jul 30, 2015 at 5:14 AM, Tathagata Das t...@databricks.com wrote: StreamingContext.stop(stopGracefully = true) stops the streaming context gracefully. Then you can safely terminate the Spark cluster. They are two different steps and need to be done separately, ensuring that the driver process has been completely terminated before the Spark cluster is terminated. On Wed, Jul 29, 2015 at 6:43 AM, Michal Čizmazia mici...@gmail.com wrote: How to initiate graceful shutdown from outside of the Spark Streaming driver process? Both for the local and cluster mode of Spark Standalone as well as EMR. Does sbin/stop-all.sh stop the context gracefully? How is it done? Is there a signal sent to the driver process? For EMR, is there a way how to terminate an EMR cluster with Spark Streaming graceful shutdown? Thanks! -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
Re: stopping spark stream app
By RPC do you mean a web service exposed on the driver which listens and sets some flag, and the driver checks that flag at the end of each batch and, if set, gracefully stops the application? On Tue, Aug 11, 2015 at 12:43 AM, Tathagata Das t...@databricks.com wrote: In general, it is a little risky to put long-running stuff in a shutdown hook as it may delay shutdown of the process, which may delay other things. That said, you could try it out. A better way to explicitly shut down gracefully is to use an RPC to signal the driver process to start shutting down, and then the process will gracefully stop the context and terminate. This is more robust than leveraging shutdown hooks. On Mon, Aug 10, 2015 at 11:56 AM, Shushant Arora shushantaror...@gmail.com wrote: Any help in best recommendation for gracefully shutting down a spark stream application? I am running it on yarn and need a way to tell it externally, either via the yarn application -kill command or some other way, but the current batch needs to be processed completely and the checkpoint saved before shutting down. Runtime.getRuntime().addShutdownHook does not seem to be working. Yarn kills the application immediately and does not call the shutdown hook callback. On Sun, Aug 9, 2015 at 12:45 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi How to ensure in spark streaming 1.3 with kafka that when an application is killed, the last running batch is fully processed and offsets are written to the checkpointing dir. On Fri, Aug 7, 2015 at 8:56 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi I am using spark stream 1.3 and using a custom checkpoint to save kafka offsets. 1. Is doing Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { jssc.stop(true, true); System.out.println(Inside Add Shutdown Hook); } }); to handle stop safe? 
2. And do I need to handle saving the checkpoint in the shutdown hook also, or will the driver handle it automatically since it gracefully stops the stream and handles completion of the foreachRDD function on the stream? directKafkaStream.foreachRDD(new FunctionJavaRDDbyte[][], Void() { } Thanks
Re: stopping spark stream app
1. RPC can be done in many ways, and a web service is one of them. An even more hacky version can be the app polling a file in a file system: if the file exists, start shutting down. 2. No need to set a flag. When you get the signal from RPC, you can just call context.stop(stopGracefully = true). Though note that this is blocking, so be careful about doing blocking calls on the RPC thread. On Mon, Aug 10, 2015 at 12:24 PM, Shushant Arora shushantaror...@gmail.com wrote: By RPC do you mean a web service exposed on the driver which listens and sets some flag, and the driver checks that flag at the end of each batch and, if set, gracefully stops the application? On Tue, Aug 11, 2015 at 12:43 AM, Tathagata Das t...@databricks.com wrote: In general, it is a little risky to put long-running stuff in a shutdown hook as it may delay shutdown of the process, which may delay other things. That said, you could try it out. A better way to explicitly shut down gracefully is to use an RPC to signal the driver process to start shutting down, and then the process will gracefully stop the context and terminate. This is more robust than leveraging shutdown hooks. On Mon, Aug 10, 2015 at 11:56 AM, Shushant Arora shushantaror...@gmail.com wrote: Any help in best recommendation for gracefully shutting down a spark stream application? I am running it on yarn and need a way to tell it externally, either via the yarn application -kill command or some other way, but the current batch needs to be processed completely and the checkpoint saved before shutting down. Runtime.getRuntime().addShutdownHook does not seem to be working. Yarn kills the application immediately and does not call the shutdown hook callback. On Sun, Aug 9, 2015 at 12:45 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi How to ensure in spark streaming 1.3 with kafka that when an application is killed, the last running batch is fully processed and offsets are written to the checkpointing dir. 
On Fri, Aug 7, 2015 at 8:56 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi I am using spark stream 1.3 and using a custom checkpoint to save kafka offsets. 1. Is doing Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { jssc.stop(true, true); System.out.println(Inside Add Shutdown Hook); } }); to handle stop safe? 2. And do I need to handle saving the checkpoint in the shutdown hook also, or will the driver handle it automatically since it gracefully stops the stream and handles completion of the foreachRDD function on the stream? directKafkaStream.foreachRDD(new FunctionJavaRDDbyte[][], Void() { } Thanks
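TD's "poll a marker file" variant from this thread can be sketched as follows. The onStop callback stands in for streamingContext.stop(stopGracefully = true), and the marker path and poll interval are arbitrary choices for illustration:

```scala
import java.io.File

// Hedged sketch: a daemon thread that polls for a marker file and invokes a
// stop callback when the file appears. In a real driver, onStop would call
// streamingContext.stop(stopGracefully = true).
def watchForShutdownMarker(markerPath: String, pollMillis: Long)(onStop: () => Unit): Thread = {
  val t = new Thread(new Runnable {
    def run(): Unit = {
      while (!new File(markerPath).exists()) Thread.sleep(pollMillis)
      onStop() // gracefully stop the streaming context here
    }
  })
  t.setDaemon(true)
  t.start()
  t
}
```

To trigger shutdown from outside, an operator would simply touch the marker file (e.g. on HDFS or a shared mount) instead of sending yarn application -kill.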
Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing
No, it's not like a given KafkaRDD object contains an array of messages that gets serialized with the object. Its compute method generates an iterator of messages as needed, by connecting to kafka. I don't know what was so hefty in your checkpoint directory, because you deleted it. My checkpoint directories are usually pretty reasonable in size. How many topicpartitions did you have, and how long was your window? On Mon, Aug 10, 2015 at 3:33 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Well, RDDs also contain data, don't they? The question is, what can be so hefty in the checkpointing directory to cause Spark driver to run out of memory? It seems that it makes checkpointing expensive, in terms of I/O and memory consumption. Two network hops -- to driver, then to workers. Hefty file system usage, hefty memory consumption... What can we do to offset some of these costs? On Mon, Aug 10, 2015 at 4:27 PM, Cody Koeninger c...@koeninger.org wrote: The rdd is indeed defined by mostly just the offsets / topic partitions. On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: You need to keep a certain number of rdds around for checkpointing -- that seems like a hefty expense to pay in order to achieve fault tolerance. Why does Spark persist whole RDD's of data? Shouldn't it be sufficient to just persist the offsets, to know where to resume from? Thanks. On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger c...@koeninger.org wrote: You need to keep a certain number of rdds around for checkpointing, based on e.g. the window size. Those would all need to be loaded at once. On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Would there be a way to chunk up/batch up the contents of the checkpointing directories as they're being processed by Spark Streaming? Is it mandatory to load the whole thing in one go? 
On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu yuzhih...@gmail.com wrote: I wonder during recovery from a checkpoint whether we can estimate the size of the checkpoint and compare with Runtime.getRuntime(). freeMemory(). If the size of checkpoint is much bigger than free memory, log warning, etc Cheers On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thanks, Cody, will try that. Unfortunately due to a reinstall I don't have the original checkpointing directory :( Thanks for the clarification on spark.driver.memory, I'll keep testing (at 2g things seem OK for now). On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger c...@koeninger.org wrote: That looks like it's during recovery from a checkpoint, so it'd be driver memory not executor memory. How big is the checkpoint directory that you're trying to restore from? On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: We're getting the below error. Tried increasing spark.executor.memory e.g. from 1g to 2g but the below error still happens. Any recommendations? Something to do with specifying -Xmx in the submit job scripts? Thanks. 
Exception in thread main java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOf(Arrays.java:3332) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421) at java.lang.StringBuilder.append(StringBuilder.java:136) at java.lang.StackTraceElement.toString(StackTraceElement.java:173) at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212) at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190) at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441) at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441) at org.apache.spark.rdd.RDD.init(RDD.scala:1365) at org.apache.spark.streaming.kafka.KafkaRDD.init(KafkaRDD.scala:46) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at
Re: Are there any external dependencies for lag() and lead() when using data frames?
By the way, if Hive is present in the Spark install, does it show up in the text when you start the spark shell? Are there any commands I can run to check if it exists? I didn't set up the spark machine that I use, so I don't know what's present or absent. Thanks, Jerry On Mon, Aug 10, 2015 at 2:38 PM, Jerry jerry.c...@gmail.com wrote: Thanks... looks like I now hit that bug about HiveMetaStoreClient, as I now get the message about being unable to instantiate it. On a side note, does anyone know where hive-site.xml is typically located? Thanks, Jerry On Mon, Aug 10, 2015 at 2:03 PM, Michael Armbrust mich...@databricks.com wrote: You will need to use a HiveContext for window functions to work. On Mon, Aug 10, 2015 at 1:26 PM, Jerry jerry.c...@gmail.com wrote: Hello, Using Apache Spark 1.4.1, I'm unable to use lag or lead when making queries to a data frame, and I'm trying to figure out if I just have a bad setup or if this is a bug. As for the exceptions I get: when using selectExpr() with a string as an argument, I get NoSuchElementException: key not found: lag, and when using the select method and ...spark.sql.functions.lag I get an AnalysisException. If I replace lag with abs in the first case, Spark runs without exception, so none of the other syntax is incorrect. As for how I'm running it: the code is written in Java with a static method that takes the SparkContext as an argument, which is used to create a JavaSparkContext, which in turn is used to create an SQLContext that loads a json file from the local disk and runs those queries on that data frame object. FYI: the java code is compiled, jarred and then pointed to with -cp when starting the spark shell, so all I do is Test.run(sc) in the shell. Let me know what to look for to debug this problem; I'm not sure where to start. Thanks, Jerry
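Following Michael's answer, a minimal sketch of what the setup could look like, assuming Spark 1.4.x, an existing SparkContext sc, and made-up file and column names:

```scala
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

// Window functions such as lag/lead require a HiveContext in Spark 1.4.x,
// not a plain SQLContext.
val sqlContext = new HiveContext(sc)
val df = sqlContext.read.json("events.json")
val w = Window.orderBy("timestamp")
df.select(df("timestamp"), lag(df("value"), 1).over(w)).show()
```

Note this also requires a Spark build compiled with Hive support (-Phive), which may explain why the same code fails on some installs.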
Re: Spark Kinesis Checkpointing/Processing Delay
You are correct. The earlier Kinesis receiver (as of Spark 1.4) was not saving checkpoints correctly and was in general not reliable (even with WAL enabled). We have improved this in Spark 1.5 with an updated Kinesis receiver that keeps track of the Kinesis sequence numbers as part of Spark Streaming's DStream checkpointing, and the KCL checkpoint is updated only after the sequence number has been written to the DStream checkpoints. This allows a recovered streaming program (that is, one restarted from a checkpoint) to recover the sequence numbers from the checkpoint information and reprocess the corresponding records (those which had not been successfully processed). This will give better guarantees. If you are interested to learn more, see the JIRA: https://issues.apache.org/jira/browse/SPARK-9215 Related to this, for your scenario, you should be setting rate limits (spark.streaming.rateLimit) to prevent spark from receiving data faster than it can process. On Mon, Aug 10, 2015 at 4:40 PM, Phil Kallos phil.kal...@gmail.com wrote: Hi! Sorry if this is a repost. I'm using Spark + Kinesis ASL to process and persist stream data to ElasticSearch. For the most part it works nicely. There is a subtle issue I'm running into about how failures are handled. For example's sake, let's say I am processing a Kinesis stream that produces 400 records per second, continuously. Kinesis provides a 24hr buffer of data, and I'm setting my Kinesis DStream consumer to use TRIM_HORIZON, to mean go as far back as possible and start processing the stream data as quickly as possible, until you catch up to the tip of the stream. This means that for some period of time, Spark will suck in data from Kinesis as quickly as it can, let's say at 5000 records per second. In my specific case, ElasticSearch can gracefully handle 400 writes per second, but is NOT happy to process 5000 writes per second. Let's say it only handles 2000 wps. 
This means that the processing time will exceed the batch time, the scheduling delay in the stream will rise consistently, and batches of data will get backlogged for some period of time. In normal circumstances, this is fine. When the Spark consumers catch up to real-time, the data input rate slows to 400rps and the backlogged batches eventually get flushed to ES. The system stabilizes. However! It appears to me that the Kinesis consumer actively submits checkpoints, even though the records may not have been processed yet (since they are backlogged). If for some reason there is processing delay and the Spark process crashes, the checkpoint will have advanced too far. If I resume the Spark Streaming process, there is essentially a gap in my ElasticSearch data. In principle I understand the reason for this, but is there a way to adjust this behavior? Or is there another way to handle this specific problem? Ideally I would be able to configure the process to submit Kinesis checkpoints only after my data is successfully written to ES. Thanks, Phil
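One way to apply TD's rate-limit advice above, as a hedged sketch: Spark 1.x exposes the receiver throttle as spark.streaming.receiver.maxRate (records per second per receiver); the name in TD's message (spark.streaming.rateLimit) appears to refer to the same knob. The numbers are illustrative only.

```scala
import org.apache.spark.SparkConf

// Cap ingest so a TRIM_HORIZON backfill cannot outrun the ES sink.
// 400 records/sec matches the steady-state rate in Phil's example.
val conf = new SparkConf()
  .setAppName("kinesis-to-es")
  .set("spark.streaming.receiver.maxRate", "400")
```

This bounds the scheduling delay during catch-up, at the cost of a longer backfill.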
Re: Package Release Annoucement: Spark SQL on HBase Astro
Yan / Bing: Mind taking a look at HBASE-14181 https://issues.apache.org/jira/browse/HBASE-14181 'Add Spark DataFrame DataSource to HBase-Spark Module'? Thanks On Wed, Jul 22, 2015 at 4:53 PM, Bing Xiao (Bing) bing.x...@huawei.com wrote: We are happy to announce the availability of the Spark SQL on HBase 1.0.0 release. http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase The main features in this package, dubbed “Astro”, include:
· Systematic and powerful handling of data pruning and intelligent scan, based on the partial evaluation technique
· HBase pushdown capabilities like custom filters and coprocessors to support ultra low latency processing
· SQL, Data Frame support
· More SQL capabilities made possible (secondary index, bloom filter, primary key, bulk load, update)
· Joins with data from other sources
· Python/Java/Scala support
· Support for the latest Spark 1.4.0 release
The tests by the Huawei team and community contributors covered these areas: bulk load; projection pruning; partition pruning; partial evaluation; code generation; coprocessor; custom filtering; DML; complex filtering on keys and non-keys; join/union with non-HBase data; Data Frame; multi-column family test. We will post the test results, including performance tests, by the middle of August. You are very welcome to try out or deploy the package, and to help improve the integration tests with various combinations of the settings, extensive Data Frame tests, complex join/union tests and extensive performance tests. Please use the “Issues” and “Pull Requests” links at the package homepage if you want to report bugs, improvements or feature requests. Special thanks to project owner and technical leader Yan Zhou, the Huawei global team, community contributors and Databricks. Databricks has been providing great assistance from the design to the release.
“Astro”, the Spark SQL on HBase package, will be useful for ultra low latency query and analytics of large scale data sets in vertical enterprises. We will continue to work with the community to develop new features and improve the code base. Your comments and suggestions are greatly appreciated. Yan Zhou / Bing Xiao Huawei Big Data team
Re: collect() works, take() returns ImportError: No module named iter
We did have 2.7 on the driver, 2.6 on the edge nodes and figured that was the issue, so we've tried many combinations since then with all three of 2.6.6, 2.7.5, and Anaconda's 2.7.10 on each node, with different PATHs and PYTHONPATHs each time. Every combination has produced the same error. We came across a comment on the User board saying "Since you're using YARN, you may also need to set SPARK_YARN_USER_ENV to PYSPARK_PYTHON=/your/desired/python/on/slave/nodes." ... I couldn't find SPARK_YARN_USER_ENV in the Spark 1.3 docs but we tried that as well and couldn't get it working. We're open to trying or re-trying any other ideas. On Mon, Aug 10, 2015 at 6:25 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote: There was a similar problem reported before on this list. Weird python errors like this generally mean you have different versions of Python on the nodes of your cluster. Can you check that? From the error stack, you use 2.7.10 |Anaconda 2.3.0 while the OS/CDH version of Python is probably 2.6. -- Ruslan Dautkhanov On Mon, Aug 10, 2015 at 3:53 PM, YaoPau jonrgr...@gmail.com wrote: I'm running Spark 1.3 on CDH 5.4.4, and trying to set up Spark to run via iPython Notebook. I'm getting collect() to work just fine, but take() errors. (I'm having issues with collect() on other datasets ... but take() seems to break every time I run it.) My code is below. Any thoughts?
sc
<pyspark.context.SparkContext at 0x7ffbfa310f10>
sys.version
'2.7.10 |Anaconda 2.3.0 (64-bit)| (default, May 28 2015, 17:02:03) \n[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]'
hourly = sc.textFile('tester')
hourly.collect()
[u'a man', u'a plan', u'a canal', u'panama']
hourly = sc.textFile('tester')
hourly.take(2)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-15-1feecba5868b> in <module>()
      1 hourly = sc.textFile('tester')
----> 2 hourly.take(2)
/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py in take(self, num)
   1223
   1224         p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1225         res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1226
   1227         items += res
/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    841         # SparkContext#runJob.
    842         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 843         it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
    844         return list(mappedRDD._collect_iterator_through_file(it))
    845
/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539
    540         for temp_arg in temp_args:
/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298             raise Py4JJavaError(
    299                 'An error occurred while calling {0}{1}{2}.\n'.
--> 300                 format(target_id, '.', name), value)
    301         else:
    302             raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10.0 (TID 47, dhd490101.autotrader.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/worker.py", line 101, in main
    process()
  File "/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/serializers.py", line 236, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py", line 1220, in takeUpToNumLeft
    while taken < left:
ImportError: No module named iter
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135) at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
Re: EC2 cluster doesn't work saveAsTextFile
Following Hadoop conventions, Spark won't overwrite an existing directory. You need to provide a unique output path every time you run the program, or delete or rename the target directory before you run the job. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, I have EC2 cluster, and am using spark 1.3, yarn and HDFS . When i submit at local there is no problem , but i run at cluster, saveAsTextFile doesn't work.*It says me User class threw exception: Output directory hdfs://172.31.42.10:54310/./weblogReadResult http://172.31.42.10:54310/./weblogReadResult already exists* Is there anyone can help me about this issue ? Best, yasemin -- hiç ender hiç
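Dean's "delete the target directory before you run the job" step can also be done from inside the job with the Hadoop FileSystem API. A hedged sketch: the output path is taken from Yasemin's error message, and `rdd` stands for whatever RDD is being saved.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

val outPath = "hdfs://172.31.42.10:54310/weblogReadResult"
val path = new Path(outPath)
val fs = FileSystem.get(java.net.URI.create(outPath), sc.hadoopConfiguration)
// Remove any leftover output from a previous run (recursive delete),
// since saveAsTextFile refuses to overwrite an existing directory.
if (fs.exists(path)) fs.delete(path, true)
rdd.saveAsTextFile(outPath)
```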
Re: question about spark streaming
Have a look at the various versions of PairDStreamFunctions.updateStateByKey (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions). It supports updating running state in memory. (You can persist the state to a database/files periodically if you want.) Use an in-memory data structure like a hash map with SKU-price key-values, and update the map as needed on each iteration. One of the versions of this function lets you specify a partitioner if you still need to shard keys. Also, I would be flexible about the 1 second batch interval. Is that really a mandatory requirement for this problem? HTH, dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 5:24 AM, sequoiadb mailing-list-r...@sequoiadb.com wrote: hi guys, i have a question about spark streaming. There's an application that keeps sending transaction records into a spark stream at about 50k tps. Each record represents sales information, including customer id / product id / time / price columns. The application is required to monitor the change of price for each product. For example, if the price of a product increases 10% within 3 minutes, it will send an alert to the end user. The interval is required to be set at every 1 second; the window is somewhere between 180 and 300 seconds. The issue is that I have to compare the price of each transaction (about 10k different products in total) against the lowest/highest price for the same product across all of the past 180 seconds. That means, in every single second, I have to loop through 50k transactions and compare the price of the same product across all 180 seconds. So it seems I have to separate the calculation based on product id, so that each worker only processes a certain list of products.
For example, if I can make sure the same product id always goes to the same worker agent, it doesn't need to shuffle data between worker agents for each comparison. Otherwise, if each transaction has to be compared with all other RDDs across multiple worker agents, I guess it may not be fast enough for the requirement. Does anyone know how to specify the worker node for each transaction record based on its product id, in order to avoid a massive shuffle operation? If I simply make the product id the key and the price the value, reduceByKeyAndWindow may cause massive shuffle and slow down the whole throughput. Am I correct? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
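Dean's updateStateByKey suggestion could look roughly like the following. This is a hedged sketch only: the field names (productId, price) are assumptions, the state never expires old data (the 180-300 second window expiry is omitted for brevity), and the alert test is a placeholder.

```scala
import org.apache.spark.HashPartitioner

case class PriceState(min: Double, max: Double)

// New prices for one product in this batch + previous state -> new state.
val updatePrices = (prices: Seq[Double], state: Option[PriceState]) => {
  val s = state.getOrElse(PriceState(Double.MaxValue, Double.MinValue))
  val newMin = (s.min +: prices).min
  val newMax = (s.max +: prices).max
  if (newMax >= 1.1 * newMin) { /* placeholder: raise the 10% alert here */ }
  Some(PriceState(newMin, newMax))
}

// transactions: a DStream of records with productId and price fields (assumed).
val byProduct = transactions.map(t => (t.productId, t.price))
// The explicit partitioner keeps each product id on one partition, so the
// per-product comparison does not reshuffle raw transactions every batch.
val state = byProduct.updateStateByKey(updatePrices, new HashPartitioner(32))
```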
Spark Cassandra Connector issue
Hi All, Please help me to fix a Spark Cassandra Connector issue; find the details below.

Command:
dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar

Error:
WARN 2015-08-10 06:33:35 org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Exception in thread "main" java.lang.NoSuchMethodError: com.datastax.spark.connector.package$.toRDDFunctions(Lorg/apache/spark/rdd/RDD;Lscala/reflect/ClassTag;)Lcom/datastax/spark/connector/RDDFunctions;
at HelloWorld$.main(HelloWorld.scala:29)
at HelloWorld.main(HelloWorld.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.JdbcRDD
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.bdp.spark.DseSparkConfHelper._
import java.sql.{Connection, DriverManager, ResultSet, PreparedStatement, SQLException, Statement}

object HelloWorld {
  def main(args: Array[String]) {
    def createSparkContext() = {
      val myJar = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
      val conf = new SparkConf()
        .set("spark.cassandra.connection.host", "10.246.43.15")
        .setAppName("First Spark App")
        .setMaster("local")
        .setJars(Array(myJar))
        .set("cassandra.username", "username")
        .set("cassandra.password", "password")
        .forDse
      new SparkContext(conf)
    }

    val sc = createSparkContext()
    val user = "hkonak0"
    val pass = "Winter18"
    Class.forName("org.postgresql.Driver").newInstance
    val url = "jdbc:postgresql://gptester:5432/db_test"
    val myRDD27 = new JdbcRDD(
      sc,
      () => DriverManager.getConnection(url, user, pass),
      "select * from wmax_vmax.arm_typ_txt LIMIT ? OFFSET ?",
      5, 0, 1,
      (r: ResultSet) => (r.getInt("alarm_type_code"),
                         r.getString("language_code"),
                         r.getString("alrm_type_cd_desc")))
    myRDD27.saveToCassandra("keyspace", "arm_typ_txt",
      SomeColumns("alarm_type_code", "language_code", "alrm_type_cd_desc"))
    println(myRDD27.count())
    println(myRDD27.first)
    sc.stop()
    sys.exit()
  }
}

POM XML:
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>1.2.1</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.5</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>com.datastax.dse</groupId>
    <artifactId>dse</artifactId>
    <version>4.7.2</version>
    <scope>system</scope>
    <systemPath>C:\workspace\etl\lib\dse.jar</systemPath>
  </dependency>
  <dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.1.1</version>
  </dependency>
</dependencies>

Please let me know if any further details are required to analyze the issue. Regards, Satish Chandra
Re: Spark Cassandra Connector issue
Add the other Cassandra dependencies (dse.jar, spark-cassandra-connector-java_2.10) to your --jars argument on the command line. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 7:44 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, Please help me to fix Spark Cassandra Connector issue, find the details below Command: dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar Error: WARN 2015-08-10 06:33:35 org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. Exception in thread "main" java.lang.NoSuchMethodError: com.datastax.spark.connector.package$.toRDDFunctions(Lorg/apache/spark/rdd/RDD;Lscala/reflect/ClassTag;)Lcom/datastax/spark/connector/RDDFunctions; at HelloWorld$.main(HelloWorld.scala:29) at HelloWorld.main(HelloWorld.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Please let me know if any further details required to analyze the issue Regards, Satish Chandra
spark-kafka directAPI vs receivers based API
Hi All, I just wanted to know how the direct API for Spark Streaming compares with the earlier receiver-based API. Has anyone used the direct API approach in production, or is it still only being used for POCs? Also, since I'm new to Spark, could anyone share a starting point where I could find working code for both of the above APIs? Also, in my use case I want to analyse a data stream (comma-separated strings) and aggregate over certain fields based on their types. Ideally I would like to push that aggregated data to a column-family-based datastore (like HBase, which we are using currently). But first I'd like to find out how to aggregate that data and how streaming works: whether it polls and fetches data in batches or continuously listens to the Kafka queue for any new messages, and how I can configure my application for either case. I hope my questions make sense. Regards Mohit
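For the direct-API half of Mohit's question, creation looks roughly like this in Spark 1.3+ (Scala). A hedged sketch: broker addresses and topic names are placeholders. Note that streaming does not listen continuously; each micro-batch fetches the offset range that arrived since the previous batch.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(10))  // 10-second micro-batches
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("events")
// Direct (receiver-less) stream: Spark computes Kafka offset ranges per
// batch and reads them like an RDD, instead of running long-lived receivers.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)
val values = stream.map(_._2)  // the comma-separated record strings
```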
Re: Spark Streaming Restart at scheduled intervals
org.apache.spark.streaming.twitter.TwitterInputDStream is a small class. You could write your own that lets you change the filters at run time. Then provide a mechanism in your app, like periodic polling of a database table or file for the list of filters. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 3:56 AM, Pankaj Narang pankajnaran...@gmail.com wrote: Hi All, I am creating spark twitter streaming connection in my app over long period of time. When I have some new keywords I need to add them to the spark streaming connection. I need to stop and start the current twitter streaming connection in this case. I have tried akka actor scheduling but could not achieve the same. Have anybody have idea how to do that ? Regards Pankaj -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Restart-at-scheduled-intervals-tp24192.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: EC2 cluster doesn't work saveAsTextFile
Thanks Dean, I am giving a unique output path, and every time I also delete the directory before I run the job. 2015-08-10 15:30 GMT+03:00 Dean Wampler deanwamp...@gmail.com: Following Hadoop conventions, Spark won't overwrite an existing directory. You need to provide a unique output path every time you run the program, or delete or rename the target directory before you run the job. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, I have an EC2 cluster, and am using Spark 1.3, YARN and HDFS. When I submit locally there is no problem, but when I run on the cluster, saveAsTextFile doesn't work. It tells me: User class threw exception: Output directory hdfs://172.31.42.10:54310/./weblogReadResult http://172.31.42.10:54310/./weblogReadResult already exists. Is there anyone who can help me with this issue? Best, yasemin -- hiç ender hiç -- hiç ender hiç
Re: EC2 cluster doesn't work saveAsTextFile
So, just before running the job, if you run the HDFS command at a shell prompt: hdfs dfs -ls hdfs://172.31.42.10:54310/./weblogReadResult, does it say the path doesn't exist? Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 7:58 AM, Yasemin Kaya godo...@gmail.com wrote: Thanx Dean, i am giving unique output path and in every time i also delete the directory before i run the job. 2015-08-10 15:30 GMT+03:00 Dean Wampler deanwamp...@gmail.com: Following Hadoop conventions, Spark won't overwrite an existing directory. You need to provide a unique output path every time you run the program, or delete or rename the target directory before you run the job. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, I have EC2 cluster, and am using spark 1.3, yarn and HDFS . When i submit at local there is no problem , but i run at cluster, saveAsTextFile doesn't work. It says me: User class threw exception: Output directory hdfs://172.31.42.10:54310/./weblogReadResult http://172.31.42.10:54310/./weblogReadResult already exists. Is there anyone can help me about this issue ? Best, yasemin -- hiç ender hiç -- hiç ender hiç
Re: multiple dependency jars using pyspark
Easiest way should be to add both jars to SPARK_CLASSPATH as a colon-separated string. On 10 Aug 2015 06:20, Jonathan Haddad j...@jonhaddad.com wrote: I'm trying to write a simple job for Pyspark 1.4 migrating data from MySQL to Cassandra. I can work with either the MySQL JDBC jar or the cassandra jar separately without issue, but when I try to reference both of them it throws an exception: Py4JJavaError: An error occurred while calling o32.save. : java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less; I'm not sure if I'm including the jars correctly, as --jars says it's comma separated and --driver-class-path seems to take a colon-delimited classpath. If I separate the list in --driver-class-path with a comma, I get a class-not-found exception, so I'm thinking colon is right. The job, params for submission, and exception are here; help getting this going would be deeply appreciated. https://gist.github.com/rustyrazorblade/9a38a9499a7531eefd1e
Spark Streaming Restart at scheduled intervals
Hi All, I am creating a Spark Twitter streaming connection in my app over a long period of time. When I have some new keywords, I need to add them to the Spark streaming connection, so I need to stop and start the current Twitter streaming connection in this case. I have tried akka actor scheduling but could not achieve this. Does anybody have an idea how to do that? Regards Pankaj -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Restart-at-scheduled-intervals-tp24192.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Possible issue for Spark SQL/DataFrame
Isn't it space-separated data? It is not comma (,) separated nor pipe (|) separated data. Thanks Best Regards On Mon, Aug 10, 2015 at 12:06 PM, Netwaver wanglong_...@163.com wrote: Hi Spark experts, I am now using Spark 1.4.1 and trying the Spark SQL/DataFrame API with a text file in the below format: id gender height 1 M 180 2 F 167 ... ... But I meet the issues described below: 1. In my test program, I specify the schema programmatically, but when I use "|" as the separator in the schema string, the code runs into an exception when being executed on the cluster (Standalone). When I use "," as the separator, everything works fine. 2. In the code, when I use the DataFrame.agg() function with the same column name used for different statistics functions (max, min, avg): val peopleDF = sqlCtx.createDataFrame(rowRDD, schema) peopleDF.filter(peopleDF("gender").equalTo("M")).agg(Map("height" -> "avg", "height" -> "max", "height" -> "min")).show() I find that only the last function's computation result is shown (as below). Does this work as designed in Spark? Hopefully I have described the issue clearly, and please feel free to correct me if I have done something wrong, thanks a lot.
Re: SparkR -Graphx Connected components
Thanks for the response, Robin. Is this the same for both directed and undirected graphs? val edges = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ Array(3L -> 4L, 4L -> 5L, 5L -> 3L) ++ Array(6L -> 0L, 5L -> 7L) val rawEdges = sc.parallelize(edges) val graph = Graph.fromEdgeTuples(rawEdges, -1) val sccGraph = graph.stronglyConnectedComponents(20) for ((id, scc) <- sccGraph.vertices.collect()) { println(id, scc) } The answer it gave is below. Any thoughts here, please? (4,3) - OK (0,0) - OK (1,0) - OK (6,6) - this should have been (6,0) (3,3) - OK (7,7) - this should have been (7,3) (5,3) - OK (2,0) - OK -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Graphx-Connected-components-tp24165p24190.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Possible issue for Spark SQL/DataFrame
Hi Spark experts, I am now using Spark 1.4.1 and trying the Spark SQL/DataFrame API with a text file in the below format: id gender height 1 M 180 2 F 167 ... ... But I meet the issues described below: 1. In my test program, I specify the schema programmatically, but when I use "|" as the separator in the schema string, the code runs into an exception when being executed on the cluster (Standalone). When I use "," as the separator, everything works fine. 2. In the code, when I use the DataFrame.agg() function with the same column name used for different statistics functions (max, min, avg): val peopleDF = sqlCtx.createDataFrame(rowRDD, schema) peopleDF.filter(peopleDF("gender").equalTo("M")).agg(Map("height" -> "avg", "height" -> "max", "height" -> "min")).show() I find that only the last function's computation result is shown (as below). Does this work as designed in Spark? Hopefully I have described the issue clearly; please feel free to correct me if I have done something wrong, thanks a lot.
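The agg() behavior is a property of Scala's Map, not of Spark: a Map keeps one value per key, so the duplicate "height" keys collapse to the last entry ("min") before Spark ever sees them. A hedged sketch of a workaround using column expressions (available in Spark 1.4):

```scala
// Map("height" -> "avg", "height" -> "max", "height" -> "min")
// collapses to Map("height" -> "min"), so only min("height") is computed.

import org.apache.spark.sql.functions.{avg, max, min}

// Column-expression form: all three aggregates of the same column survive.
peopleDF.filter(peopleDF("gender").equalTo("M"))
  .agg(avg("height"), max("height"), min("height"))
  .show()
```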
SparkR -Graphx Cliques
How to find the cliques using spark graphx ? a quick code snippet is appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Graphx-Cliques-tp24191.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark on YARN
Hi, I have looked at the UI scheduler tab and it appears my new user was allocated fewer cores than my other user. Is there any way I can avoid this happening? Thanks, Jem On Sat, Aug 8, 2015 at 8:32 PM Shushant Arora shushantaror...@gmail.com wrote: Which scheduler is on your cluster? Just check the RM UI scheduler tab and see your user's max limit of vcores. If other applications of that user currently occupy up to the user's max vcores, that could be the reason vcores are not being allocated to this user, while the same application runs for another user whose max vcore limit has not been reached. On Sat, Aug 8, 2015 at 10:07 PM, Jem Tucker jem.tuc...@gmail.com wrote: Hi Dustin, Yes there are enough resources available; the same application run with a different user works fine, so I think it is something to do with permissions but I can't work out where. Thanks, Jem On Sat, 8 Aug 2015 at 17:35, Dustin Cote dc...@cloudera.com wrote: Hi Jem, In the top of the RM web UI, do you see any available resources to spawn the application master container? On Sat, Aug 8, 2015 at 4:37 AM, Jem Tucker jem.tuc...@gmail.com wrote: Hi Sandy, The application doesn't fail; it gets accepted by YARN but the application master never starts and the application state never changes to running. I have checked the resource manager and node manager logs and nothing jumps out. Thanks Jem On Sat, 8 Aug 2015 at 09:20, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Jem, Do they fail with any particular exception? Does YARN just never end up giving them resources? Does an application master start? If so, what is in its logs? If not, anything suspicious in the YARN ResourceManager logs? -Sandy On Fri, Aug 7, 2015 at 1:48 AM, Jem Tucker jem.tuc...@gmail.com wrote: Hi, I am running spark on YARN on the CDH5.3.2 stack.
I have created a new user to own and run a testing environment; however, when using this user, applications I submit to YARN never begin to run, even though they are the exact same applications that are successful with another user. Has anyone seen anything like this before? Thanks, Jem -- --- You received this message because you are subscribed to the Google Groups CDH Users group. To unsubscribe from this group and stop receiving emails from it, send an email to cdh-user+unsubscr...@cloudera.org. For more options, visit https://groups.google.com/a/cloudera.org/d/optout. -- Dustin Cote Customer Operations Engineer http://www.cloudera.com
Spark with GCS Connector - Rate limit error
Hi, I'm using Spark on a Google Compute Engine cluster with the Google Cloud Storage connector (instead of HDFS, as recommended here https://cloud.google.com/hadoop/google-cloud-storage-connector#benefits ), and I get a lot of rate limit errors, as shown below. The errors relate to temp files (in a folder called _temporary), not to the input/output of the program itself. Is there a way to control the read/write rate of Spark? Is there a way to increase the rate limit for my Google project? Is there a way to use the local hard disk for temp files that don't have to be shared with other slaves? Or does anyone know of any other solution for this? Thanks, Oren java.io.IOException: Error inserting: bucket: *, object: * at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.wrapException(GoogleCloudStorageImpl.java:1600) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl$3.run(GoogleCloudStorageImpl.java:475) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests { "code": 429, "errors": [ { "domain": "usageLimits", "message": "The total number of changes to the object * exceeds the rate limit. Please reduce the rate of create, update, and delete requests.", "reason": "rateLimitExceeded" } ], "message": "The total number of changes to the object * exceeds the rate limit. Please reduce the rate of create, update, and delete requests."
} at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145) at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113) at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40) at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432) at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352) at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl$3.run(GoogleCloudStorageImpl.java:472) ... 3 more -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-GCS-Connector-Rate-limit-error-tp24194.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
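Beyond connector settings, the usual client-side answer to a 429 is twofold: retry with exponential backoff, and cut the number of mutations per object. As a rough illustration of the first idea, here is a minimal, generic retry helper in Scala; it is not part of Spark or the GCS connector, just a sketch of the pattern:

```scala
// Hypothetical helper -- not part of Spark or the GCS connector.
// Retries a block with exponential backoff, the usual client-side
// response to an HTTP 429 "rate limit exceeded" error.
object Backoff {
  def retry[T](attempts: Int, delayMs: Long)(op: => T): T =
    try op catch {
      case e: Exception if attempts > 1 =>
        Thread.sleep(delayMs) // wait, then try again with double the delay
        retry(attempts - 1, delayMs * 2)(op)
    }
}

// Example: wrap a flaky write in up to 5 attempts starting at 500 ms.
// Backoff.retry(5, 500) { writeOutputPartition() }
```

The other lever attacks the root cause: since the limit in this trace is on changes to a single object, writing fewer, larger files (e.g. coalescing the output RDD to fewer partitions before saving) reduces the request rate during the _temporary commit phase.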
Kinesis records are merged with out obvious way of separating them
I am using Spark 1.4.1 with the Kinesis connector 1.4.0. When I post events slowly and they are picked up one by one, everything runs smoothly, but when the stream starts delivering batched records there is no obvious way to separate them. Am I missing something? How do I separate the records when they arrive as a single byte array of varying length? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kinesis-records-are-merged-with-out-obvious-way-of-separating-them-tp24193.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
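If the producer side is aggregating records with the Kinesis Producer Library, the standard way to split them back out is KCL de-aggregation (UserRecord.deaggregate from the Kinesis Client Library). If you control the producer yourself, another option is to frame the records explicitly so the consumer can split the batched byte array. A minimal sketch of such a framing scheme (this is a hypothetical convention, not any Kinesis API):

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical framing scheme (not a Kinesis/KPL API): each record is
// written as a 4-byte big-endian length followed by its payload, so a
// batched byte array can be split back into individual records.
object Framing {
  def pack(records: Seq[Array[Byte]]): Array[Byte] = {
    val size = records.map(4 + _.length).sum
    val buf = ByteBuffer.allocate(size)
    records.foreach { r => buf.putInt(r.length); buf.put(r) }
    buf.array()
  }

  def unpack(batch: Array[Byte]): Seq[Array[Byte]] = {
    val buf = ByteBuffer.wrap(batch)
    val out = ArrayBuffer.empty[Array[Byte]]
    while (buf.remaining() >= 4) {
      val len = buf.getInt()
      val rec = new Array[Byte](len)
      buf.get(rec)
      out += rec
    }
    out
  }
}
```

On the Spark side, `unpack` would then be flat-mapped over the byte arrays the Kinesis receiver delivers.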
question about spark streaming
Hi guys, I have a question about Spark Streaming. An application keeps sending transaction records into a Spark stream at about 50k TPS. Each record represents sales information, with customer id / product id / time / price columns. The application is required to monitor the change of price for each product: for example, if the price of a product increases 10% within 3 minutes, it sends an alert to the end user. The interval is required to be 1 second, and the window somewhere between 180 and 300 seconds. The issue is that I have to compare the price of each transaction (about 10k different products in total) against the lowest/highest price for the same product over the past 180 seconds. That means in every single second I have to loop through 50k transactions and compare prices for the same product across all 180 seconds. So it seems I have to partition the calculation by product id, so that each worker only processes a certain list of products. For example, if I can make sure the same product id always goes to the same worker, there is no need to shuffle data between workers for each comparison. Otherwise, if each transaction had to be compared against RDDs spread across multiple workers, I guess it may not be fast enough for the requirement. Does anyone know how to assign each transaction record to a worker node based on its product id, in order to avoid a massive shuffle? If I simply make the product id the key and the price the value, reduceByKeyAndWindow may cause a massive shuffle and slow down the whole throughput. Am I correct? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
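The min/max comparison need not loop over all 180 seconds of raw transactions: keyed by product id, an associative reduce over the window only ever carries one (min, max) pair per product, and reduceByKeyAndWindow hash-partitions by key by default, so a given product's state tends to stay on one partition and the shuffle moves one small pair per product rather than every transaction. A sketch of the pure merge logic (the streaming wiring is shown only in comments, as an assumption about how it would be plugged in):

```scala
// Sketch only: per-product (minPrice, maxPrice) aggregation.
// In a streaming job this merge would be the reduce function of
// reduceByKeyAndWindow over a DStream keyed by product id, e.g.
//   prices.map(t => (t.productId, (t.price, t.price)))
//         .reduceByKeyAndWindow(PriceWindow.merge _, Seconds(180), Seconds(1))
object PriceWindow {
  type MinMax = (Double, Double)

  // Associative and commutative, so it can be applied in any order
  // across partitions and window slices.
  def merge(a: MinMax, b: MinMax): MinMax =
    (math.min(a._1, b._1), math.max(a._2, b._2))

  // Alert when the window's max is more than `threshold` above its min.
  def alert(mm: MinMax, threshold: Double = 0.10): Boolean =
    mm._2 > mm._1 * (1 + threshold)
}
```

Field and method names here (productId, price) are placeholders for whatever the record schema actually provides.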
Differents of loading data
What is the difference between loading data using JDBC and loading data using the Spark data source API? Or between loading data using mongo-hadoop and loading data using the native Java driver? Which way is better?
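Both paths exist in Spark 1.x: the low-level JdbcRDD leaves the SQL and the row mapping to you and yields a plain RDD with no schema, while the jdbc data source returns a schema-aware DataFrame that Spark SQL can optimize (column pruning, filter pushdown). A sketch with placeholder connection values (the actual read calls are commented out because they need a running cluster and database):

```scala
// Sketch: the same table loaded two ways in Spark 1.x.
// The url / dbtable / driver values below are placeholders.
val jdbcOptions = Map(
  "url"     -> "jdbc:postgresql://host:5432/db",
  "dbtable" -> "schema.my_table",
  "driver"  -> "org.postgresql.Driver"
)

// 1) Data source API (Spark SQL): returns a DataFrame with a schema,
//    so projections and filters can be pushed toward the source.
// val df = sqlContext.read.format("jdbc").options(jdbcOptions).load()

// 2) Low-level JdbcRDD: you supply the SQL (with ? bounds for
//    partitioning) and map each ResultSet row yourself.
// val rdd = new JdbcRDD(sc,
//   () => DriverManager.getConnection(jdbcOptions("url")),
//   "SELECT name FROM schema.my_table WHERE id >= ? AND id <= ?",
//   0, 100, 4, r => r.getString(1))
```

The same trade-off applies to mongo-hadoop versus the native driver: a Hadoop/Spark connector gives you partitioned, parallel reads as RDDs, while a native driver call runs on one JVM and returns ordinary collections.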
Re: Spark Cassandra Connector issue
I don't know if DSE changed spark-submit, but you have to pass a comma-separated list of jars to --jars. It probably looked for HelloWorld in the second one, the dse.jar file. Do this: dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars /home/missingmerch/postgresql-9.4-1201.jdbc41.jar,/home/missingmerch/dse.jar,/home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar /home/missingmerch/etl-0.0.1-SNAPSHOT.jar I also removed the extra //. Or put file: in front of them so they are proper URLs. Note the snapshot jar isn't in the --jars list; I assume that's where HelloWorld is found. Confusing, yes it is... dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 8:23 AM, satish chandra j jsatishchan...@gmail.com wrote: Hi, Thanks for the quick input; now I am getting a class not found error. Command: dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/dse.jar ///home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar Error: java.lang.ClassNotFoundException: HelloWorld at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:342) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Previously I could fix this issue by changing the order of the arguments passed to the DSE command line interface, but now I am not sure why the issue is back. Please let me know if I am still missing anything in my command as mentioned above (as suggested, I have added dse.jar and spark-cassandra-connector-java_2.10-1.1.1.jar). Thanks for the support. Satish Chandra On Mon, Aug 10, 2015 at 6:19 PM, Dean Wampler deanwamp...@gmail.com wrote: Add the other Cassandra dependencies (dse.jar, spark-cassandra-connector-java_2.10) to your --jars argument on the command line. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 7:44 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, Please help me to fix the Spark Cassandra Connector issue; details below. Command: dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar Error: WARN 2015-08-10 06:33:35 org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Exception in thread main java.lang.NoSuchMethodError: com.datastax.spark.connector.package$.toRDDFunctions(Lorg/apache/spark/rdd/RDD;Lscala/reflect/ClassTag;)Lcom/datastax/spark/connector/RDDFunctions; at HelloWorld$.main(HelloWorld.scala:29) at HelloWorld.main(HelloWorld.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.JdbcRDD
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.bdp.spark.DseSparkConfHelper._
import java.sql.{Connection, DriverManager, ResultSet, PreparedStatement, SQLException, Statement}

object HelloWorld {
  def main(args: Array[String]) {
    def createSparkContext() = {
      val myJar = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
      val conf = new SparkConf().set(
Re: spark-kafka directAPI vs receivers based API
For direct stream questions: https://github.com/koeninger/kafka-exactly-once Yes, it is used in production. For general Spark Streaming questions: http://spark.apache.org/docs/latest/streaming-programming-guide.html On Mon, Aug 10, 2015 at 7:51 AM, Mohit Durgapal durgapalmo...@gmail.com wrote: Hi All, I just wanted to know how the direct API for Spark Streaming compares with the earlier receiver-based API. Has anyone used the direct-API-based approach in production, or is it still only being used for POCs? Also, since I'm new to Spark, could anyone share a starting point where I could find working code for both of the above APIs? In my use case I want to analyse a data stream (comma-separated strings) and aggregate over certain fields based on their types. Ideally I would like to push that aggregated data to a column-family-based datastore (like HBase, which we are using currently). But first I'd like to find out how to aggregate that data and how streaming works: whether it fetches data in batches by polling, or continuously listens to the Kafka queue for any new message, and how I can configure my application for either case. I hope my questions make sense. Regards Mohit
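For reference, a sketch of both creation calls in the Spark 1.x Kafka integration; broker, ZooKeeper, and topic names are placeholders, and the stream calls are commented out because they need a live cluster. Note that Spark Streaming is micro-batch in both cases: the direct API computes an offset range and fetches it once per batch interval, rather than listening continuously.

```scala
// Sketch (Spark 1.x Kafka integration); all hosts and topic names
// below are placeholders. Not runnable without a Kafka cluster.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("events")

// Direct (receiver-less) API: each batch interval the driver computes
// the offset range to read and executors fetch exactly that range;
// offsets are tracked by Spark (checkpoint), not ZooKeeper.
// val stream = KafkaUtils.createDirectStream[String, String,
//   StringDecoder, StringDecoder](ssc, kafkaParams, topics)

// Receiver-based API: a long-running receiver consumes continuously
// and stores blocks; offsets are tracked in ZooKeeper.
// val rstream = KafkaUtils.createStream(ssc, "zk1:2181", "my-group",
//   Map("events" -> 1))
```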
Re: Spark Cassandra Connector issue
Hi, Thanks for the quick input; now I am getting a class not found error. Command: dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/dse.jar ///home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar Error: java.lang.ClassNotFoundException: HelloWorld at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:342) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Previously I could fix this issue by changing the order of the arguments passed to the DSE command line interface, but now I am not sure why the issue is back. Please let me know if I am still missing anything in my command as mentioned above (as suggested, I have added dse.jar and spark-cassandra-connector-java_2.10-1.1.1.jar). Thanks for the support. Satish Chandra On Mon, Aug 10, 2015 at 6:19 PM, Dean Wampler deanwamp...@gmail.com wrote: Add the other Cassandra dependencies (dse.jar, spark-cassandra-connector-java_2.10) to your --jars argument on the command line. Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 7:44 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, Please help me to fix the Spark Cassandra Connector issue; details below. Command: dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar Error: WARN 2015-08-10 06:33:35 org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. Exception in thread main java.lang.NoSuchMethodError: com.datastax.spark.connector.package$.toRDDFunctions(Lorg/apache/spark/rdd/RDD;Lscala/reflect/ClassTag;)Lcom/datastax/spark/connector/RDDFunctions; at HelloWorld$.main(HelloWorld.scala:29) at HelloWorld.main(HelloWorld.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.JdbcRDD
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.bdp.spark.DseSparkConfHelper._
import java.sql.{Connection, DriverManager, ResultSet, PreparedStatement, SQLException, Statement}

object HelloWorld {
  def main(args: Array[String]) {
    def createSparkContext() = {
      val myJar = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
      val conf = new SparkConf().set("spark.cassandra.connection.host", "10.246.43.15")
        .setAppName("First Spark App")
        .setMaster("local")
        .setJars(Array(myJar))
        .set("cassandra.username", "username")
        .set("cassandra.password", "password")
        .forDse
      new SparkContext(conf)
    }
    val sc = createSparkContext()
    val user = "hkonak0"
    val pass = "Winter18"
    Class.forName("org.postgresql.Driver").newInstance
    val url = "jdbc:postgresql://gptester:5432/db_test"
    val myRDD27 = new JdbcRDD(sc, () => DriverManager.getConnection(url, user, pass),
      "select * from wmax_vmax.arm_typ_txt LIMIT ? OFFSET ?", 5, 0, 1,
      (r: ResultSet) => (r.getInt("alarm_type_code"), r.getString("language_code"), r.getString("alrm_type_cd_desc")))
    myRDD27.saveToCassandra("keyspace", "arm_typ_txt", SomeColumns("alarm_type_code", "language_code", "alrm_type_cd_desc"))
    println(myRDD27.count())
    println(myRDD27.first)
    sc.stop()
    sys.exit()
  }
}
POM
Re: How to connect to spark remotely from java
You don't connect to Spark exactly. The Spark client (running on your remote machine) submits jobs to the YARN cluster running on HDP. What you probably need is yarn-cluster or yarn-client mode, with the YARN client configs as downloaded from the Ambari actions menu. Simon On 10 Aug 2015, at 12:44, Zsombor Egyed egye...@starschema.net wrote: Hi! I want to know how I can connect to Hortonworks Spark from another machine. There is an HDP 2.2 installation, and I want to connect to it remotely via the Java API. Do you have any suggestions? Thanks! Regards, -- Egyed Zsombor Junior Big Data Engineer Mobile: +36 70 320 65 81 | Twitter: @starschemaltd Email: egye...@starschema.net | Web: www.starschema.net
How to connect to spark remotely from java
Hi! I want to know how I can connect to Hortonworks Spark from another machine. There is an HDP 2.2 installation, and I want to connect to it remotely via the Java API. Do you have any suggestions? Thanks! Regards, -- *Egyed Zsombor * Junior Big Data Engineer Mobile: +36 70 320 65 81 | Twitter: @starschemaltd Email: egye...@starschema.net bali...@starschema.net | Web: www.starschema.net