Re: How to convert dataframe to a nested StructType schema
Hao,

For Spark 1.4.1, you can try this:

val rowrdd = df.rdd.map(r => Row(Row(r(3)), Row(r(0), r(1), r(2))))
val newDF = sqlContext.createDataFrame(rowrdd, yourNewSchema)

Thanks!
- Terry

On Wed, Sep 16, 2015 at 2:10 AM, Hao Wang wrote:
> Hi,
>
> I created a dataframe with 4 string columns (city, state, country,
> zipcode). I then applied the following nested schema to it by creating a
> custom StructType. When I run df.take(5), it gives the exception below as
> expected. The question is how I can convert the Rows in the dataframe to
> conform to this nested schema? Thanks!
>
> root
>  |-- ZipCode: struct (nullable = true)
>  |    |-- zip: string (nullable = true)
>  |-- Address: struct (nullable = true)
>  |    |-- city: string (nullable = true)
>  |    |-- state: string (nullable = true)
>  |    |-- country: string (nullable = true)
>
> [info] org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in
> stage 6.0 (TID 6, localhost): scala.MatchError: 95123 (of class
> java.lang.String)
> [info]         at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:178)
> [info]         at org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
> [info]         at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
> [info]         at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
> [info]         at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-convert-dataframe-to-a-nested-StructType-schema-tp24703.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
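A minimal self-contained sketch of the whole conversion follows, with the nested schema reconstructed from the printSchema output in the question. The flat column order (city, state, country, zipcode) is an assumption taken from the question text:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Nested schema matching the printSchema output above
val nestedSchema = StructType(Seq(
  StructField("ZipCode", StructType(Seq(
    StructField("zip", StringType, nullable = true))), nullable = true),
  StructField("Address", StructType(Seq(
    StructField("city", StringType, nullable = true),
    StructField("state", StringType, nullable = true),
    StructField("country", StringType, nullable = true))), nullable = true)))

// Re-nest each flat Row (city, state, country, zipcode) into
// (Row(zip), Row(city, state, country)) so it matches the struct layout.
val rowRdd = df.rdd.map(r => Row(Row(r(3)), Row(r(0), r(1), r(2))))
val nestedDF = sqlContext.createDataFrame(rowRdd, nestedSchema)
nestedDF.printSchema()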
Re: Meets "java.lang.IllegalArgumentException" when test spark ml pipe with DecisionTreeClassifier
Sean,

Thank you! Finally, I got this to work, although it is a bit ugly: manually
setting the metadata of the dataframe.

import org.apache.spark.ml.attribute._
import org.apache.spark.sql.types._

val df = training.toDF()
val schema = df.schema
val rowRDD = df.rdd
def enrich(m: Metadata): Metadata = {
  val na = NominalAttribute.defaultAttr.withValues("0", "1")
  na.toMetadata(m)
}
val newSchema = StructType(schema.map(f =>
  if (f.name == "label") f.copy(metadata = enrich(f.metadata)) else f))
val model = pipeline.fit(sqlContext.createDataFrame(rowRDD, newSchema))

Thanks!
- Terry

On Mon, Sep 7, 2015 at 4:24 PM, Sean Owen <so...@cloudera.com> wrote:
> Hm, off the top of my head I don't know. I haven't looked at this
> aspect in a while, strangely. It's an attribute in the metadata of the
> field. I assume there's a method for setting this metadata when you
> construct the input data.
>
> On Sun, Sep 6, 2015 at 10:41 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> > Sean,
> >
> > Do you know how to tell the decision tree that the "label" is binary, or
> > how to set attributes on the dataframe to carry the number of classes?
> >
> > Thanks!
> > - Terry
> >
> > On Sun, Sep 6, 2015 at 5:23 PM, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> The error suggests that the type is not a binary or nominal attribute
> >> though. I think that's the missing step. A double-valued column need
> >> not be one of these attribute types.
> >>
> >> On Sun, Sep 6, 2015 at 10:14 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> >> > Hi, Owen,
> >> >
> >> > The dataframe "training" is from an RDD of a case class,
> >> > RDD[LabeledDocument], where the case class is defined as:
> >> > case class LabeledDocument(id: Long, text: String, label: Double)
> >> >
> >> > So it already has the default "label" column with "double" type.
> >> >
> >> > I already tried to set the label column for the decision tree like this:
> >> > val lr = new DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
> >> > It raised the same error.
> >> >
> >> > I also tried to change the "label" to "int" type; it reported an error
> >> > with the following stack. I have no idea how to make this work.
> >> >
> >> > java.lang.IllegalArgumentException: requirement failed: Column label
> >> > must be of type DoubleType but was actually IntegerType.
> >> >         at scala.Predef$.require(Predef.scala:233)
> >> >         at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
> >> >         at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
> >> >         at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
> >> >         at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
> >> >         at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >> >         at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >> >         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> >> >         at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
> >> >         at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
> >> >         at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
> >> >         at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
> >> >         at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56)
> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iw
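The hint in the original exception ("See StringIndexer.") points at the more idiomatic fix: let a StringIndexer stage produce the label column, since the fitted indexer attaches nominal attribute metadata (including the number of classes) to its output automatically. A minimal sketch follows; the tokenizer and hashingTF feature stages are assumptions borrowed from the standard ml pipeline guide, as the thread does not show them, and on some Spark versions you may need to cast the double label to string before indexing:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.feature.{HashingTF, StringIndexer, Tokenizer}

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")

// The fitted StringIndexer writes nominal metadata (with the number of
// classes) onto "indexedLabel", which is exactly what
// DecisionTreeClassifier.train was complaining about.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")

val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("features")
  .setMaxDepth(5)
  .setMaxBins(32)
  .setImpurity("gini")

val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, labelIndexer, dt))
val model = pipeline.fit(training.toDF())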
Re: Meets "java.lang.IllegalArgumentException" when test spark ml pipe with DecisionTreeClassifier
Xiangrui,

Do you have any idea how to make this work?

Thanks
- Terry

Terry Hole <hujie.ea...@gmail.com> wrote on Sunday, September 6, 2015 at 17:41:
> Sean,
>
> Do you know how to tell the decision tree that the "label" is binary, or
> how to set attributes on the dataframe to carry the number of classes?
>
> Thanks!
> - Terry
>
> On Sun, Sep 6, 2015 at 5:23 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> The error suggests that the type is not a binary or nominal attribute
>> though. I think that's the missing step. A double-valued column need
>> not be one of these attribute types.
>>
>> On Sun, Sep 6, 2015 at 10:14 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
>> > Hi, Owen,
>> >
>> > The dataframe "training" is from an RDD of a case class,
>> > RDD[LabeledDocument], where the case class is defined as:
>> > case class LabeledDocument(id: Long, text: String, label: Double)
>> >
>> > So it already has the default "label" column with "double" type.
>> >
>> > I already tried to set the label column for the decision tree like this:
>> > val lr = new DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
>> > It raised the same error.
>> >
>> > I also tried to change the "label" to "int" type; it reported an error
>> > with the following stack. I have no idea how to make this work.
>> >
>> > java.lang.IllegalArgumentException: requirement failed: Column label
>> > must be of type DoubleType but was actually IntegerType.
>> >         at scala.Predef$.require(Predef.scala:233)
>> >         at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
>> >         at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
>> >         at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
>> >         at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
>> >         at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
>> >         at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
>> >         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>> >         at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
>> >         at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
>> >         at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
>> >         at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
>> >         at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
>> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
>> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56)
>> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
>> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)
>> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:62)
>> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
>> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
>> >         at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
>> >         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
>> >         at $iwC$$iwC$$iwC.<init>(<console>:72)
>> >         at $iwC$$iwC.<init>(<console>:74)
>> >         at $iwC.<init>(<console>:76)
>> >         at <init>(<console>:78)
>> >         at .<init>(<console>:82)
>> >         at .<clinit>(<console>)
>> >         at .<init>(<console>:7)
>> >         at .<clinit>(<console>)
>> >         at $print(<console>)
>> >
>> > Thanks!
>> > - Terry
>> >
>> > On Sun, Sep 6, 2015 at 4:53 PM, Sean Owen <so...@cloudera.com> wrote:
>> >>
>> >> I think somewhere along the line you've not specified your label
>> >> column -- it's defaulting to "label" and it does not recognize it, or
>> >> at least not as a binary or nominal attribute.
>> >>
>> >> On Sun, Sep 6, 2015 at 5:47 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
>> >> > Hi, Experts,
>> >> >
>> >> > I followed the guide of the spark ml pipeline to test DecisionTr
Re: Meets "java.lang.IllegalArgumentException" when test spark ml pipe with DecisionTreeClassifier
Hi, Owen,

The dataframe "training" is from an RDD of a case class, RDD[LabeledDocument],
where the case class is defined as:
case class LabeledDocument(id: Long, text: String, *label: Double*)

So it already has the default "label" column with "double" type.

I already tried to set the label column for the decision tree like this:
val lr = new DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
It raised the same error.

I also tried to change the "label" to "int" type; it reported an error with
the following stack. I have no idea how to make this work.

java.lang.IllegalArgumentException: requirement failed: *Column label must
be of type DoubleType but was actually IntegerType*.
        at scala.Predef$.require(Predef.scala:233)
        at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
        at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
        at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
        at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
        at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
        at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
        at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
        at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
        at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
        at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:62)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
        at $iwC$$iwC$$iwC.<init>(<console>:72)
        at $iwC$$iwC.<init>(<console>:74)
        at $iwC.<init>(<console>:76)
        at <init>(<console>:78)
        at .<init>(<console>:82)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)

Thanks!
- Terry

On Sun, Sep 6, 2015 at 4:53 PM, Sean Owen <so...@cloudera.com> wrote:
> I think somewhere along the line you've not specified your label
> column -- it's defaulting to "label" and it does not recognize it, or
> at least not as a binary or nominal attribute.
>
> On Sun, Sep 6, 2015 at 5:47 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> > Hi, Experts,
> >
> > I followed the guide of the spark ml pipeline to test DecisionTreeClassifier
> > on the spark shell with spark 1.4.1, but it always meets an error like the
> > following. Do you have any idea how to fix this?
> >
> > The error stack:
> > java.lang.IllegalArgumentException: DecisionTreeClassifier was given input
> > with invalid label column label, without the number of classes specified.
> > See StringIndexer.
> >         at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:71)
> >         at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:41)
> >         at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
> >         at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
> >         at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:133)
> >         at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:129)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >         at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:42)
> >         at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:43)
> >         at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:129)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
> >         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:57
Re: Meets "java.lang.IllegalArgumentException" when test spark ml pipe with DecisionTreeClassifier
Sean,

Do you know how to tell the decision tree that the "label" is binary, or how
to set attributes on the dataframe to carry the number of classes?

Thanks!
- Terry

On Sun, Sep 6, 2015 at 5:23 PM, Sean Owen <so...@cloudera.com> wrote:
> The error suggests that the type is not a binary or nominal attribute
> though. I think that's the missing step. A double-valued column need
> not be one of these attribute types.
>
> On Sun, Sep 6, 2015 at 10:14 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> > Hi, Owen,
> >
> > The dataframe "training" is from an RDD of a case class,
> > RDD[LabeledDocument], where the case class is defined as:
> > case class LabeledDocument(id: Long, text: String, label: Double)
> >
> > So it already has the default "label" column with "double" type.
> >
> > I already tried to set the label column for the decision tree like this:
> > val lr = new DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
> > It raised the same error.
> >
> > I also tried to change the "label" to "int" type; it reported an error
> > with the following stack. I have no idea how to make this work.
> >
> > java.lang.IllegalArgumentException: requirement failed: Column label
> > must be of type DoubleType but was actually IntegerType.
> >         at scala.Predef$.require(Predef.scala:233)
> >         at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
> >         at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
> >         at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
> >         at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
> >         at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >         at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> >         at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
> >         at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
> >         at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
> >         at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
> >         at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:62)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
> >         at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
> >         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
> >         at $iwC$$iwC$$iwC.<init>(<console>:72)
> >         at $iwC$$iwC.<init>(<console>:74)
> >         at $iwC.<init>(<console>:76)
> >         at <init>(<console>:78)
> >         at .<init>(<console>:82)
> >         at .<clinit>(<console>)
> >         at .<init>(<console>:7)
> >         at .<clinit>(<console>)
> >         at $print(<console>)
> >
> > Thanks!
> > - Terry
> >
> > On Sun, Sep 6, 2015 at 4:53 PM, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> I think somewhere along the line you've not specified your label
> >> column -- it's defaulting to "label" and it does not recognize it, or
> >> at least not as a binary or nominal attribute.
> >>
> >> On Sun, Sep 6, 2015 at 5:47 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> >> > Hi, Experts,
> >> >
> >> > I followed the guide of the spark ml pipeline to test DecisionTreeClassifier
> >> > on the spark shell with spark 1.4.1, but it always meets an error like
> >> > the following. Do you have any idea how to fix this?
> >> >
> >> > The error stack:
> >> > java.lang.IllegalArgumentException: DecisionTreeClassifier was given
> >> > input with invalid label column label, without the number of classes
> >> > specified. See StringIndexer.
> >> >         at
Re: Job aborted due to stage failure: java.lang.StringIndexOutOfBoundsException: String index out of range: 18
Ricky,

You may need to use map instead of flatMap in your case:

val rowRDD = sc.textFile("/user/spark/short_model").map(_.split("\\t")).map(p => Row(...))

Thanks!
- Terry

On Fri, Aug 28, 2015 at 5:08 PM, our...@cnsuning.com <our...@cnsuning.com> wrote:
> hi all,
>
> When using spark sql, a problem is bothering me. The code is as follows:
>
> val schemaString = "visitor_id,cust_num,gds_id,l1_gds_group_cd,l4_gds_group_cd,pc_gds_addcart,pc_gds_collect_num,pc_gds_four_page_pv,pc_gds_four_page_time,pc_gds_four_page_fromsearch_pv,pc_gds_four_page_fromlist_pv,pc_gds_four_page_fromrec_pv,pc_gds_four_page_fromcuxiao_pv,pc_four_page_num,pc_group_gds_addcart,pc_group_gds_collect,pc_group_fourpage_pv,pc_group_fourpage_time,pc_visitor_pv,pc_search_pv,pc_list_pv,pc_is_view1,pc_is_view,pc_view_cycle_days,pc_view_days,wap_gds_addcart,wap_gds_collect_num,wap_gds_four_page_pv,wap_gds_four_page_time,wap_gds_four_page_fromsearch_pv,wap_gds_four_page_fromlist_pv,wap_gds_four_page_fromrec_pv,wap_gds_four_page_fromcuxiao_pv,wap_four_page_num,wap_group_gds_addcart,wap_group_gds_collect,wap_group_fourpage_pv,wap_group_fourpage_time,wap_visitor_pv,wap_search_pv,wap_list_pv,wap_is_view1,wap_is_view,wap_view_cycle_days,wap_view_days,app_gds_addcart,app_gds_collect_num,app_gds_four_page_pv,app_gds_four_page_time,app_gds_four_page_fromsearch_pv,app_gds_four_page_fromlist_pv,app_gds_four_page_fromrec_pv,app_gds_four_page_fromcuxiao_pv,app_four_page_num,app_group_gds_addcart,app_group_gds_collect,app_group_fourpage_pv,app_group_fourpage_time,app_visitor_pv,app_search_pv,app_list_pv,app_is_view1,app_is_view,app_view_cycle_days,app_view_days,gds_score_desc,l4_gds_group_rate_n,decision_cycle_days,decision_days,decision_pv,is_order,statis_date"
> // schemaString.split(",").length = 72
>
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.types.{StructType, StructField, StringType};
>
> val schema = StructType(schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true)))
> val rowRDD = sc.textFile("/user/spark/short_model").flatMap(_.split("\\t")).map(p => Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12),p(13),p(14),p(15),p(16),p(17),p(18),p(19),p(20),p(21),p(22),p(23),p(24),p(25),p(26),p(27),p(28),p(29),p(30),p(31),p(32),p(33),p(34),p(35),p(36),p(37),p(38),p(39),p(40),p(41),p(42),p(43),p(44),p(45),p(46),p(47),p(48),p(49),p(50),p(51),p(52),p(53),p(54),p(55),p(56),p(57),p(58),p(59),p(60),p(61),p(62),p(63),p(64),p(65),p(66),p(67),p(68),p(69),p(70),p(71)))
> val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
> peopleDataFrame.registerTempTable("alg")
> val results = sqlContext.sql("SELECT count(*) FROM alg")
> results.collect()
>
> The error log is as follows:
>
> 15/08/28 17:00:54 WARN TaskSetManager: Lost task 56.0 in stage 9.0 (TID 71, 10.104.74.8): java.lang.StringIndexOutOfBoundsException: String index out of range: 18
>         at java.lang.String.charAt(String.java:658)
>         at scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39)
>         at $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
>         at $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
>         at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.1 in stage 9.0 (TID 72, 10.104.74.8, NODE_LOCAL, 1415 bytes)
> 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.1 in stage 9.0 (TID 72) on executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException (String index out
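The root cause is worth spelling out: flatMap(_.split("\\t")) flattens every line into an RDD of individual field strings, so each p passed to Row(...) is a single String, and p(18) resolves to StringOps.apply — the 19th character of that one field — which throws as soon as a field is shorter than 19 characters. map keeps one Array[String] per line. A short sketch of the corrected pipeline, reusing the path and schemaString from the thread:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(schemaString.split(",").map(fieldName =>
  StructField(fieldName, StringType, nullable = true)))

// map (not flatMap): p is the full Array[String] of one line's 72 fields,
// so p(18) is a column value instead of a character of one token.
// Note: split("\\t", -1) would also preserve trailing empty columns.
val rowRDD = sc.textFile("/user/spark/short_model")
  .map(_.split("\\t"))
  .map(p => Row(p: _*)) // expands all fields; equivalent to Row(p(0), ..., p(71))

val df = sqlContext.createDataFrame(rowRDD, schema)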
Re: standalone to connect mysql
Jack,

You can refer to the Hive SQL syntax if you use HiveContext:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML

Thanks!
- Terry

> That works! Thanks. Can I ask you one further question?
>
> How does spark sql support insertion? That is to say, if I did:
> sqlContext.sql("insert into newStu values ('10', 'a', 1)")
> the error is:
> failure: ``table'' expected but identifier newStu found
> insert into newStu values ('10', 'a', 1)
>
> But if I did:
> sqlContext.sql(s"insert into Table newStu select * from otherStu")
> that works.
>
> Is there any document addressing that?
>
> Best regards,
> Jack
>
> From: Terry Hole [mailto:hujie.ea...@gmail.com]
> Sent: Tuesday, 21 July 2015 4:17 PM
> To: Jack Yang; user@spark.apache.org
> Subject: Re: standalone to connect mysql
>
> Maybe you can try:
> spark-submit --class sparkwithscala.SqlApp --jars /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar
>
> Thanks!
> - Terry
>
>> Hi there,
>>
>> I would like to use spark to access the data in mysql. So firstly I tried
>> to run the program using:
>> spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master local[4] /home/myjar.jar
>> which returns the correct results. Then I tried the standalone version using:
>> spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar
>> (I have the mysql-connector-java-5.1.34.jar on all worker nodes.)
>> and the error is:
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
>> to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure:
>> Lost task 0.3 in stage 0.0 (TID 3, 192.168.157.129): java.sql.SQLException:
>> No suitable driver found for jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=root&password=root
>>
>> I also found a similar problem reported before at
>> https://jira.talendforge.org/browse/TBD-2244.
>> Is this a bug to be fixed later? Or do I miss anything?
>>
>> Best regards,
>> Jack
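The distinction Jack hit: HiveQL, as parsed by HiveContext in Spark 1.x, takes inserted rows from a query via INSERT INTO TABLE, and a bare VALUES list was only added in Hive 0.14. A couple of hedged examples of forms the parser accepts (table and column values are from the thread; the LIMIT 1 trick is a common Hive-era workaround, not something the thread confirms):

// Rows come from a query; note the TABLE keyword.
sqlContext.sql("INSERT INTO TABLE newStu SELECT * FROM otherStu")

// Inserting a single literal row: select constants from any existing
// table and cap the result with LIMIT 1.
sqlContext.sql("INSERT INTO TABLE newStu SELECT '10', 'a', 1 FROM otherStu LIMIT 1")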
Re: standalone to connect mysql
Maybe you can try:

spark-submit --class sparkwithscala.SqlApp --jars /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar

Thanks!
- Terry

> Hi there,
>
> I would like to use spark to access the data in mysql. So firstly I tried
> to run the program using:
> spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master local[4] /home/myjar.jar
> which returns the correct results. Then I tried the standalone version using:
> spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar
> (I have the mysql-connector-java-5.1.34.jar on all worker nodes.)
> and the error is:
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure:
> Lost task 0.3 in stage 0.0 (TID 3, 192.168.157.129): java.sql.SQLException:
> No suitable driver found for jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=root&password=root
>
> I also found a similar problem reported before at
> https://jira.talendforge.org/browse/TBD-2244.
> Is this a bug to be fixed later? Or do I miss anything?
>
> Best regards,
> Jack
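Why swapping --driver-class-path for --jars fixes this: --driver-class-path only prepends the jar to the driver JVM's classpath, while the executors — where the JDBC connections are actually opened — never see it; --jars ships the jar to every executor. An alternative sketch via spark-defaults.conf, assuming the jar really does sit at the same path on every node as Jack says:

spark.driver.extraClassPath    /home/lib/mysql-connector-java-5.1.34.jar
spark.executor.extraClassPath  /home/lib/mysql-connector-java-5.1.34.jar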
Re: [Spark Shell] Could the spark shell be reset to the original status?
Hi Ted,

Thanks for the information. The post seems a little different from my
requirement: suppose we have defined many functions to do different
streaming work (e.g. 50 functions), and I want to test these 50 functions in
the spark shell; the shell will always throw OOM in the middle of the test.
(Yes, it could be mitigated by increasing the jvm memory size, but with more
functions the issue will still happen.) The main issue is that the shell
keeps track of all the information (classes, objects...) since it started,
so the java memory grows over time as the functions are defined and invoked.

Thanks!
- Terry

Ted Yu <yuzhih...@gmail.com> wrote on Friday, July 17, 2015 at 12:02 PM:
> See this recent thread:
> http://search-hadoop.com/m/q3RTtFW7iMDkrj61/Spark+shell+oom+subj=java+lang+OutOfMemoryError+PermGen+space
>
> On Jul 16, 2015, at 8:51 PM, Terry Hole <hujie.ea...@gmail.com> wrote:
>> Hi,
>>
>> Background: the spark shell will get an out of memory error after dealing
>> with lots of spark work. Is there any method which can reset the spark
>> shell to its startup status?
>>
>> I tried :reset, but it seems not to work: I can not create a spark context
>> anymore (it fails with the compile error below) after the :reset. (I have
>> to restart the shell after OOM to work around this.)
>>
>> == Expanded type of tree ==
>> TypeRef(TypeSymbol(class $read extends Serializable))
>> uncaught exception during compilation: java.lang.AssertionError
>> java.lang.AssertionError: assertion failed: Tried to find '$line16' in
>> 'C:\Users\jhu\AppData\Local\Temp\spark-2ad09490-c0c6-41e2-addb-63087ce0ae63'
>> but it is not a directory
>> That entry seems to have slain the compiler. Shall I replay
>> your session? I can re-run each line except the last one.
>> [y/n]
>> Abandoning crashed session.
>>
>> Thanks!
>> - Terry
[Spark Shell] Could the spark shell be reset to the original status?
Hi,

Background: the spark shell will get an out of memory error after dealing
with lots of spark work. Is there any method which can reset the spark shell
to its startup status?

I tried :reset, but it seems not to work: I can not create a spark context
anymore (it fails with the compile error below) after the :reset. (I have to
restart the shell after OOM to work around this.)

== Expanded type of tree ==
TypeRef(TypeSymbol(class $read extends Serializable))
uncaught exception during compilation: java.lang.AssertionError
java.lang.AssertionError: assertion failed: Tried to find '$line16' in
'C:\Users\jhu\AppData\Local\Temp\spark-2ad09490-c0c6-41e2-addb-63087ce0ae63'
but it is not a directory
That entry seems to have slain the compiler. Shall I replay
your session? I can re-run each line except the last one.
[y/n]
Abandoning crashed session.

Thanks!
- Terry
Re: fileStream with old files
Hi, Hunter,

What behavior do you see with HDFS? The local file system and HDFS should
have the same behavior.

Thanks!
- Terry

Hunter Morgan <hunter.mor...@rackspace.com> wrote on Thursday, July 16, 2015 at 2:04 AM:
> After moving the setting of the parameter to SparkConf initialization
> instead of after the context is already initialized, I have it operating
> reliably on the local filesystem, but not on hdfs. Are there any
> differences in behavior between these two cases I should be aware of?
>
> I don't usually mailinglist or exchange, so forgive me for my ignorance of
> whether this message will go horribly wrong due to formatting. I plan to
> port the following code to the Hadoop FS API to generalize testing, to
> understand actual behavior and ensure desired behavior.
>
> public static JavaDStream<String> textFileStreamIncludingExisting(JavaStreamingContext context, String path) {
>     return context.fileStream(path, LongWritable.class, Text.class, TextInputFormat.class,
>             v1 -> true, false).map(v1 -> v1._2.toString());
> }
>
> @Test
> public void testTextFileStreamIncludingExistingReadsOldFiles() throws Exception {
>     final Path testDir = Files.createTempDirectory("sparkTest");
>     final ArrayList<Path> tempFiles = new ArrayList<>();
>     // create 20 old files
>     final int testFileNumberLimit = 20;
>     for (int testFileNumber = 0; testFileNumber < testFileNumberLimit; testFileNumber++) {
>         final Path testFile = Files.createTempFile(testDir, "testFile", "");
>         tempFiles.add(testFile);
>         final FileWriter fileWriter = new FileWriter(testFile.toFile());
>         fileWriter.write("asdf");
>         fileWriter.flush();
>         fileWriter.close();
>         for (String eachAttribute : new String[]{"basic:lastAccessTime", "basic:lastModifiedTime", "basic:creationTime"}) {
>             // set file dates 0 to 20 days ago
>             Files.setAttribute(testFile, eachAttribute,
>                     FileTime.from(Instant.now().minus(Duration.ofDays(testFileNumber))));
>         }
>     }
>     final SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("test");
>     sparkConf.set("spark.streaming.minRememberDuration", String.valueOf(Integer.MAX_VALUE));
>     final JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(1));
>     final JavaDStream<String> input = SparkUtil.textFileStreamIncludingExisting(context, String.valueOf(testDir.toUri()));
>     // count files read
>     final Accumulator<Integer> accumulator = context.sparkContext().accumulator(0);
>     // setup async wait
>     Semaphore done = new Semaphore(1);
>     done.acquire();
>     input.foreachRDD(new Function<JavaRDD<String>, Void>() {
>         @Override
>         public Void call(JavaRDD<String> v1) throws Exception {
>             if (v1.count() == 0) {
>                 done.release();
>             }
>             accumulator.add((int) v1.count());
>             return null;
>         }
>     });
>     context.start();
>     // wait for completion or 20 sec
>     done.tryAcquire(20, TimeUnit.SECONDS);
>     context.stop();
>     assertThat(accumulator.value(), is(testFileNumberLimit));
>     for (Path eachTempFile : tempFiles) {
>         Files.deleteIfExists(eachTempFile);
>     }
>     Files.deleteIfExists(testDir);
> }
>
> From: Tathagata Das [mailto:t...@databricks.com]
> Sent: Wednesday, July 15, 2015 00:01
> To: Terry Hole
> Cc: Hunter Morgan; user@spark.apache.org
> Subject: Re: fileStream with old files
>
> It was added, but it's not documented publicly. I am planning to change
> the name of the conf to spark.streaming.fileStream.minRememberDuration to
> make it easier to understand.
>
> On Mon, Jul 13, 2015 at 9:43 PM, Terry Hole <hujie.ea...@gmail.com> wrote:
>> A new configuration named spark.streaming.minRememberDuration was added
>> since 1.2.1 to control the file stream input; the default value is 60
>> seconds. You can change it to a larger value to include older files
>> (older than 1 minute).
>>
>> You can get the details from this jira:
>> https://issues.apache.org/jira/browse/SPARK-3276
>>
>> - Terry
>>
>> On Tue, Jul 14, 2015 at 4:44 AM, automaticgiant <hunter.mor...@rackspace.com> wrote:
>>> It's not as odd as it sounds. I want to ensure that long streaming job
>>> outages can recover all the files that went into a directory while the
>>> job was down. I've looked at
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Generating-a-DStream-by-existing-textfiles-td20030.html#a20039
>>> and
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-td14306.html#a16435
>>> and
>>> https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469#29036469?newreg=e7e25469132d4fbc8350be8f876cf81e
>>> , but all seem unhelpful.
>>>
>>> I've tested combinations of the following:
>>> * fileStreams created with dumb accept-all filters
>>> * newFilesOnly true and false
>>> * tweaking minRememberDuration to high and low values
>>> * on hdfs or local
Re: fileStream with old files
A new configuration named spark.streaming.minRememberDuration was added
since 1.2.1 to control the file stream input; the default value is 60
seconds. You can change it to a larger value to include older files (older
than 1 minute).

You can get the details from this jira:
https://issues.apache.org/jira/browse/SPARK-3276

- Terry

On Tue, Jul 14, 2015 at 4:44 AM, automaticgiant <hunter.mor...@rackspace.com> wrote:
> It's not as odd as it sounds. I want to ensure that long streaming job
> outages can recover all the files that went into a directory while the job
> was down. I've looked at
> http://apache-spark-user-list.1001560.n3.nabble.com/Generating-a-DStream-by-existing-textfiles-td20030.html#a20039
> and
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-td14306.html#a16435
> and
> https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469#29036469?newreg=e7e25469132d4fbc8350be8f876cf81e
> , but all seem unhelpful.
>
> I've tested combinations of the following:
> * fileStreams created with dumb accept-all filters
> * newFilesOnly true and false
> * tweaking minRememberDuration to high and low values
> * on hdfs or local directory.
>
> The problem is that it will not read files in the directory from more than
> a minute ago.
>
> JavaPairInputDStream<LongWritable, Text> input = context.fileStream(indir,
>     LongWritable.class, Text.class, TextInputFormat.class, v -> true, false);
>
> Also tried with having set:
> context.sparkContext().getConf().set("spark.streaming.minRememberDuration", "1654564");
> to big/small.
>
> Are there known limitations of the onlyNewFiles=false? Am I doing
> something wrong?
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/fileStream-with-old-files-tp23802.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
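Putting the pieces of this thread together, a minimal Scala sketch — the directory path and batch interval are hypothetical, and note the config must be set before the StreamingContext is created, as Hunter later confirms upthread:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Widen the remember window before the context exists, so files older
// than the default 60 seconds are still considered.
val conf = new SparkConf()
  .setAppName("include-old-files")
  .set("spark.streaming.minRememberDuration", String.valueOf(Int.MaxValue))
val ssc = new StreamingContext(conf, Seconds(10))

// newFilesOnly = false plus an accept-all filter: pick up files already
// present in the directory when the job starts.
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
    "hdfs:///user/spark/incoming", (_: Path) => true, newFilesOnly = false)
  .map(_._2.toString)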
Re: [Spark Hive SQL] Set the hive connection in hive context is broken in spark 1.4.1-rc1?
Michael,

Thanks!
- Terry

Michael Armbrust <mich...@databricks.com> wrote on Saturday, July 11, 2015 at 04:02:
> Metastore configuration should be set in hive-site.xml.
>
> On Thu, Jul 9, 2015 at 8:59 PM, Terry Hole <hujie.ea...@gmail.com> wrote:
>> Hi,
>>
>> I am trying to set the hive metastore destination to a mysql database in
>> hive context. It works fine in spark 1.3.1, but it seems broken in spark
>> 1.4.1-rc1, where it always connects to the default metastore (local). Is
>> this a regression, or must we set the connection in hive-site.xml?
>>
>> The code is very simple in spark shell:
>>
>> import org.apache.spark.sql.hive._
>> val hiveContext = new HiveContext(sc)
>> hiveContext.setConf("javax.jdo.option.ConnectionDriveName", "com.mysql.jdbc.Driver")
>> hiveContext.setConf("javax.jdo.option.ConnectionUserName", "hive")
>> hiveContext.setConf("javax.jdo.option.ConnectionPassword", "hive")
>> hiveContext.setConf("javax.jdo.option.ConnectionURL", "jdbc:mysql://10.111.3.186:3306/hive")
>> hiveContext.setConf("hive.metastore.warehouse.dir", "/user/hive/warehouse")
>> hiveContext.sql("select * from mysqltable").show()
>>
>> Thanks!
>> - Terry
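For reference, a sketch of the equivalent hive-site.xml, assuming it is placed on the driver's classpath (e.g. $SPARK_HOME/conf). One thing worth double-checking in the code above: the standard metastore property is javax.jdo.option.ConnectionDriverName — with an "r" in "Driver" — while the snippet sets ConnectionDriveName.

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://10.111.3.186:3306/hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hive</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
  </property>
</configuration>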
[Spark Hive SQL] Set the hive connection in hive context is broken in spark 1.4.1-rc1?
Hi,

I am trying to set the hive metastore destination to a mysql database in
hive context. It works fine in spark 1.3.1, but it seems broken in spark
1.4.1-rc1, where it always connects to the default metastore (local). Is
this a regression, or must we set the connection in hive-site.xml?

The code is very simple in spark shell:

import org.apache.spark.sql.hive._
val hiveContext = new HiveContext(sc)
hiveContext.setConf("javax.jdo.option.ConnectionDriveName", "com.mysql.jdbc.Driver")
hiveContext.setConf("javax.jdo.option.ConnectionUserName", "hive")
hiveContext.setConf("javax.jdo.option.ConnectionPassword", "hive")
hiveContext.setConf("javax.jdo.option.ConnectionURL", "jdbc:mysql://10.111.3.186:3306/hive")
hiveContext.setConf("hive.metastore.warehouse.dir", "/user/hive/warehouse")
hiveContext.sql("select * from mysqltable").show()

Thanks!
- Terry
Re: Is there a way to shutdown the derby in hive context in spark shell?
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:171)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:62)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
        at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
        at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
        at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
        at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:116)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:172)
        at org.apache.spark.sql.hive.client.IsolatedClientLoader.<init>(IsolatedClientLoader.scala:168)
        at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:213)
        at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:176)
        at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:359)

On Thu, Jul 9, 2015 at 2:32 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> Did you try sc.shutdown and creating a new one?
>
> Thanks
> Best Regards
>
> On Wed, Jul 8, 2015 at 8:12 PM, Terry Hole <hujie.ea...@gmail.com> wrote:
>> I am using spark 1.4.1rc1 with default hive settings.
>>
>> Thanks
>> - Terry
>>
>>> Hi All,
>>>
>>> I'd like to use the hive context in spark shell. I need to recreate the
>>> hive meta database in the same location, so I want to close the derby
>>> connection previously created in the spark shell. Is there any way to do
>>> this? I tried this, but it does not work:
>>> DriverManager.getConnection("jdbc:derby:;shutdown=true");
>>>
>>> Thanks!
>>> - Terry
Is there a way to shutdown the derby in hive context in spark shell?
Hi All,

I'd like to use the hive context in spark shell. I need to recreate the hive
meta database in the same location, so I want to close the derby connection
previously created in the spark shell. Is there any way to do this? I tried
this, but it does not work:

DriverManager.getConnection("jdbc:derby:;shutdown=true");

Thanks!
- Terry
Re: Is there a way to shutdown the derby in hive context in spark shell?
I am using spark 1.4.1rc1 with default hive settings.

Thanks
- Terry

> Hi All,
>
> I'd like to use the hive context in spark shell. I need to recreate the
> hive meta database in the same location, so I want to close the derby
> connection previously created in the spark shell. Is there any way to do
> this? I tried this, but it does not work:
> DriverManager.getConnection("jdbc:derby:;shutdown=true");
>
> Thanks!
> - Terry
Re: Meets class not found error in spark console with newly hive context
Found this is a bug in spark 1.4.0: SPARK-8368
<https://issues.apache.org/jira/browse/SPARK-8368>

Thanks!
Terry

On Thu, Jul 2, 2015 at 1:20 PM, Terry Hole <hujie.ea...@gmail.com> wrote:
> All,
>
> I am using spark console 1.4.0 to do some tests. When I create a new
> HiveContext (line 18 in the code) in my test function, it always throws an
> exception like the one below (it works in spark console 1.3.0), but if I
> remove the HiveContext (line 18 in the code) from my function, it works
> fine. Any idea what's wrong with this?
>
> java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$streamingTest$1$$anonfun$apply$1
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:348)
>         at org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(ClosureCleaner.scala:455)
>         at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
>         at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
>         at org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:101)
>         at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:197)
>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:630)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
>         at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
>         at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
>         at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.streamingTest(<console>:98)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:93)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:98)
>
>  1 import org.apache.spark._
>  2 import org.apache.spark.SparkContext._
>  3 import org.apache.spark.streaming.{ StreamingContext, Seconds, Minutes, Time }
>  4 import org.apache.spark.streaming.StreamingContext._
>  5 import org.apache.spark.rdd.RDD
>  6 import org.apache.spark.streaming.dstream.DStream
>  7 import org.apache.spark.HashPartitioner
>  8 import org.apache.spark.storage.StorageLevel
>  9 import org.apache.spark.sql._
> 10 import org.apache.spark.sql.hive._
> 11 import scala.collection.mutable.{Queue}
> 12 import scala.concurrent.Future
> 13 import scala.concurrent.ExecutionContext.Implicits.global
> 14
> 15 def streamingTest(args: Array[String]) {
> 16   println(" create streamingContext.")
> 17   val ssc = new StreamingContext(sc, Seconds(1))
> 18   val sqlContext2 = new HiveContext(sc)
> 19
> 20   val accum = sc.accumulator(0, "End Accumulator")
> 21   val queue = scala.collection.mutable.Queue(sc.textFile("G:/pipe/source"))
> 22   val textSource = ssc.queueStream(queue, true)
> 23   textSource.foreachRDD(rdd => { rdd.foreach( item => {accum += 1} ) })
> 24   textSource.foreachRDD(rdd => {
> 25     var sample = rdd.take(10)
> 26     if (sample.length > 0) {
> 27       sample.foreach(item => println("#=" + item))
> 28     }
> 29   })
> 30   println(" Start streaming context.")
> 31   ssc.start()
> 32   val stopFunc = Future {var isRun = true; var duration = 0; while (isRun) { Thread.sleep(1000); duration += 1; if ( accum.value > 0 || duration >= 120) {println("### STOP SSC ###");ssc.stop(false, true); duration = 0; isRun = false} }}
> 33   ssc.awaitTermination()
> 34   println(" Streaming context terminated.")
> 35 }
> 36
> 37 streamingTest(null)
> 38
>
> Thanks
> Terry
Meets class not found error in spark console with newly hive context
All,

I am using spark console 1.4.0 to do some tests. When I create a new
HiveContext (line 18 in the code) in my test function, it always throws an
exception like the one below (it works in spark console 1.3.0), but if I
remove the HiveContext (line 18 in the code) from my function, it works
fine. Any idea what's wrong with this?

java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$streamingTest$1$$anonfun$apply$1
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(ClosureCleaner.scala:455)
        at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
        at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
        at org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:101)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:197)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:630)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
        at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
        at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
        at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.streamingTest(<console>:98)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:93)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:98)

 1 import org.apache.spark._
 2 import org.apache.spark.SparkContext._
 3 import org.apache.spark.streaming.{ StreamingContext, Seconds, Minutes, Time }
 4 import org.apache.spark.streaming.StreamingContext._
 5 import org.apache.spark.rdd.RDD
 6 import org.apache.spark.streaming.dstream.DStream
 7 import org.apache.spark.HashPartitioner
 8 import org.apache.spark.storage.StorageLevel
 9 import org.apache.spark.sql._
10 import org.apache.spark.sql.hive._
11 import scala.collection.mutable.{Queue}
12 import scala.concurrent.Future
13 import scala.concurrent.ExecutionContext.Implicits.global
14
15 def streamingTest(args: Array[String]) {
16   println(" create streamingContext.")
17   val ssc = new StreamingContext(sc, Seconds(1))
18   val sqlContext2 = new HiveContext(sc)
19
20   val accum = sc.accumulator(0, "End Accumulator")
21   val queue = scala.collection.mutable.Queue(sc.textFile("G:/pipe/source"))
22   val textSource = ssc.queueStream(queue, true)
23   textSource.foreachRDD(rdd => { rdd.foreach( item => {accum += 1} ) })
24   textSource.foreachRDD(rdd => {
25     var sample = rdd.take(10)
26     if (sample.length > 0) {
27       sample.foreach(item => println("#=" + item))
28     }
29   })
30   println(" Start streaming context.")
31   ssc.start()
32   val stopFunc = Future {var isRun = true; var duration = 0; while (isRun) { Thread.sleep(1000); duration += 1; if ( accum.value > 0 || duration >= 120) {println("### STOP SSC ###");ssc.stop(false, true); duration = 0; isRun = false} }}
33   ssc.awaitTermination()
34   println(" Streaming context terminated.")
35 }
36
37 streamingTest(null)
38

Thanks
Terry
Re: Is it possible to set the akka specify properties (akka.extensions) in spark
Hi Akhil,

I tried this. It did not work. I also tried
SparkConf.set("akka.extensions", "[\"kamon.system.SystemMetrics\", \"kamon.statsd.StatsD\"]"),
and it also did not work.

Thanks

On Mon, May 11, 2015 at 2:56 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> Try SparkConf.set("spark.akka.extensions", "Whatever"); underneath I think
> spark won't ship properties which don't start with spark.* to the
> executors.
>
> Thanks
> Best Regards
>
> On Mon, May 11, 2015 at 8:33 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
>> Hi all,
>>
>> I'd like to monitor akka using kamon, which needs the akka.extensions set
>> to a list like this in typesafe config format:
>> akka {
>>   extensions = ["kamon.system.SystemMetrics", "kamon.statsd.StatsD"]
>> }
>>
>> But I can not find a way to do this. I have tried these:
>> 1. SparkConf.set("akka.extensions", "[\"kamon.system.SystemMetrics\", \"kamon.statsd.StatsD\"]")
>> 2. Use application.conf and set it via the java option -Dconfig.resource=/path/to/conf
>> 3. Set akka.extensions ["kamon.system.SystemMetrics", "kamon.statsd.StatsD"] in the spark conf file
>>
>> None of these work. Do we have other ways to set this?
>>
>> Thanks!
Is it possible to set the akka specify properties (akka.extensions) in spark
Hi all,

I'd like to monitor akka using kamon, which needs the akka.extensions set to
a list like this in typesafe config format:

akka {
  extensions = ["kamon.system.SystemMetrics", "kamon.statsd.StatsD"]
}

But I can not find a way to do this. I have tried these:
1. SparkConf.set("akka.extensions", "[\"kamon.system.SystemMetrics\", \"kamon.statsd.StatsD\"]")
2. Use application.conf and set it via the java option -Dconfig.resource=/path/to/conf
3. Set akka.extensions ["kamon.system.SystemMetrics", "kamon.statsd.StatsD"] in the spark conf file

None of these work. Do we have other ways to set this?

Thanks!
Re: spark 1.3.0 strange log message
Use this in your Spark conf:

spark.ui.showConsoleProgress false

Best Regards,

On Fri, Apr 24, 2015 at 11:23 AM, Henry Hung <ythu...@winbond.com> wrote:
> Dear All,
>
> When using spark 1.3.0 spark-submit with stdout and stderr directed to a
> log file, I saw some strange lines inside that look like this:
>
> [Stage 0:(0 + 2) / 120]
> [Stage 0:(2 + 2) / 120]
> [Stage 0:== (6 + 2) / 120]
> [Stage 0:= (12 + 2) / 120]
> [Stage 0:= (20 + 2) / 120]
> [Stage 0:===(24 + 2) / 120]
> [Stage 0:== (32 + 2) / 120]
> [Stage 0:===(42 + 2) / 120]
> [Stage 0: (52 + 2) / 120]
> [Stage 0:===(59 + 2) / 120]
> [Stage 0:===(68 + 2) / 120]
> [Stage 0: (78 + 3) / 120]
> [Stage 0:= (88 + 4) / 120]
> [Stage 0:= (100 + 2) / 120]
> [Stage 0:==(110 + 2) / 120]
>
> Here is my log4j property:
>
> # Set everything to be logged to the console
> log4j.rootCategory=WARN, console
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.target=System.err
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
>
> # Settings to quiet third party logs that are too verbose
> log4j.logger.org.eclipse.jetty=WARN
> log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
> log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
> log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
>
> How can I disable this kind of stage progress message?
>
> Best regards,
> Henry
>
> --
> The privileged confidential information contained in this email is
> intended for use only by the addressees as indicated by the original sender
> of this email. If you are not the addressee indicated in this email or are
> not responsible for delivery of the email to such a person, please kindly
> reply to the sender indicating this fact and delete all copies of it from
> your computer and network server immediately. Your cooperation is highly
> appreciated. It is advised that any unauthorized use of confidential
> information of Winbond is strictly prohibited; and any information in this
> email irrelevant to the official business of Winbond shall be deemed as
> neither given nor endorsed by Winbond.
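The same flag (added in Spark 1.2.1) can be set in any of the usual places; a short sketch:

import org.apache.spark.SparkConf

// In code, before the SparkContext is created...
val conf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
// ...or on the command line:
//   spark-submit --conf spark.ui.showConsoleProgress=false ...
// ...or in spark-defaults.conf:
//   spark.ui.showConsoleProgress  false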
Fwd: [Spark Streaming] The FileInputDStream newFilesOnly=false does not work in 1.2 since
Hi,

I am trying to move from 1.1 to 1.2 and found that newFilesOnly = false
(intended to include old files) does not work anymore. It works great in
1.1; the behavior change appears to have been introduced by the most recent
change to this class. Is this flag behavior change intentional, or is it a
regression?

The issue should be caused by this code, from line 157 in
FileInputDStream.scala:

val modTimeIgnoreThreshold = math.max(
  initialModTimeIgnoreThreshold,   // initial threshold based on newFilesOnly setting
  currentTime - durationToRemember.milliseconds  // trailing end of the remember window
)

Regards
- Terry
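To see why newFilesOnly = false stopped working, plug numbers into that expression: with newFilesOnly = false the initial threshold is permissive, but the max() still clamps it to the trailing edge of the remember window. A worked illustration (the concrete values are hypothetical):

// newFilesOnly = false makes the initial threshold accept everything...
val initialModTimeIgnoreThreshold = 0L
val currentTime = 1000000L        // "now", in ms
val durationToRememberMs = 60000L // default remember window: 60 seconds

// ...but max() raises it back to (now - 60s), so any file whose
// modification time is older than one minute is silently ignored.
val modTimeIgnoreThreshold =
  math.max(initialModTimeIgnoreThreshold, currentTime - durationToRememberMs)
// modTimeIgnoreThreshold == 940000L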