Spark 2.1.2 Spark Streaming checkpoint interval not respected

2017-11-18 Thread Shing Hing Man
Hi, 
In the following example using mapWithState, I set the checkpoint interval to 1 
minute. From the log, Spark still writes to the checkpoint directory every 
second. I would appreciate it if someone could point out what I have done wrong. 
import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.streaming.dstream.MapWithStateDStream

object MapWithStateDemo {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: MapWithStateDemo <hostname> <port>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("MapWithStateDemo")
      .setIfMissing("spark.master", "local[*]")

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Initial state RDD for the mapWithState operation
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Create a ReceiverInputDStream on the target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using mapWithState.
    // This gives a DStream of state (the cumulative count of the words).
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

    val stateDstream: MapWithStateDStream[String, Int, Int, (String, Int)] =
      wordDstream.mapWithState(
        StateSpec.function(mappingFunc).timeout(Seconds(10)).initialState(initialRDD))

    stateDstream.checkpoint(Minutes(1L))
    stateDstream.print()

    val targetDir = new File(getClass.getResource("/").toURI).getParentFile.getParentFile
    val checkpointDir = targetDir + "/checkpoint"
    ssc.checkpoint(checkpointDir)
    ssc.start()
    ssc.awaitTermination()
  }
}
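
To narrow down what is actually being written every second, here is a small diagnostic sketch (illustrative only; it would go just before ssc.start() above). It lists the contents of the checkpoint directory after each batch, which helps separate the per-batch streaming metadata checkpoints from any checkpointed state RDD data:

    stateDstream.foreachRDD { (_, time) =>
      // Runs on the driver after each batch; checkpointDir is a local path in this example.
      val entries = Option(new File(checkpointDir).listFiles()).getOrElse(Array.empty[File])
      println(s"[$time] ${entries.length} entries under $checkpointDir: " +
        entries.map(_.getName).sorted.mkString(", "))
    }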
Thanks in advance for any assistance!
Shing


Weight column values not used in Binary Logistic Regression Summary

2017-11-18 Thread Stephen Boesch
In BinaryLogisticRegressionSummary, a number of @Since("1.5.0") members carry
comments identical to the following:

* @note This ignores instance weights (setting all to 1.0) from `LogisticRegression.weightCol`.
*       This will change in later Spark versions.


Are there any plans to address this? Our team is using instance weights
with sklearn's LogisticRegression, and this limitation will complicate a
potential migration.


https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala#L1543
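
For context, a minimal sketch (toy data; the object and column names are purely illustrative) of where this bites: weights passed via setWeightCol are applied when fitting the coefficients, but the metrics exposed through the training summary treat every instance as weight 1.0.

    import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression}
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object WeightedSummaryDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("WeightedSummaryDemo")
          .master("local[*]").getOrCreate()

        // Toy training data with an explicit per-instance weight column.
        val train = spark.createDataFrame(Seq(
          (1.0, 10.0, Vectors.dense(2.0, 1.0)),
          (1.0, 1.0, Vectors.dense(1.5, 0.8)),
          (0.0, 5.0, Vectors.dense(-2.0, -1.2)),
          (0.0, 1.0, Vectors.dense(-1.0, -0.5))
        )).toDF("label", "weight", "features")

        // The weights influence the fitted coefficients ...
        val model = new LogisticRegression()
          .setWeightCol("weight")
          .setMaxIter(10)
          .fit(train)

        // ... but, per the @note quoted above, the summary metrics below are
        // computed as if every instance had weight 1.0.
        model.summary match {
          case b: BinaryLogisticRegressionSummary =>
            println(s"areaUnderROC (unweighted): ${b.areaUnderROC}")
          case other =>
            println(s"Non-binary summary: ${other.getClass.getSimpleName}")
        }

        spark.stop()
      }
    }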