Sorry Michael, I ended up using kafka and missed noticing your message.
Yes, I did specify the schema with read.schema and thats when I got:

at 
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)

regards

Sunita


On Mon, Sep 18, 2017 at 10:15 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> You specify the schema when loading a dataframe by calling
> spark.read.schema(...)...
>
> On Tue, Sep 12, 2017 at 4:50 PM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
>
>> Hi Michael,
>>
>> I am wondering what I am doing wrong. I get error like:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Schema
>> must be specified when creating a streaming source DataFrame. If some files
>> already exist in the directory, then depending on the file format you may
>> be able to create a static DataFrame on that directory with
>> 'spark.read.load(directory)' and infer schema from it.
>>     at org.apache.spark.sql.execution.datasources.DataSource.
>> sourceSchema(DataSource.scala:223)
>>     at org.apache.spark.sql.execution.datasources.DataSource.
>> sourceInfo$lzycompute(DataSource.scala:87)
>>     at org.apache.spark.sql.execution.datasources.DataSource.
>> sourceInfo(DataSource.scala:87)
>>     at org.apache.spark.sql.execution.streaming.StreamingRelation$.
>> apply(StreamingRelation.scala:30)
>>     at org.apache.spark.sql.streaming.DataStreamReader.load(
>> DataStreamReader.scala:125)
>>     at org.apache.spark.sql.streaming.DataStreamReader.load(
>> DataStreamReader.scala:134)
>>     at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregate
>> s.scala:23)
>>     at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
>>     at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.
>> java:144)
>> 17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook
>>
>>
>> I tried specifying the schema as well.
>> Here is my code:
>>
>> object Aggregates {
>>
>>   val aggregation=
>>     """select sum(col1), sum(col2), id, first(name)
>>       from enrichedtb
>>       group by id
>>     """.stripMargin
>>
>>   def aggregator(conf:Config)={
>>     implicit val spark = 
>> SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
>>     implicit val sqlctx = spark.sqlContext
>>     printf("Source path is" + conf.getString("source.path"))
>>     val schemadf = sqlctx.read.parquet(conf.getString("source.path")) // 
>> Added this as it was complaining about schema.
>>     val df=spark.readStream.format("parquet").option("inferSchema", 
>> true).schema(schemadf.schema).load(conf.getString("source.path"))
>>     df.createOrReplaceTempView("enrichedtb")
>>     val res = spark.sql(aggregation)
>>     
>> res.writeStream.format("parquet").outputMode("append").option("checkpointLocation",conf.getString("checkpointdir")).start(conf.getString("sink.outputpath"))
>>   }
>>
>>   def main(args: Array[String]): Unit = {
>>     val mainconf = ConfigFactory.load()
>>     val conf = mainconf.getConfig(mainconf.getString("pipeline"))
>>     print(conf.toString)
>>     aggregator(conf)
>>   }
>>
>> }
>>
>>
>> I tried to extract schema from static read of the input path and provided it 
>> to the readStream API. With that, I get this error:
>>
>> at 
>> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>>      at 
>> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
>>      at 
>> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>>      at 
>> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>>      at 
>> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
>>      at 
>> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)
>>
>> While running on the EMR cluster all paths point to S3. In my laptop, they 
>> all point to local filesystem.
>>
>> I am using Spark2.2.0
>>
>> Appreciate your help.
>>
>> regards
>>
>> Sunita
>>
>>
>> On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> If you use structured streaming and the file sink, you can have a
>>> subsequent stream read using the file source.  This will maintain exactly
>>> once processing even if there are hiccups or failures.
>>>
>>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind <sunitarv...@gmail.com>
>>> wrote:
>>>
>>>> Hello Spark Experts,
>>>>
>>>> I have a design question w.r.t Spark Streaming. I have a streaming job
>>>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>>>> on premise. My spark application runs on EMR (aws) and persists data onto
>>>> s3. Before I persist, I need to strip header and convert protobuffer to
>>>> parquet (I use sparksql-scalapb to convert from Protobuff to
>>>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>>>> enrichment on the same dataframe after persisting the raw data, however, in
>>>> order to modularize I am planning to have a separate job which picks up the
>>>> raw data and performs enrichment on it. Also,  I am trying to avoid all in
>>>> 1 job as the enrichments could get project specific while raw data
>>>> persistence stays customer/project agnostic.The enriched data is allowed to
>>>> have some latency (few minutes)
>>>>
>>>> My challenge is, after persisting the raw data, how do I chain the next
>>>> streaming job. The only way I can think of is -  job 1 (raw data)
>>>> partitions on current date (YYYYMMDD) and within current date, the job 2
>>>> (enrichment job) filters for records within 60s of current time and
>>>> performs enrichment on it in 60s batches.
>>>> Is this a good option? It seems to be error prone. When either of the
>>>> jobs get delayed due to bursts or any error/exception this could lead to
>>>> huge data losses and non-deterministic behavior . What are other
>>>> alternatives to this?
>>>>
>>>> Appreciate any guidance in this regard.
>>>>
>>>> regards
>>>> Sunita Koppar
>>>>
>>>
>>>
>>
>

Reply via email to