Re: How to convert dataframe to a nested StructType schema

2015-09-15 Thread Terry Hole
Hao,

For spark 1.4.1, you can try this:
val rowrdd = df.rdd.map(r => Row(Row(r(3)), Row(r(0), r(1), r(2))))
val newDF = sqlContext.createDataFrame(rowrdd, yourNewSchema)
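
For reference, a minimal sketch (untested) of what yourNewSchema could look
like for the nested schema quoted below, assuming the flat column order is
(city, state, country, zipcode) so that r(3) is the zip code:

import org.apache.spark.sql.types.{StructType, StructField, StringType}

val yourNewSchema = StructType(Seq(
  StructField("ZipCode", StructType(Seq(
    StructField("zip", StringType, true))), true),
  StructField("Address", StructType(Seq(
    StructField("city", StringType, true),
    StructField("state", StringType, true),
    StructField("country", StringType, true))), true)))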

Thanks!

- Terry

On Wed, Sep 16, 2015 at 2:10 AM, Hao Wang  wrote:

> Hi,
>
> I created a dataframe with 4 string columns (city, state, country,
> zipcode).
> I then applied the following nested schema to it by creating a custom
> StructType. When I run df.take(5), it gives the exception below as
> expected.
> The question is how I can convert the Rows in the dataframe to conform to
> this nested schema? Thanks!
>
> root
>  |-- ZipCode: struct (nullable = true)
>  ||-- zip: string (nullable = true)
>  |-- Address: struct (nullable = true)
>  ||-- city: string (nullable = true)
>  ||-- state: string (nullable = true)
>  ||-- country: string (nullable = true)
>
> [info]   org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in
> stage 6.0 (TID 6, localhost): scala.MatchError: 95123 (of class
> java.lang.String)
> [info] at
>
> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:178)
> [info] at
>
> org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
> [info] at
>
> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
> [info] at
> org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
> [info] at
> org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-convert-dataframe-to-a-nested-StructType-schema-tp24703.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Meets "java.lang.IllegalArgumentException" when test spark ml pipe with DecisionTreeClassifier

2015-09-09 Thread Terry Hole
Sean,

Thank you!

Finally, I got this to work, although it is a bit ugly: manually setting the
metadata of the dataframe.

import org.apache.spark.ml.attribute._
import org.apache.spark.sql.types._
val df = training.toDF()
val schema = df.schema
val rowRDD = df.rdd
def enrich(m : Metadata) : Metadata = {
  val na = NominalAttribute.defaultAttr.withValues("0", "1")
  na.toMetadata(m)
}
val newSchema = StructType(schema.map(f => if (f.name == "label")
f.copy(metadata=enrich(f.metadata)) else f))
val model = pipeline.fit(sqlContext.createDataFrame(rowRDD, newSchema))
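
An alternative worth trying (a sketch only, not tested here) is to let a
StringIndexer produce the label column, since it writes the nominal attribute
(including the number of classes) into the column metadata automatically, which
is what the "See StringIndexer" hint in the original error points at. The cast
and the feature stages (Tokenizer + HashingTF, as in the ml-pipeline guide) are
assumptions:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.feature.{HashingTF, StringIndexer, Tokenizer}

// "training" is the RDD[LabeledDocument] from this thread
val base = training.toDF()
// StringIndexer here expects a string column, so cast the double label first
val df = base.withColumn("labelStr", base("label").cast("string"))

// the indexed label column carries the number of classes in its metadata
val labelIndexer = new StringIndexer().setInputCol("labelStr").setOutputCol("indexedLabel")

// feature stages assumed to follow the ml-pipeline guide
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")

val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("features")
  .setMaxDepth(5)
  .setMaxBins(32)
  .setImpurity("gini")

val model = new Pipeline()
  .setStages(Array(labelIndexer, tokenizer, hashingTF, dt))
  .fit(df)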

Thanks!
- Terry

On Mon, Sep 7, 2015 at 4:24 PM, Sean Owen <so...@cloudera.com> wrote:

> Hm, off the top of my head I don't know. I haven't looked at this
> aspect in a while, strangely. It's an attribute in the metadata of the
> field. I assume there's a method for setting this metadata when you
> construct the input data.
>
> On Sun, Sep 6, 2015 at 10:41 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> > Sean
> >
> > Do you know how to tell the decision tree that the "label" is binary, or how
> > to set some attributes on the dataframe to carry the number of classes?
> >
> > Thanks!
> > - Terry
> >
> > On Sun, Sep 6, 2015 at 5:23 PM, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> (Sean)
> >> The error suggests that the type is not a binary or nominal attribute
> >> though. I think that's the missing step. A double-valued column need
> >> not be one of these attribute types.
> >>
> >> On Sun, Sep 6, 2015 at 10:14 AM, Terry Hole <hujie.ea...@gmail.com>
> wrote:
> >> > Hi, Owen,
> >> >
> >> > The dataframe "training" is from a RDD of case class:
> >> > RDD[LabeledDocument],
> >> > while the case class is defined as this:
> >> > case class LabeledDocument(id: Long, text: String, label: Double)
> >> >
> >> > So there is already a default "label" column with "double" type.
> >> >
> >> > I already tried to set the label column for decision tree as this:
> >> > val lr = new
> >> >
> >> >
> DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
> >> > It raised the same error.
> >> >
> >> > I also tried to change the "label" to "int" type; it also reported an
> >> > error like the following stack. I have no idea how to make this work.
> >> >
> >> > java.lang.IllegalArgumentException: requirement failed: Column label
> >> > must be
> >> > of type DoubleType but was actually IntegerType.
> >> > at scala.Predef$.require(Predef.scala:233)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
> >> > at
> >> > org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >> > at
> >> >
> >> >
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> >> > at
> >> >
> >> >
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
> >> > at
> >> > scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
> >> > at
> >> > org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
> >> > at
> >> > org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
> >> > at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
> >> > at
> >> >
> >> >
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
> >> > at
> >> >
> >> >
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56)
> >> > at
> >> > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iw

Re: Meets "java.lang.IllegalArgumentException" when test spark ml pipe with DecisionTreeClassifier

2015-09-07 Thread Terry Hole
Xiangrui,

Do you have any idea how to make this work?

Thanks
- Terry

Terry Hole <hujie.ea...@gmail.com> wrote on Sunday, September 6, 2015 at 17:41:

> Sean
>
> Do you know how to tell the decision tree that the "label" is binary, or how
> to set some attributes on the dataframe to carry the number of classes?
>
> Thanks!
> - Terry
>
> On Sun, Sep 6, 2015 at 5:23 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> (Sean)
>> The error suggests that the type is not a binary or nominal attribute
>> though. I think that's the missing step. A double-valued column need
>> not be one of these attribute types.
>>
>> On Sun, Sep 6, 2015 at 10:14 AM, Terry Hole <hujie.ea...@gmail.com>
>> wrote:
>> > Hi, Owen,
>> >
>> > The dataframe "training" is from a RDD of case class:
>> RDD[LabeledDocument],
>> > while the case class is defined as this:
>> > case class LabeledDocument(id: Long, text: String, label: Double)
>> >
>> > So there is already a default "label" column with "double" type.
>> >
>> > I already tried to set the label column for decision tree as this:
>> > val lr = new
>> >
>> DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
>> > It raised the same error.
>> >
>> > I also tried to change the "label" to "int" type; it also reported an error
>> > like the following stack. I have no idea how to make this work.
>> >
>> > java.lang.IllegalArgumentException: requirement failed: Column label
>> must be
>> > of type DoubleType but was actually IntegerType.
>> > at scala.Predef$.require(Predef.scala:233)
>> > at
>> >
>> org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
>> > at
>> >
>> org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
>> > at
>> >
>> org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
>> > at
>> > org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
>> > at
>> >
>> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
>> > at
>> >
>> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
>> > at
>> >
>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>> > at
>> >
>> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
>> > at
>> > scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
>> > at
>> org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
>> > at
>> > org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
>> > at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
>> > at
>> >
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
>> > at
>> >
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56)
>> > at
>> > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:58)
>> > at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)
>> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:62)
>> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64)
>> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:66)
>> > at $iwC$$iwC$$iwC$$iwC$$iwC.(:68)
>> > at $iwC$$iwC$$iwC$$iwC.(:70)
>> > at $iwC$$iwC$$iwC.(:72)
>> > at $iwC$$iwC.(:74)
>> > at $iwC.(:76)
>> > at (:78)
>> > at .(:82)
>> > at .()
>> > at .(:7)
>> > at .()
>> > at $print()
>> >
>> > Thanks!
>> > - Terry
>> >
>> > On Sun, Sep 6, 2015 at 4:53 PM, Sean Owen <so...@cloudera.com> wrote:
>> >>
>> >> I think somewhere along the line you've not specified your label
>> >> column -- it's defaulting to "label" and it does not recognize it, or
>> >> at least not as a binary or nominal attribute.
>> >>
>> >> On Sun, Sep 6, 2015 at 5:47 AM, Terry Hole <hujie.ea...@gmail.com>
>> wrote:
>> >> > Hi, Experts,
>> >> >
>> >> > I followed the guide of spark ml pipe to test DecisionTr

Re: Meets "java.lang.IllegalArgumentException" when test spark ml pipe with DecisionTreeClassifier

2015-09-06 Thread Terry Hole
Hi, Owen,

The dataframe "training" is from an RDD of a case class, RDD[LabeledDocument],
where the case class is defined like this:
case class LabeledDocument(id: Long, text: String, *label: Double*)

So there is already a default "label" column with "double" type.

I already tried to set the label column for decision tree as this:
val lr = new
DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
It raised the same error.

I also tried to change the "label" to "int" type; it also reported an error
like the following stack. I have no idea how to make this work.

java.lang.IllegalArgumentException: requirement failed: *Column label must
be of type DoubleType but was actually IntegerType*.
at scala.Predef$.require(Predef.scala:233)
at
org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
at
org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
at
org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
at
org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
at
org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
at
org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at
scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
at
org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:58)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:62)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:66)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:68)
at $iwC$$iwC$$iwC$$iwC.(:70)
at $iwC$$iwC$$iwC.(:72)
at $iwC$$iwC.(:74)
at $iwC.(:76)
at (:78)
at .(:82)
at .()
at .(:7)
at .()
at $print()

Thanks!
- Terry

On Sun, Sep 6, 2015 at 4:53 PM, Sean Owen <so...@cloudera.com> wrote:

> I think somewhere along the line you've not specified your label
> column -- it's defaulting to "label" and it does not recognize it, or
> at least not as a binary or nominal attribute.
>
> On Sun, Sep 6, 2015 at 5:47 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> > Hi, Experts,
> >
> > I followed the guide of spark ml pipe to test DecisionTreeClassifier on
> > the spark shell with spark 1.4.1, but it always meets an error like the
> > following. Do you have any idea how to fix this?
> >
> > The error stack:
> > java.lang.IllegalArgumentException: DecisionTreeClassifier was given
> input
> > with invalid label column label, without the number of classes specified.
> > See StringIndexer.
> > at
> >
> org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:71)
> > at
> >
> org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:41)
> > at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
> > at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
> > at
> > org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:133)
> > at
> > org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:129)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at
> >
> scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:42)
> > at
> >
> scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:43)
> > at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:129)
> > at
> > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:42)
> > at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
> > at $iwC$$iwC$$iwC$$iwC$$iwC.(:55)
> > at $iwC$$iwC$$iwC$$iwC.(:57

Re: Meets "java.lang.IllegalArgumentException" when test spark ml pipe with DecisionTreeClassifier

2015-09-06 Thread Terry Hole
Sean

Do you know how to tell the decision tree that the "label" is binary, or how to
set some attributes on the dataframe to carry the number of classes?

Thanks!
- Terry

On Sun, Sep 6, 2015 at 5:23 PM, Sean Owen <so...@cloudera.com> wrote:

> (Sean)
> The error suggests that the type is not a binary or nominal attribute
> though. I think that's the missing step. A double-valued column need
> not be one of these attribute types.
>
> On Sun, Sep 6, 2015 at 10:14 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> > Hi, Owen,
> >
> > The dataframe "training" is from a RDD of case class:
> RDD[LabeledDocument],
> > while the case class is defined as this:
> > case class LabeledDocument(id: Long, text: String, label: Double)
> >
> > So there is already a default "label" column with "double" type.
> >
> > I already tried to set the label column for decision tree as this:
> > val lr = new
> >
> DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
> > It raised the same error.
> >
> > I also tried to change the "label" to "int" type; it also reported an error
> > like the following stack. I have no idea how to make this work.
> >
> > java.lang.IllegalArgumentException: requirement failed: Column label
> must be
> > of type DoubleType but was actually IntegerType.
> > at scala.Predef$.require(Predef.scala:233)
> > at
> >
> org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
> > at
> >
> org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
> > at
> >
> org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
> > at
> > org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
> > at
> >
> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> > at
> >
> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> > at
> >
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> > at
> >
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
> > at
> > scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
> > at
> org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
> > at
> > org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
> > at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
> > at
> >
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
> > at
> >
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56)
> > at
> > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:58)
> > at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)
> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:62)
> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64)
> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:66)
> > at $iwC$$iwC$$iwC$$iwC$$iwC.(:68)
> > at $iwC$$iwC$$iwC$$iwC.(:70)
> > at $iwC$$iwC$$iwC.(:72)
> > at $iwC$$iwC.(:74)
> > at $iwC.(:76)
> > at (:78)
> > at .(:82)
> > at .()
> > at .(:7)
> > at .()
> > at $print()
> >
> > Thanks!
> > - Terry
> >
> > On Sun, Sep 6, 2015 at 4:53 PM, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> I think somewhere along the line you've not specified your label
> >> column -- it's defaulting to "label" and it does not recognize it, or
> >> at least not as a binary or nominal attribute.
> >>
> >> On Sun, Sep 6, 2015 at 5:47 AM, Terry Hole <hujie.ea...@gmail.com>
> wrote:
> >> > Hi, Experts,
> >> >
> >> > I followed the guide of spark ml pipe to test DecisionTreeClassifier on
> >> > the spark shell with spark 1.4.1, but it always meets an error like the
> >> > following. Do you have any idea how to fix this?
> >> >
> >> > The error stack:
> >> > java.lang.IllegalArgumentException: DecisionTreeClassifier was given
> >> > input
> >> > with invalid label column label, without the number of classes
> >> > specified.
> >> > See StringIndexer.
> >> > at
> >> >

Re: Job aborted due to stage failure: java.lang.StringIndexOutOfBoundsException: String index out of range: 18

2015-08-28 Thread Terry Hole
Ricky,


You may need to use map instead of flatMap in your case

val rowRDD = sc.textFile("/user/spark/short_model").map(_.split("\\t")).map(p => Row(...))
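
The reason, in short: flatMap(_.split("\\t")) flattens every line into
individual String tokens, so each p in the following map is a single String and
p(18) indexes the 19th character of that token (hence the
StringIndexOutOfBoundsException), while map(_.split("\\t")) keeps one
Array[String] per line, so p(18) is the 19th field. A small sketch of the
difference, with made-up data:

val lines = sc.parallelize(Seq("a\tb\tc"))
val tokens = lines.flatMap(_.split("\t"))  // RDD[String]: "a", "b", "c" -- indexing gives characters
val fields = lines.map(_.split("\t"))      // RDD[Array[String]]: one Array("a", "b", "c") per line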


Thanks!

-Terry


On Fri, Aug 28, 2015 at 5:08 PM, our...@cnsuning.com our...@cnsuning.com
wrote:

 hi all,

 when using spark sql, a problem is bothering me.

 the code is as follows:

  val schemaString =
 "visitor_id,cust_num,gds_id,l1_gds_group_cd,l4_gds_group_cd,pc_gds_addcart,pc_gds_collect_num,pc_gds_four_page_pv,pc_gds_four_page_time,pc_gds_four_page_fromsearch_pv,pc_gds_four_page_fromlist_pv,pc_gds_four_page_fromrec_pv,pc_gds_four_page_fromcuxiao_pv,pc_four_page_num,pc_group_gds_addcart,pc_group_gds_collect,pc_group_fourpage_pv,pc_group_fourpage_time,pc_visitor_pv,pc_search_pv,pc_list_pv,pc_is_view1,pc_is_view,pc_view_cycle_days,pc_view_days,wap_gds_addcart,wap_gds_collect_num,wap_gds_four_page_pv,wap_gds_four_page_time,wap_gds_four_page_fromsearch_pv,wap_gds_four_page_fromlist_pv,wap_gds_four_page_fromrec_pv,wap_gds_four_page_fromcuxiao_pv,wap_four_page_num,wap_group_gds_addcart,wap_group_gds_collect,wap_group_fourpage_pv,wap_group_fourpage_time,wap_visitor_pv,wap_search_pv,wap_list_pv,wap_is_view1,wap_is_view,wap_view_cycle_days,wap_view_days,app_gds_addcart,app_gds_collect_num,app_gds_four_page_pv,app_gds_four_page_time,app_gds_four_page_fromsearch_pv,app_gds_four_page_fromlist_pv,app_gds_four_page_fromrec_pv,app_gds_four_page_fromcuxiao_pv,app_four_page_num,app_group_gds_addcart,app_group_gds_collect,app_group_fourpage_pv,app_group_fourpage_time,app_visitor_pv,app_search_pv,app_list_pv,app_is_view1,app_is_view,app_view_cycle_days,app_view_days,gds_score_desc,l4_gds_group_rate_n,decision_cycle_days,decision_days,decision_pv,is_order,statis_date"
 // schemaString.length=72

  import org.apache.spark.sql.Row;

  import org.apache.spark.sql.types.{StructType, StructField, StringType};

  val schema = StructType(schemaString.split(",").map(fieldName =>
  StructField(fieldName, StringType, true)))

  val rowRDD = sc.textFile("/user/spark/short_model").flatMap(_.split("\\t")).map(p =>
  Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12),p(13),p(14),p(15),p(16),p(17),p(18),p(19),p(20),p(21),p(22),p(23),p(24),p(25),p(26),p(27),p(28),p(29),p(30),p(31),p(32),p(33),p(34),p(35),p(36),p(37),p(38),p(39),p(40),p(41),p(42),p(43),p(44),p(45),p(46),p(47),p(48),p(49),p(50),p(51),p(52),p(53),p(54),p(55),p(56),p(57),p(58),p(59),p(60),p(61),p(62),p(63),p(64),p(65),p(66),p(67),p(68),p(69),p(70),p(71)))

  val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

  peopleDataFrame.registerTempTable("alg")

  val results = sqlContext.sql("SELECT count(*) FROM alg")

  results.collect()


 the error log is as follows:

   15/08/28 17:00:54 WARN TaskSetManager: Lost task 56.0 in
 stage 9.0 (TID 71, 10.104.74.8): java.lang.StringIndexOutOfBoundsException:
 String index out of range: 18
 at java.lang.String.charAt(String.java:658)
 at
 scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39)
 at
 $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:26)
 at
 $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:26)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at
 org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
 at
 org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
 at
 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
 at
 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.1 in stage 9.0
 (TID 72, 10.104.74.8, NODE_LOCAL, 1415 bytes)
 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.1 in stage 9.0 (TID
 72) on executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException
 (String index out 

Re: standalone to connect mysql

2015-07-21 Thread Terry Hole
Jack,

You can refer to the Hive SQL syntax if you use HiveContext:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML

Thanks!
-Terry
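
For example, assuming a HiveContext instance named hiveContext, the forms that
the Spark SQL 1.x parser accepts look roughly like the sketch below; the
INSERT ... VALUES form appears in the Hive manual only from Hive 0.14 on and is
not accepted here, which matches the error you saw. The column names in the
temp-table workaround are hypothetical:

import hiveContext.implicits._

// the INSERT ... SELECT form from your mail, which works:
hiveContext.sql("INSERT INTO TABLE newStu SELECT * FROM otherStu")

// a possible workaround for literal rows (untested sketch):
// register the literals as a temp table, then INSERT ... SELECT from it
val oneRow = sc.parallelize(Seq(("10", "aa", 1))).toDF("id", "name", "score")
oneRow.registerTempTable("newRows")
hiveContext.sql("INSERT INTO TABLE newStu SELECT * FROM newRows")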

 That works! Thanks.

Can I ask you one further question?

 How does spark sql support insertion?



 That is say, if I did:

 sqlContext.sql("insert into newStu values ('10', 'a', 1)")



 the error is:

 failure: ``table'' expected but identifier newStu found

 insert into newStu values ('10', aa, 1)



 but if I did:

 sqlContext.sql(s"insert into Table newStu select * from otherStu")

 that works.



 Is there any document addressing that?





 Best regards,

 Jack





 *From:* Terry Hole [mailto:hujie.ea...@gmail.com]
 *Sent:* Tuesday, 21 July 2015 4:17 PM
 *To:* Jack Yang; user@spark.apache.org
 *Subject:* Re: standalone to connect mysql



 Maybe you can try: spark-submit --class sparkwithscala.SqlApp
  --jars /home/lib/mysql-connector-java-5.1.34.jar --master 
 spark://hadoop1:7077
 /home/myjar.jar



 Thanks!

 -Terry

  Hi there,



 I would like to use spark to access the data in mysql. So firstly  I tried
 to run the program using:

 spark-submit --class sparkwithscala.SqlApp --driver-class-path
 /home/lib/mysql-connector-java-5.1.34.jar --master local[4] /home/myjar.jar



 that returns me the correct results. Then I tried the standalone version
 using:

 spark-submit --class sparkwithscala.SqlApp --driver-class-path
 /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077
 /home/myjar.jar

  (the mysql-connector-java-5.1.34.jar: I have it on all worker nodes.)

 and the error is:



  Exception in thread "main" org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent
 failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.157.129):
 java.sql.SQLException: No suitable driver found for
  jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=root&password=root



 I also found the similar problem before in
 https://jira.talendforge.org/browse/TBD-2244.



 Is this a bug to be fixed later? Or do I miss anything?







 Best regards,

 Jack






Re: standalone to connect mysql

2015-07-21 Thread Terry Hole
Maybe you can try: spark-submit --class sparkwithscala.SqlApp  --jars
/home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077
/home/myjar.jar
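
With the driver jar shipped via --jars, the JDBC read itself would look roughly
like this with the Spark 1.4 DataFrameReader API (a sketch; the table name is
hypothetical, the URL is the one from the error quoted below):

val jdbcDF = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=root&password=root")
  .option("dbtable", "some_table")  // hypothetical table name
  .option("driver", "com.mysql.jdbc.Driver")
  .load()
jdbcDF.show()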

Thanks!
-Terry

  Hi there,



 I would like to use spark to access the data in mysql. So firstly  I tried
 to run the program using:

 spark-submit --class sparkwithscala.SqlApp --driver-class-path
 /home/lib/mysql-connector-java-5.1.34.jar --master local[4] /home/myjar.jar



 that returns me the correct results. Then I tried the standalone version
 using:

 spark-submit --class sparkwithscala.SqlApp --driver-class-path
 /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077
 /home/myjar.jar

 (the mysql-connector-java-5.1.34.jar: I have it on all worker nodes.)

 and the error is:



 Exception in thread "main" org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent
 failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.157.129):
 java.sql.SQLException: No suitable driver found for
 jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=root&password=root



 I also found the similar problem before in
 https://jira.talendforge.org/browse/TBD-2244.



 Is this a bug to be fixed later? Or do I miss anything?







 Best regards,

 Jack





Re: [Spark Shell] Could the spark shell be reset to the original status?

2015-07-16 Thread Terry Hole
Hi Ted,

Thanks for the information. The post seems a little different from my
requirement: suppose we defined different functions to do different
streaming work (e.g. 50 functions); I want to test these 50 functions in
the spark shell, and the shell will always throw OOM in the middle of the test
(yes, it could be worked around by increasing the JVM memory size, but if we
have more functions, the issue will still happen). The main issue is that the
shell keeps track of all the information (classes, objects...) from the time it
started, so the JVM memory usage increases over time as the functions are
defined/invoked.

Thanks!
- Terry

Ted Yu yuzhih...@gmail.com wrote on Friday, July 17, 2015 at 12:02 PM:

 See this recent thread:


 http://search-hadoop.com/m/q3RTtFW7iMDkrj61/Spark+shell+oom+subj=java+lang+OutOfMemoryError+PermGen+space



 On Jul 16, 2015, at 8:51 PM, Terry Hole hujie.ea...@gmail.com wrote:

 Hi,

 Background: The spark shell will get an out of memory error after doing
 lots of spark work.

 Is there any method which can reset the spark shell to the startup status?
 I tried *:reset*, but it does not seem to work: I can not create a spark
 context anymore (some compile error as below) after the *:reset*. (I
 have to restart the shell after OOM as a workaround.)

 == Expanded type of tree ==
 TypeRef(TypeSymbol(class $read extends Serializable))
 uncaught exception during compilation: java.lang.AssertionError
 java.lang.AssertionError: assertion failed: Tried to find '$line16' in
 'C:\Users\jhu\AppData\Local\Temp\spark-2ad09490-c0c6-41e2-addb-63087ce0ae63'
 but it is not a directory
 That entry seems to have slain the compiler.  Shall I replay your session?
 I can re-run each line except the last one. [y/n]
 Abandoning crashed session.

 Thanks!
 -Terry




[Spark Shell] Could the spark shell be reset to the original status?

2015-07-16 Thread Terry Hole
Hi,

Background: The spark shell will get an out of memory error after doing lots
of spark work.

Is there any method which can reset the spark shell to the startup status?
I tried *:reset*, but it does not seem to work: I can not create a spark
context anymore (some compile error as below) after the *:reset*. (I have
to restart the shell after OOM as a workaround.)

== Expanded type of tree ==
TypeRef(TypeSymbol(class $read extends Serializable))
uncaught exception during compilation: java.lang.AssertionError
java.lang.AssertionError: assertion failed: Tried to find '$line16' in
'C:\Users\jhu\AppData\Local\Temp\spark-2ad09490-c0c6-41e2-addb-63087ce0ae63'
but it is not a directory
That entry seems to have slain the compiler.  Shall I replay your session? I
can re-run each line except the last one. [y/n]
Abandoning crashed session.

Thanks!
-Terry


Re: fileStream with old files

2015-07-15 Thread Terry Hole
Hi, Hunter,

What behavior do you see with HDFS? The local file system and HDFS
should have the same behavior.

Thanks!
- Terry

Hunter Morgan hunter.mor...@rackspace.com wrote on Thursday, July 16, 2015 at 2:04 AM:

  After moving the setting of the parameter to SparkConf initialization
 instead of after the context is already initialized, I have it operating
 reliably on local filesystem, but not on hdfs. Are there any differences in
 behavior between these two cases I should be aware of?



 I don’t usually mailinglist or exchange, so forgive me for my ignorance of
 whether this message will go horribly wrong due to formatting.



 I plan to port the following code to Hadoop FS API to generalize testing
 to understand actual behavior and ensure desired behavior.

 public static JavaDStream<String> textFileStreamIncludingExisting(JavaStreamingContext context, String path)
 {
     return context.fileStream(path, LongWritable.class, Text.class, TextInputFormat.class,
             v1 -> true, false).map(v1 -> v1._2.toString());
 }

 @Test
 public void testTextFileStreamIncludingExistingReadsOldFiles() throws Exception
 {
     final Path testDir = Files.createTempDirectory("sparkTest");
     final ArrayList<Path> tempFiles = new ArrayList<>();

     // create 20 old files
     final int testFileNumberLimit = 20;
     for (int testFileNumber = 0; testFileNumber < testFileNumberLimit; testFileNumber++)
     {
         final Path testFile = Files.createTempFile(testDir, "testFile", "");
         tempFiles.add(testFile);
         final FileWriter fileWriter = new FileWriter(testFile.toFile());
         fileWriter.write("asdf");
         fileWriter.flush();
         fileWriter.close();
         for (String eachAttribute : new String[]{"basic:lastAccessTime",
                 "basic:lastModifiedTime", "basic:creationTime"})
         { // set file dates 0 to 20 days ago
             Files.setAttribute(testFile, eachAttribute,
                     FileTime.from(Instant.now().minus(Duration.ofDays(testFileNumber))));
         }
     }

     final SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("test");
     sparkConf.set("spark.streaming.minRememberDuration", String.valueOf(Integer.MAX_VALUE));
     final JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(1));
     final JavaDStream<String> input = SparkUtil.textFileStreamIncludingExisting(
             context, String.valueOf(testDir.toUri()));
     // count files read
     final Accumulator<Integer> accumulator = context.sparkContext().accumulator(0);

     // setup async wait
     Semaphore done = new Semaphore(1);
     done.acquire();
     input.foreachRDD(new Function<JavaRDD<String>, Void>()
     {
         @Override
         public Void call(JavaRDD<String> v1) throws Exception
         {
             if (v1.count() == 0)
             {
                 done.release();
             }
             accumulator.add((int) v1.count());
             return null;
         }
     });
     context.start();
     // wait for completion or 20 sec
     done.tryAcquire(20, TimeUnit.SECONDS);
     context.stop();

     assertThat(accumulator.value(), is(testFileNumberLimit));

     for (Path eachTempFile : tempFiles)
     {
         Files.deleteIfExists(eachTempFile);
     }
     Files.deleteIfExists(testDir);
 }





 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Wednesday, July 15, 2015 00:01
 *To:* Terry Hole
 *Cc:* Hunter Morgan; user@spark.apache.org


 *Subject:* Re: fileStream with old files



  It was added, but it's not documented publicly. I am planning to change the
 name of the conf to spark.streaming.fileStream.minRememberDuration to make
 it easier to understand



 On Mon, Jul 13, 2015 at 9:43 PM, Terry Hole hujie.ea...@gmail.com wrote:

  A new configuration named *spark.streaming.minRememberDuration* was
 added since 1.2.1 to control the file stream input, the default value is *60
 seconds*, you can change this value to a large value to include older
 files (older than 1 minute)



 You can get the detail from this jira:
 https://issues.apache.org/jira/browse/SPARK-3276



 -Terry



 On Tue, Jul 14, 2015 at 4:44 AM, automaticgiant 
 hunter.mor...@rackspace.com wrote:

 It's not as odd as it sounds. I want to ensure that long streaming job
 outages can recover all the files that went into a directory while the job
 was down.
 I've looked at

 http://apache-spark-user-list.1001560.n3.nabble.com/Generating-a-DStream-by-existing-textfiles-td20030.html#a20039
 and

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-td14306.html#a16435
 and

 https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469#29036469?newreg=e7e25469132d4fbc8350be8f876cf81e
 , but all seem unhelpful.
 I've tested combinations of the following:
  * fileStreams created with dumb accept-all filters
  * newFilesOnly true and false,
  * tweaking minRememberDuration to high and low values,
  * on hdfs or local

Re: fileStream with old files

2015-07-13 Thread Terry Hole
A new configuration named *spark.streaming.minRememberDuration* was added
in 1.2.1 to control the file stream input. The default value is *60
seconds*; you can change it to a larger value to include older files
(older than 1 minute).

You can get the detail from this jira:
https://issues.apache.org/jira/browse/SPARK-3276
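
A sketch of using it (the app name and values below are only examples; the
setting has to be on the SparkConf before the StreamingContext is created, and
the plain number appears to be interpreted as seconds here):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("file-stream-old-files")
  .set("spark.streaming.minRememberDuration", "86400")  // e.g. one day
val ssc = new StreamingContext(conf, Seconds(30))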

-Terry

On Tue, Jul 14, 2015 at 4:44 AM, automaticgiant hunter.mor...@rackspace.com
 wrote:

 It's not as odd as it sounds. I want to ensure that long streaming job
 outages can recover all the files that went into a directory while the job
 was down.
 I've looked at

 http://apache-spark-user-list.1001560.n3.nabble.com/Generating-a-DStream-by-existing-textfiles-td20030.html#a20039
 and

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-td14306.html#a16435
 and

 https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469#29036469?newreg=e7e25469132d4fbc8350be8f876cf81e
 , but all seem unhelpful.
 I've tested combinations of the following:
  * fileStreams created with dumb accept-all filters
  * newFilesOnly true and false,
  * tweaking minRememberDuration to high and low values,
  * on hdfs or local directory.
 The problem is that it will not read files in the directory from more than
 a
 minute ago.
 JavaPairInputDStream<LongWritable, Text> input = context.fileStream(indir,
 LongWritable.class, Text.class, TextInputFormat.class, v -> true, false);
 Also tried with having set:
 context.sparkContext().getConf().set("spark.streaming.minRememberDuration",
 "1654564"); to big/small values.

 Are there known limitations of the onlyNewFiles=false? Am I doing something
 wrong?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/fileStream-with-old-files-tp23802.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: [Spark Hive SQL] Set the hive connection in hive context is broken in spark 1.4.1-rc1?

2015-07-10 Thread Terry Hole
Michael,

Thanks

- Terry

Michael Armbrust mich...@databricks.com wrote on Saturday, July 11, 2015 at 04:02:

 Metastore configuration should be set in hive-site.xml.

 On Thu, Jul 9, 2015 at 8:59 PM, Terry Hole hujie.ea...@gmail.com wrote:

 Hi,

  I am trying to set the hive metadata destination to a mysql database in
  hive context. It works fine in spark 1.3.1, but it seems broken in spark
  1.4.1-rc1, where it always connects to the default metastore (local). Is this
  a regression, or must we set the connection in hive-site.xml?

 The code is very simple in spark shell:
  import org.apache.spark.sql.hive._
  val hiveContext = new HiveContext(sc)
  hiveContext.setConf("javax.jdo.option.ConnectionDriveName", "com.mysql.jdbc.Driver")
  hiveContext.setConf("javax.jdo.option.ConnectionUserName", "hive")
  hiveContext.setConf("javax.jdo.option.ConnectionPassword", "hive")
  hiveContext.setConf("javax.jdo.option.ConnectionURL", "jdbc:mysql://10.111.3.186:3306/hive")
  hiveContext.setConf("hive.metastore.warehouse.dir", "/user/hive/warehouse")
  hiveContext.sql("select * from mysqltable").show()

 *Thanks!*
 *-Terry*





[Spark Hive SQL] Set the hive connection in hive context is broken in spark 1.4.1-rc1?

2015-07-09 Thread Terry Hole
Hi,

I am trying to set the hive metadata destination to a mysql database in
hive context. It works fine in spark 1.3.1, but it seems broken in spark
1.4.1-rc1, where it always connects to the default metastore (local). Is this
a regression, or must we set the connection in hive-site.xml?

The code is very simple in spark shell:
import org.apache.spark.sql.hive._
val hiveContext = new HiveContext(sc)
hiveContext.setConf("javax.jdo.option.ConnectionDriveName", "com.mysql.jdbc.Driver")
hiveContext.setConf("javax.jdo.option.ConnectionUserName", "hive")
hiveContext.setConf("javax.jdo.option.ConnectionPassword", "hive")
hiveContext.setConf("javax.jdo.option.ConnectionURL", "jdbc:mysql://10.111.3.186:3306/hive")
hiveContext.setConf("hive.metastore.warehouse.dir", "/user/hive/warehouse")
hiveContext.sql("select * from mysqltable").show()

*Thanks!*
*-Terry*


Re: Is there a way to shutdown the derby in hive context in spark shell?

2015-07-09 Thread Terry Hole
)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:171)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at
org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
at
org.apache.spark.sql.hive.client.ClientWrapper.init(ClientWrapper.scala:116)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:172)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.init(IsolatedClientLoader.scala:168)
at
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:213)
at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:176)
at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:359)

On Thu, Jul 9, 2015 at 2:32 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Did you try sc.shutdown and creating a new one?

 Thanks
 Best Regards

 On Wed, Jul 8, 2015 at 8:12 PM, Terry Hole hujie.ea...@gmail.com wrote:

 I am using spark 1.4.1rc1 with default hive settings

 Thanks
 - Terry

 Hi All,

  I'd like to use the hive context in the spark shell. I need to recreate the
  hive meta database in the same location, so I want to close the derby
  connection previously created in the spark shell. Is there any way to do this?

  I tried this, but it does not work:

  DriverManager.getConnection("jdbc:derby:;shutdown=true");

 Thanks!

 - Terry






Is there a way to shutdown the derby in hive context in spark shell?

2015-07-08 Thread Terry Hole
Hi All,

I'd like to use the hive context in the spark shell. I need to recreate the
hive meta database in the same location, so I want to close the derby
connection previously created in the spark shell. Is there any way to do this?

I tried this, but it does not work:
DriverManager.getConnection("jdbc:derby:;shutdown=true");

Thanks!
- Terry


Re: Is there a way to shutdown the derby in hive context in spark shell?

2015-07-08 Thread Terry Hole
I am using spark 1.4.1rc1 with default hive settings

Thanks
- Terry

Hi All,

I'd like to use the hive context in the spark shell. I need to recreate the
hive meta database in the same location, so I want to close the derby
connection previously created in the spark shell. Is there any way to do this?

I tried this, but it does not work:

DriverManager.getConnection("jdbc:derby:;shutdown=true");

Thanks!

- Terry


Re: Meets class not found error in spark console with newly hive context

2015-07-02 Thread Terry Hole
Found this a bug in spark 1.4.0: SPARK-8368
https://issues.apache.org/jira/browse/SPARK-8368

Thanks!
Terry

On Thu, Jul 2, 2015 at 1:20 PM, Terry Hole hujie.ea...@gmail.com wrote:

 All,

 I am using spark console 1.4.0 to do some tests. When I create a new
 HiveContext (Line 18 in the code) in my test function, it always throws an
 exception like the one below (it works in spark console 1.3.0), but if I remove
 the HiveContext (Line 18 in the code) from my function, it works fine.
 Any idea what's wrong with this?

 java.lang.ClassNotFoundException:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$
 iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$streamingTest$1$$anonfun$apply$1
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at
 java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at
 org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(Clos
 ureCleaner.scala:455)
 at
 com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
 Source)
 at
 com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
 Source)
 at
 org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:101)
 at
 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:197)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:630)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
 at
 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
 at
 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
 at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
 at
 org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
 at
 org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629)
 at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
 $$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.streamingTest(console:98)
 at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
 $$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:93)
 at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
 $$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:98)



 1 import org.apache.spark._
 2 import org.apache.spark.SparkContext._
 3 import org.apache.spark.streaming.{ StreamingContext, Seconds, Minutes, Time }
 4 import org.apache.spark.streaming.StreamingContext._
 5 import org.apache.spark.rdd.RDD
 6 import org.apache.spark.streaming.dstream.DStream
 7 import org.apache.spark.HashPartitioner
 8 import org.apache.spark.storage.StorageLevel
 9 import org.apache.spark.sql._
10 import org.apache.spark.sql.hive._
11 import scala.collection.mutable.{Queue}
12 import scala.concurrent.Future
13 import scala.concurrent.ExecutionContext.Implicits.global
14
15 def streamingTest(args: Array[String]) {
16   println(" create streamingContext.")
17   val ssc = new StreamingContext(sc, Seconds(1))
18   *val sqlContext2 = new HiveContext(sc)*
19
20   val accum = sc.accumulator(0, "End Accumulator")
21   val queue = scala.collection.mutable.Queue(sc.textFile("G:/pipe/source"))
22   val textSource = ssc.queueStream(queue, true)
23   textSource.foreachRDD(rdd => { rdd.foreach( item => {accum += 1} ) })
24   textSource.foreachRDD(rdd => {
25     var sample = rdd.take(10)
26     if (sample.length > 0) {
27       sample.foreach(item => println("#= " + item))
28     }
29   })
30   println(" Start streaming context.")
31   ssc.start()
32   val stopFunc = Future { var isRun = true; var duration = 0; while (isRun) { Thread.sleep(1000); duration += 1; if (accum.value > 0 || duration >= 120) { println("### STOP SSC ###"); ssc.stop(false, true); duration = 0; isRun = false } } }
33   ssc.awaitTermination()
34   println(" Streaming context terminated.")
35 }
36
37 streamingTest(null)
38

 Thanks
 Terry



Meets class not found error in spark console with newly hive context

2015-07-01 Thread Terry Hole
All,

I am using spark console 1.4.0 to do some tests. When I create a new
HiveContext (Line 18 in the code) in my test function, it always throws an
exception like the one below (it works in spark console 1.3.0), but if I remove
the HiveContext (Line 18 in the code) from my function, it works fine.
Any idea what's wrong with this?

java.lang.ClassNotFoundException:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$
iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$streamingTest$1$$anonfun$apply$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at
java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(Clos
ureCleaner.scala:455)
at
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
Source)
at
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
Source)
at
org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:101)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:197)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:630)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
at
org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
at
org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.streamingTest(console:98)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:93)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:98)



 1 import org.apache.spark._
 2 import org.apache.spark.SparkContext._
 3 import org.apache.spark.streaming.{ StreamingContext, Seconds, Minutes, Time }
 4 import org.apache.spark.streaming.StreamingContext._
 5 import org.apache.spark.rdd.RDD
 6 import org.apache.spark.streaming.dstream.DStream
 7 import org.apache.spark.HashPartitioner
 8 import org.apache.spark.storage.StorageLevel
 9 import org.apache.spark.sql._
10 import org.apache.spark.sql.hive._
11 import scala.collection.mutable.{Queue}
12 import scala.concurrent.Future
13 import scala.concurrent.ExecutionContext.Implicits.global
14
15 def streamingTest(args: Array[String]) {
16   println(" create streamingContext.")
17   val ssc = new StreamingContext(sc, Seconds(1))
18   *val sqlContext2 = new HiveContext(sc)*
19
20   val accum = sc.accumulator(0, "End Accumulator")
21   val queue = scala.collection.mutable.Queue(sc.textFile("G:/pipe/source"))
22   val textSource = ssc.queueStream(queue, true)
23   textSource.foreachRDD(rdd => { rdd.foreach( item => {accum += 1} ) })
24   textSource.foreachRDD(rdd => {
25     var sample = rdd.take(10)
26     if (sample.length > 0) {
27       sample.foreach(item => println("#= " + item))
28     }
29   })
30   println(" Start streaming context.")
31   ssc.start()
32   val stopFunc = Future { var isRun = true; var duration = 0; while (isRun) { Thread.sleep(1000); duration += 1; if (accum.value > 0 || duration >= 120) { println("### STOP SSC ###"); ssc.stop(false, true); duration = 0; isRun = false } } }
33   ssc.awaitTermination()
34   println(" Streaming context terminated.")
35 }
36
37 streamingTest(null)
38

Thanks
Terry


Re: Is it possible to set the akka specify properties (akka.extensions) in spark

2015-05-11 Thread Terry Hole
Hi, Akhil,

I tried this. It did not work. I also tried
SparkConf.set("akka.extensions", "[\"kamon.system.SystemMetrics\", \"kamon.statsd.StatsD\"]"),
it also did not work.

Thanks

On Mon, May 11, 2015 at 2:56 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Try SparkConf.set("spark.akka.extensions", "Whatever"); underneath I think
 spark won't ship properties which don't start with spark.* to the executors.

 Thanks
 Best Regards

 On Mon, May 11, 2015 at 8:33 AM, Terry Hole hujie.ea...@gmail.com wrote:

 Hi all,

 I'd like to monitor akka using kamon, which needs the akka.extensions setting
 to be a list like this in typesafe config format:
   akka {
     extensions = ["kamon.system.SystemMetrics", "kamon.statsd.StatsD"]
   }

 But I can not find a way to do this; I have tried these:
 1. SparkConf.set("akka.extensions", "[kamon.system.SystemMetrics,
 kamon.statsd.StatsD]")
 2. Use application.conf and set it via the java option
 -Dconfig.resource=/path/to/conf
 3. Set akka.extensions ["kamon.system.SystemMetrics",
 "kamon.statsd.StatsD"] in the spark conf file

 None of these work.

 Do we have others ways to set this?

 Thanks!





Is it possible to set the akka specify properties (akka.extensions) in spark

2015-05-10 Thread Terry Hole
Hi all,

I'd like to monitor akka using kamon, which needs the akka.extensions setting
to be a list like this in typesafe config format:
  akka {
    extensions = ["kamon.system.SystemMetrics", "kamon.statsd.StatsD"]
  }

But I can not find a way to do this; I have tried these:
1. SparkConf.set("akka.extensions", "[kamon.system.SystemMetrics,
kamon.statsd.StatsD]")
2. Use application.conf and set it via the java option
-Dconfig.resource=/path/to/conf
3. Set akka.extensions ["kamon.system.SystemMetrics",
"kamon.statsd.StatsD"] in the spark conf file

None of these work.

Do we have others ways to set this?

Thanks!


Is it possible to set the akka specify properties (akka.extensions) in spark

2015-05-10 Thread Terry Hole
Hi all,

I'd like to monitor akka using kamon, which needs the akka.extensions setting
to be a list like this in typesafe config format:

  akka {
    extensions = ["kamon.system.SystemMetrics", "kamon.statsd.StatsD"]
  }

But I can not find a way to do this; I have tried these:

1. SparkConf.set("akka.extensions", "[kamon.system.SystemMetrics,
kamon.statsd.StatsD]")

2. Use application.conf and set it via the java option
-Dconfig.resource=/path/to/conf

3. Set akka.extensions ["kamon.system.SystemMetrics",
"kamon.statsd.StatsD"] in the spark conf file

None of these works.

Do we have others ways to set this?

Thanks!


Is it possible to set the akka specify properties (akka.extensions) in spark

2015-05-07 Thread Terry Hole
Hi all,

I'd like to monitor akka using kamon, which needs the akka.extensions setting
to be a list like this in typesafe config format:
  akka {
    extensions = ["kamon.system.SystemMetrics", "kamon.statsd.StatsD"]
  }

But I can not find a way to do this; I have tried these:
1. SparkConf.set("akka.extensions", "[kamon.system.SystemMetrics,
kamon.statsd.StatsD]")
2. Use application.conf and set it via the java option
-Dconfig.resource=/path/to/conf
3. Set akka.extensions ["kamon.system.SystemMetrics",
"kamon.statsd.StatsD"] in the spark conf file

None of these works.

Do we have others ways to set this?

Thanks!


Re: spark 1.3.0 strange log message

2015-04-23 Thread Terry Hole
Use this in spark conf: spark.ui.showConsoleProgress=false
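
It can also be set programmatically on the SparkConf before the context is
created (a sketch; the app name is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("no-progress-bar")
  .set("spark.ui.showConsoleProgress", "false")
val sc = new SparkContext(conf)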

Best Regards,

On Fri, Apr 24, 2015 at 11:23 AM, Henry Hung ythu...@winbond.com wrote:

  Dear All,



 When using spark 1.3.0 spark-submit with directing out and err to a log
 file, I saw some strange lines inside that looks like this:

 [Stage 0:(0 + 2)
 / 120]

 [Stage 0:(2 + 2)
 / 120]

 [Stage 0:==  (6 + 2)
 / 120]

 [Stage 0:=  (12 + 2)
 / 120]

 [Stage 0:=  (20 + 2)
 / 120]

 [Stage 0:===(24 + 2)
 / 120]

 [Stage 0:== (32 + 2)
 / 120]

 [Stage 0:===(42 + 2)
 / 120]

 [Stage 0:   (52 + 2)
 / 120]

 [Stage 0:===(59 + 2)
 / 120]

 [Stage 0:===(68 + 2)
 / 120]

 [Stage 0:   (78 + 3)
 / 120]

 [Stage 0:=  (88 + 4)
 / 120]

 [Stage 0:= (100 + 2)
 / 120]

 [Stage 0:==(110 + 2)
 / 120]





 Here is my log4j property:



 # Set everything to be logged to the console

 log4j.rootCategory=WARN, console

 log4j.appender.console=org.apache.log4j.ConsoleAppender

 log4j.appender.console.target=System.err

 log4j.appender.console.layout=org.apache.log4j.PatternLayout

 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
 %c{1}: %m%n



 # Settings to quiet third party logs that are too verbose

 log4j.logger.org.eclipse.jetty=WARN

 log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR

 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO

 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO





 I want to know how to disable this kind of stage progress message?



 Best regards,

 Henry




Fwd: [Spark Streaming] The FileInputDStream newFilesOnly=false does not work in 1.2 since

2015-01-20 Thread Terry Hole
Hi,

I am trying to move from 1.1 to 1.2 and found that newFilesOnly=false
(intended to include old files) does not work anymore. It works great in 1.1;
the regression should have been introduced by the last change to this class.

Did this flag's behavior change, or is it a regression?

The issue should be caused by this code (from line 157 in FileInputDStream.scala):

val modTimeIgnoreThreshold = math.max(
  initialModTimeIgnoreThreshold,                 // initial threshold based on newFilesOnly setting
  currentTime - durationToRemember.milliseconds  // trailing end of the remember window
)
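
To make the effect concrete, a small illustration with made-up numbers (in 1.2
the initial threshold is 0 when newFilesOnly is false, so the trailing end of
the remember window wins the max() and older files are dropped):

val newFilesOnly = false
val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
val currentTime = 10 * 60 * 1000L                // pretend "now" is the 10-minute mark
val rememberWindowMs = 60 * 1000L                // a 1-minute remember window
val modTimeIgnoreThreshold =
  math.max(initialModTimeIgnoreThreshold, currentTime - rememberWindowMs)
// modTimeIgnoreThreshold == 540000, so files modified before the last minute
// are ignored even though newFilesOnly is false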


Regards

- Terry