Hi Michael,

If that's the case for the example below, where should I be reading these JSON log files first? I'm assuming somewhere between `df` and `query`?

val df = spark
  .readStream
  .option("tableReferenceSource", tableName)
  .load()

setUpGoogle(spark.sqlContext)

val query = df
  .writeStream
  .option("tableReferenceSink", tableName2)
  .option("checkpointLocation", "checkpoint")
  .start()
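(A rough sketch of what Michael suggests in his reply below: reading the sink's JSON log by hand and picking up only committed files. It assumes the Spark 2.x file-sink log layout, i.e. one file per batch under `_spark_metadata`, a version header line, then one JSON object per committed file; the paths and the regex-based parse are illustrative only.)

import org.apache.hadoop.fs.Path
import scala.io.Source

// The commit log the file sink maintains alongside its output
val metadataDir = new Path("/your/path/_spark_metadata")
val fs = metadataDir.getFileSystem(spark.sparkContext.hadoopConfiguration)

// Each batch file holds a version header ("v1") followed by JSON lines,
// one per file that was atomically committed for that batch
val committedFiles = fs.listStatus(metadataDir)
  .flatMap(f => Source.fromInputStream(fs.open(f.getPath)).getLines())
  .filter(_.startsWith("{"))  // skip the version header lines
  .flatMap(l => "\"path\":\"([^\"]+)\"".r.findFirstMatchIn(l).map(_.group(1)))

// Only read files present in the log; ignore anything else in the directory
val committed = spark.read.json(committedFiles: _*)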
On Tue, Feb 7, 2017 at 7:24 PM, Michael Armbrust <mich...@databricks.com> wrote:

> Read the JSON log of files that is in `/your/path/_spark_metadata` and
> only read files that are present in that log (ignore anything else).
>
> On Tue, Feb 7, 2017 at 1:16 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>
>> Ah, I see, OK, so it's probably the retry that's causing it.
>>
>> So when you say I'll have to take this into account, how do I best do
>> that? My sink will have to know what that extra file was, and I was under
>> the impression Spark would automagically know this because of the
>> checkpoint directory set when you created the writeStream.
>>
>> If that's not the case, then how would I go about ensuring no duplicates?
>>
>> Thanks again for the awesome support!
>>
>> Regards
>> Sam
>>
>> On Tue, 7 Feb 2017 at 18:05, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> Sorry, I think I was a little unclear. There are two things at play here.
>>>
>>> - Exactly-once semantics with file output: Spark writes out extra
>>> metadata on which files are valid, to ensure that failures don't cause us
>>> to "double count" any of the input. Spark 2.0+ detects this info
>>> automatically when you use the DataFrame reader (spark.read...). There
>>> may be extra files, but they will be ignored. If you are consuming the
>>> output with another system, you'll have to take this into account.
>>> - Retries: right now we always retry the last batch when restarting.
>>> This is safe/correct because of the above, but we could also optimize it
>>> away by tracking more information about batch progress.
>>>
>>> On Tue, Feb 7, 2017 at 12:25 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>>
>>> Hmm, OK, I understand that, but the job runs for a good few minutes
>>> before I kill it, so there should not be any jobs left. I can see in the
>>> log that it's now polling for new changes, and the latest offset is the
>>> right one.
>>>
>>> After I kill it and relaunch it, it picks up that same file?
>>>
>>> Sorry if I misunderstood you.
>>>
>>> On Tue, Feb 7, 2017 at 5:20 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>
>>> It is always possible that there will be extra jobs from failed batches.
>>> However, for the file sink, only one set of files will make it into the
>>> _spark_metadata directory log. This is how we get atomic commits even
>>> when there are files in more than one directory. When reading the files
>>> with Spark, we'll detect this directory and use it instead of listStatus
>>> to find the list of valid files.
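(In other words, when the downstream consumer is Spark itself, none of the manual log parsing above is needed; per Michael's note, the batch reader picks up `_spark_metadata` automatically. A minimal illustration, with the output path assumed:)

// Spark 2.0+ detects the sink's _spark_metadata log and uses it instead
// of a plain directory listing, so leftover files from failed or retried
// batches are ignored automatically:
val output = spark.read.json("/your/path")

// A raw listStatus of the same directory may also surface uncommitted
// files, which is exactly what a non-Spark consumer has to filter out
// by consulting the log.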
>>> On Tue, Feb 7, 2017 at 9:05 AM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>>
>>> On another note, when it comes to checkpointing on structured streaming:
>>> I noticed that if I have a stream running off S3 and I kill the process,
>>> the next time the process starts running it duplicates the last record
>>> inserted. Is that normal?
>>>
>>> So say I have streaming enabled on one folder "test" which only has two
>>> files, "update1" and "update2", and then I kill the Spark job using
>>> Ctrl+C. When I rerun the stream it picks up "update2" again.
>>>
>>> Is this normal? Isn't Ctrl+C a failure?
>>>
>>> I would expect checkpointing to know that "update2" was already processed.
>>>
>>> Regards
>>> Sam
>>>
>>> On Tue, Feb 7, 2017 at 4:58 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>>
>>> Thanks Michael!
>>>
>>> On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>
>>> Here's a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>>>
>>> We should add this soon.
>>>
>>> On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>>
>>> Hi All,
>>>
>>> When trying to read a stream off S3, if I try to drop duplicates I get
>>> the following error:
>>>
>>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>>> Append output mode not supported when there are streaming aggregations on
>>> streaming DataFrames/DataSets;;
>>>
>>> What's strange is that if I use the batch "spark.read.json", it works.
>>>
>>> Can I assume you can't drop duplicates in structured streaming?
>>>
>>> Regards
>>> Sam
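(For readers finding this thread in the archive: SPARK-19497 was subsequently resolved, and as of Spark 2.2 `dropDuplicates` is supported on streaming Datasets, with an optional watermark to bound the deduplication state. A sketch, in which the schema, column names, and paths are all hypothetical:)

import org.apache.spark.sql.types._

// A streaming file source needs an explicit schema (hypothetical fields)
val schema = new StructType()
  .add("eventId", StringType)
  .add("eventTime", TimestampType)
  .add("payload", StringType)

val deduped = spark.readStream
  .schema(schema)
  .json("s3a://bucket/test")
  .withWatermark("eventTime", "10 minutes")  // lets old dedup state be dropped
  .dropDuplicates("eventId", "eventTime")

val query = deduped.writeStream
  .format("parquet")
  .option("checkpointLocation", "checkpoint")
  .start("s3a://bucket/output")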