Sorry Michael, I ended up using Kafka and missed your message. Yes, I did specify the schema with read.schema, and that is when I got:

    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)

regards
Sunita
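The full exception message is cut off above, but that throwError line in UnsupportedOperationChecker is the generic check that rejects queries which cannot run on a streaming DataFrame. With a plain group-by aggregation written in append output mode, as in the code further down this thread, Spark 2.2 typically rejects the query because append mode needs a watermark on an event-time window to know when an aggregate is final (and the parquet file sink supports only append). A minimal sketch of the watermarked variant, assuming a hypothetical event-time column named eventTime in the source data:

    import org.apache.spark.sql.functions.{window, sum, first}
    import spark.implicits._  // for the $"..." column syntax

    val windowed = df
      .withWatermark("eventTime", "10 minutes")           // declare how late data may arrive
      .groupBy(window($"eventTime", "1 minute"), $"id")   // event-time window plus the grouping key
      .agg(sum("col1"), sum("col2"), first("name"))

    windowed.writeStream
      .format("parquet")
      .outputMode("append")                               // append is allowed once a watermark is set
      .option("checkpointLocation", conf.getString("checkpointdir"))
      .start(conf.getString("sink.outputpath"))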
On Mon, Sep 18, 2017 at 10:15 AM, Michael Armbrust <mich...@databricks.com> wrote:

You specify the schema when loading a dataframe by calling spark.read.schema(...).
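A minimal sketch of that call shape, with a hypothetical StructType standing in for the real columns of the parquet data (.schema(...) is available on both spark.read and spark.readStream):

    import org.apache.spark.sql.types.{LongType, StringType, StructType}

    // Hypothetical field list; replace with the actual columns of the source data.
    val schema = new StructType()
      .add("id", StringType)
      .add("name", StringType)
      .add("col1", LongType)
      .add("col2", LongType)

    // Streaming file sources do not infer schemas by default, so pass one explicitly:
    val df = spark.readStream
      .schema(schema)
      .parquet(conf.getString("source.path"))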
On Tue, Sep 12, 2017 at 4:50 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:

Hi Michael,

I am wondering what I am doing wrong. I get an error like:

Exception in thread "main" java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:223)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:134)
    at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregates.scala:23)
    at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
    at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook

I tried specifying the schema as well. Here is my code:

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.SparkSession

object Aggregates {

  val aggregation =
    """select sum(col1), sum(col2), id, first(name)
      |from enrichedtb
      |group by id
      |""".stripMargin

  def aggregator(conf: Config) = {
    implicit val spark =
      SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
    implicit val sqlctx = spark.sqlContext
    printf("Source path is " + conf.getString("source.path"))
    // Added this as it was complaining about schema.
    val schemadf = sqlctx.read.parquet(conf.getString("source.path"))
    val df = spark.readStream.format("parquet").option("inferSchema", true)
      .schema(schemadf.schema).load(conf.getString("source.path"))
    df.createOrReplaceTempView("enrichedtb")
    val res = spark.sql(aggregation)
    res.writeStream.format("parquet").outputMode("append")
      .option("checkpointLocation", conf.getString("checkpointdir"))
      .start(conf.getString("sink.outputpath"))
  }

  def main(args: Array[String]): Unit = {
    val mainconf = ConfigFactory.load()
    val conf = mainconf.getConfig(mainconf.getString("pipeline"))
    print(conf.toString)
    aggregator(conf)
  }
}

I tried to extract the schema from a static read of the input path and provided it to the readStream API. With that, I get this error:

    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)

While running on the EMR cluster, all paths point to S3. On my laptop, they all point to the local filesystem.

I am using Spark 2.2.0.

Appreciate your help.

regards
Sunita

On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust <mich...@databricks.com> wrote:

If you use structured streaming and the file sink, you can have a subsequent stream read using the file source. This will maintain exactly-once processing even if there are hiccups or failures.

On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:

Hello Spark Experts,

I have a design question w.r.t. Spark Streaming. I have a streaming job that consumes protocol-buffer-encoded real-time logs from an on-premise Kafka cluster. My Spark application runs on EMR (AWS) and persists data onto S3. Before I persist, I need to strip the header and convert the protobuf to parquet (I use sparksql-scalapb to convert from protobuf to Spark.sql.Row). I need to persist the raw logs as is. I can continue the enrichment on the same dataframe after persisting the raw data; however, in order to modularize, I am planning to have a separate job which picks up the raw data and performs enrichment on it. Also, I am trying to avoid an all-in-one job, as the enrichments could get project specific while the raw data persistence stays customer/project agnostic. The enriched data is allowed to have some latency (a few minutes).

My challenge is, after persisting the raw data, how do I chain the next streaming job? The only way I can think of is: job 1 (raw data) partitions on the current date (YYYYMMDD), and within the current date, job 2 (the enrichment job) filters for records within 60s of the current time and performs enrichment on them in 60s batches.
Is this a good option? It seems to be error prone. When either of the jobs gets delayed due to bursts or any error/exception, this could lead to huge data losses and non-deterministic behavior. What are other alternatives to this?

Appreciate any guidance in this regard.

regards
Sunita Koppar
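A minimal sketch of the file-sink-to-file-source chaining Michael suggests above, assuming the raw job and the enrichment job run as separate applications; the S3 paths, rawSchema, and the enrich function are hypothetical placeholders, not from this thread:

    // Job 1: persist the decoded raw records with the file sink.
    // This is the hand-off point Michael describes for maintaining exactly-once processing.
    rawDf.writeStream
      .format("parquet")
      .option("checkpointLocation", "s3://bucket/checkpoints/raw")   // hypothetical path
      .start("s3://bucket/data/raw")                                 // hypothetical path

    // Job 2: a separate application that treats job 1's output directory as a file source.
    // File sources need an explicit schema; rawSchema would be the known raw-log schema.
    val enriched = spark.readStream
      .schema(rawSchema)
      .parquet("s3://bucket/data/raw")
      .transform(enrich)                                             // hypothetical DataFrame => DataFrame enrichment

    enriched.writeStream
      .format("parquet")
      .option("checkpointLocation", "s3://bucket/checkpoints/enriched")
      .start("s3://bucket/data/enriched")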