Would appreciate help on:
1. How to convert a streaming RDD into a JavaSchemaRDD
2. How to structure the driver program to do interactive SparkSQL

Using Spark 1.0 with Java.

I have streaming code that does updateStateByKey, resulting in a JavaPairDStream. I am using JavaDStream::compute(time) to get a JavaRDD. However, I am getting this runtime exception:

ERROR at runtime: org.apache.spark.streaming.dstream.StateDStream@18dc1b2 has not been initialized

I know the code is executed before the stream is initialized. Does anyone have suggestions on how to design the code to accommodate async processing?

Code Fragment:

//Spark SQL for the N seconds interval
SparkConf sparkConf = new SparkConf().setMaster(SPARK_MASTER).setAppName("SQLStream");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
final JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx);

//convert JavaPairDStream to JavaDStream
JavaDStream<Tuple2<String,TestConnection.DiscoveryRecord>> javaDStream = statefullStream.toJavaDStream();

//Convert Tuple<K,U> to U
JavaDStream<DiscoveryRecord> javaRDD = javaDStream.map(
    new Function<Tuple2<String,TestConnection.DiscoveryRecord>, DiscoveryRecord>(){
        public DiscoveryRecord call(Tuple2<String,TestConnection.DiscoveryRecord> eachT) {
            return eachT._2;
        }
    }
);

//Convert JavaDStream to JavaRDD
//ERROR next line at runtime: org.apache.spark.streaming.dstream.StateDStream@18dc1b2 has not been initialized
JavaRDD<DiscoveryRecord> computedJavaRDD = javaRDD.compute(new Time(100000));

JavaSchemaRDD schemaStatefull = sqlCtx.applySchema( computedJavaRDD , DiscoveryRecord.class);
schemaStatefull.registerAsTable("statefull");

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-with-Streaming-RDD-tp8774.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.