Rockie Yang created ZEPPELIN-1568:
-------------------------------------
Summary: print %angular does not work in asynchronous
Key: ZEPPELIN-1568
URL: https://issues.apache.org/jira/browse/ZEPPELIN-1568
Project: Zeppelin
Issue Type: Bug
Components: zeppelin-interpreter
Affects Versions: 0.6.2, 0.6.1
Environment: Docker image based on openjdk:8
Reporter: Rockie Yang
I am trying to add Spark Structure Streaming support to Zeppelin with
Highcharts. https://github.com/knockdata/spark-highcharts/issues/11
It does not render properly with %angular.
case class NuclearStockpile(country: String, stockpile: Int, year: Int)
class CustomSinkProvider extends StreamSinkProvider {
def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
println("%angular <h3>angular asynchronous</h3>")
}
}
}
}
implicit val sqlCtx = sqlContext
val input = MemoryStream[NuclearStockpile]
val USA = Seq(0, 0, 0, 0, 0, 6, 11, 32, 110, 235, 369, 640,
1005, 1436, 2063, 3057, 4618, 6444, 9822, 15468, 20434, 24126,
27387, 29459, 31056, 31982, 32040, 31233, 29224, 27342, 26662,
26956, 27912, 28999, 28965, 27826, 25579, 25722, 24826, 24605,
24304, 23464, 23708, 24099, 24357, 24237, 24401, 24344, 23586,
22380, 21004, 17287, 14747, 13076, 12555, 12144, 11009, 10950,
10871, 10824, 10577, 10527, 10475, 10421, 10358, 10295, 10104).
zip(1940 to 2006).map(p => NuclearStockpile("USA", p._1, p._2))
val USSR = Seq(0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
5, 25, 50, 120, 150, 200, 426, 660, 869, 1060, 1605, 2471, 3322,
4238, 5221, 6129, 7089, 8339, 9399, 10538, 11643, 13092, 14478,
15915, 17385, 19055, 21205, 23044, 25393, 27935, 30062, 32049,
33952, 35804, 37431, 39197, 45000, 43000, 41000, 39000, 37000,
35000, 33000, 31000, 29000, 27000, 25000, 24000, 23000, 22000,
21000, 20000, 19000, 18000, 18000, 17000, 16000).
zip(1940 to 2006).map(p => NuclearStockpile("USSR/Russia", p._1, p._2))
input.addData(USA.take(30))
input.addData(USSR.take(30))
val query = input.toDF.writeStream
.format(classOf[CustomSinkProvider].getCanonicalName)
.start()
query.processAllAvailable()
input.addData(USA.drop(30))
input.addData(USSR.drop(30))
query.processAllAvailable()
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)