Ignore me, a bit more digging and I was able to find the FileStreamSink source:
<https://github.com/apache/spark/blob/1ae4652b7e1f77a984b8459c778cb06c814192c5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala>

Following that pattern worked a treat! Thanks again Michael :)
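For reference, a minimal sketch of that pattern: a sink that uses the batch id Spark passes to addBatch to skip re-delivered batches. The Sink trait here is Spark 2.x's internal streaming interface (the one FileStreamSink itself implements); the CommitLog trait and the writeBatch function are hypothetical stand-ins, not Spark APIs.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical commit log: not part of Spark. Ideally this lives in the
// target system itself so writing the data and recording the batch id
// can be made atomic.
trait CommitLog {
  def lastCommittedBatch: Long            // -1 when nothing has been committed
  def commit(batchId: Long): Unit
}

class IdempotentSink(log: CommitLog)(writeBatch: DataFrame => Unit) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Spark re-runs the last batch after a restart, so the same batchId can
    // arrive twice; skip anything already committed (FileStreamSink does the
    // equivalent check against its _spark_metadata log).
    if (batchId > log.lastCommittedBatch) {
      writeBatch(data)     // push the micro-batch to the external store
      log.commit(batchId)  // record it so a retry of this batch becomes a no-op
    }
  }
}

Keeping the commit log in the target system, rather than in a side file, is what makes the write and the commit atomic, which is the same guarantee the file sink gets from its _spark_metadata log.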
On Tue, Feb 7, 2017 at 8:44 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

> Sorry, those are methods I wrote, so you can ignore them :)
>
> So just adding a path parameter tells Spark that's where the update log is?
>
> Do I check for the unique id there and identify which batches were written
> and which weren't?
>
> Are there any examples of this out there? There aren't many connectors in
> the wild which I can reimplement, are there?
> Should I look at how the file sink is set up and follow that pattern?
>
> Regards
> Sam
>
> On Tue, Feb 7, 2017 at 8:40 PM, Michael Armbrust <mich...@databricks.com> wrote:
>
>> The JSON log is only used by the file sink (which it doesn't seem like
>> you are using). Though, I'm not sure exactly what is going on inside of
>> setupGoogle or how tableReferenceSource is used.
>>
>> Typically you would run df.writeStream.option("path", "/my/path")... and
>> then the transaction log would go into /my/path/_spark_metadata.
>>
>> There is no requirement that a sink uses this kind of an update log.
>> This is just how we get better transactional semantics than HDFS is
>> providing. If your sink supports transactions natively you should just
>> use those instead. We pass a unique ID to the sink method addBatch so
>> that you can make sure you don't commit the same transaction more than
>> once.
>>
>> On Tue, Feb 7, 2017 at 3:29 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>
>>> Hi Michael
>>>
>>> If that's the case for the below example, where should I be reading
>>> these JSON log files first? I'm assuming sometime between df and query?
>>>
>>> val df = spark
>>>   .readStream
>>>   .option("tableReferenceSource", tableName)
>>>   .load()
>>>
>>> setUpGoogle(spark.sqlContext)
>>>
>>> val query = df
>>>   .writeStream
>>>   .option("tableReferenceSink", tableName2)
>>>   .option("checkpointLocation", "checkpoint")
>>>   .start()
>>>
>>> On Tue, Feb 7, 2017 at 7:24 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>
>>>> Read the JSON log of files that is in `/your/path/_spark_metadata` and
>>>> only read files that are present in that log (ignore anything else).
>>>>
>>>> On Tue, Feb 7, 2017 at 1:16 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>>>
>>>>> Ah I see, OK, so probably it's the retry that's causing it.
>>>>>
>>>>> So when you say I'll have to take this into account, how do I best do
>>>>> that? My sink will have to know what that extra file was. And I was
>>>>> under the impression Spark would automagically know this because of
>>>>> the checkpoint directory set when you created the writeStream.
>>>>>
>>>>> If that's not the case, then how would I go about ensuring no
>>>>> duplicates?
>>>>>
>>>>> Thanks again for the awesome support!
>>>>>
>>>>> Regards
>>>>> Sam
>>>>>
>>>>> On Tue, 7 Feb 2017 at 18:05, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>
>>>>>> Sorry, I think I was a little unclear. There are two things at play
>>>>>> here.
>>>>>>
>>>>>> - Exactly-once semantics with file output: Spark writes out extra
>>>>>> metadata on which files are valid to ensure that failures don't
>>>>>> cause us to "double count" any of the input. Spark 2.0+ detects this
>>>>>> info automatically when you use the dataframe reader
>>>>>> (spark.read...). There may be extra files, but they will be ignored.
>>>>>> If you are consuming the output with another system you'll have to
>>>>>> take this into account.
>>>>>> - Retries: right now we always retry the last batch when restarting.
>>>>>> This is safe/correct because of the above, but we could also
>>>>>> optimize this away by tracking more information about batch
>>>>>> progress.
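What "taking this into account" might look like for a system reading the output outside of Spark: a minimal sketch, under the assumption that each per-batch file in _spark_metadata holds a version header line followed by one JSON entry per line containing a "path" field (an assumption about the log layout, not an official API).

import java.io.File
import scala.io.Source

// Hypothetical helper: collect the set of data files the file sink has
// actually committed, using the metadata log as the source of truth.
// Compacted log files (e.g. "9.compact") are ignored here for brevity.
def committedFiles(outputDir: String): Set[String] = {
  val logDir = new File(outputDir, "_spark_metadata")
  val pathField = """"path"\s*:\s*"([^"]+)"""".r  // crude; use a JSON parser in practice
  logDir.listFiles().toSeq
    .filter(_.getName.forall(_.isDigit))          // per-batch log files: 0, 1, 2, ...
    .flatMap { batchFile =>
      val src = Source.fromFile(batchFile)
      try {
        src.getLines()
          .filter(_.startsWith("{"))              // skip the version header line
          .flatMap(l => pathField.findFirstMatchIn(l).map(_.group(1)))
          .toList
      } finally src.close()
    }
    .toSet
}

Any file in the output directory that is not in this set is residue from a failed or retried batch and should be skipped.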
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 12:25 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>>>>>
>>>>>> Hmm, OK, I understand that, but the job is running for a good few
>>>>>> minutes before I kill it, so there should not be any jobs left,
>>>>>> because I can see in the log that it's now polling for new changes
>>>>>> and the latest offset is the right one.
>>>>>>
>>>>>> After I kill it and relaunch, it picks up that same file?
>>>>>>
>>>>>> Sorry if I misunderstood you.
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 5:20 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>>
>>>>>> It is always possible that there will be extra jobs from failed
>>>>>> batches. However, for the file sink, only one set of files will make
>>>>>> it into the _spark_metadata directory log. This is how we get atomic
>>>>>> commits even when there are files in more than one directory. When
>>>>>> reading the files with Spark, we'll detect this directory and use it
>>>>>> instead of listStatus to find the list of valid files.
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 9:05 AM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>>>>>
>>>>>> On another note, when it comes to checkpointing on structured
>>>>>> streaming: I noticed that if I have a stream running off S3 and I
>>>>>> kill the process, the next time the process starts running it
>>>>>> duplicates the last record inserted. Is that normal?
>>>>>>
>>>>>> So say I have streaming enabled on one folder "test" which only has
>>>>>> two files, "update1" and "update 2", then I kill the Spark job using
>>>>>> Ctrl+C. When I rerun the stream it picks up "update 2" again.
>>>>>>
>>>>>> Is this normal? Isn't Ctrl+C a failure?
>>>>>>
>>>>>> I would expect checkpointing to know that update 2 was already
>>>>>> processed.
>>>>>>
>>>>>> Regards
>>>>>> Sam
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 4:58 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>>>>>
>>>>>> Thanks Michael!
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>>
>>>>>> Here's a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>>>>>>
>>>>>> We should add this soon.
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>>>>>
>>>>>> Hi All
>>>>>>
>>>>>> When trying to read a stream off S3 and I try to drop duplicates, I
>>>>>> get the following error:
>>>>>>
>>>>>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>>>>>> Append output mode not supported when there are streaming
>>>>>> aggregations on streaming DataFrames/DataSets;;
>>>>>>
>>>>>> What's strange is that if I use the batch "spark.read.json", it
>>>>>> works.
>>>>>>
>>>>>> Can I assume you can't drop duplicates in structured streaming?
>>>>>>
>>>>>> Regards
>>>>>> Sam
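As a coda to the original question: the JIRA above tracks deduplication on streaming Datasets, which was added in later releases (Spark 2.2+). A minimal sketch of how it looks there; the paths, schema, and column names are made up for illustration, and the watermark bounds how much state Spark keeps for the dedup.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("dedup-example").getOrCreate()

// Streaming JSON sources need an explicit schema.
val schema = new StructType()
  .add("id", StringType)
  .add("eventTime", TimestampType)

val deduped = spark.readStream
  .schema(schema)
  .json("s3a://my-bucket/test")               // hypothetical input folder
  .withWatermark("eventTime", "10 minutes")   // bound the deduplication state
  .dropDuplicates("id", "eventTime")          // works on streams in Spark 2.2+

val query = deduped.writeStream
  .format("parquet")
  .option("path", "/my/output")               // data plus _spark_metadata go here
  .option("checkpointLocation", "/my/checkpoint")
  .start()

With the watermark in place, deduplication is not treated as a blocking streaming aggregation, so append output mode works and the AnalysisException above no longer applies.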