Ignore me, with a bit more digging I was able to find the FileStreamSink source
<https://github.com/apache/spark/blob/1ae4652b7e1f77a984b8459c778cb06c814192c5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala>

Following that pattern worked a treat!

Thanks again Michael :)

On Tue, Feb 7, 2017 at 8:44 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

> Sorry, those are methods I wrote, so you can ignore them :)
>
> So just adding a path parameter tells Spark that's where the update log is?
>
> Do I check for the unique ID there to identify which batches were written
> and which weren't?
>
> Are there any examples of this out there? There aren't many connectors in
> the wild that I can reimplement, are there?
> Should I look at how the file sink is set up and follow that pattern?
>
>
> Regards
> Sam
>
> On Tue, Feb 7, 2017 at 8:40 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> The JSON log is only used by the file sink (which it doesn't seem like
>> you are using).  Though, I'm not sure exactly what is going on inside of
>> setupGoogle or how tableReferenceSource is used.
>>
>> Typically you would run df.writeStream.option("path", "/my/path")... and
>> then the transaction log would go into /my/path/_spark_metadata.
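>>
>> For example, a minimal sketch with the built-in file sink (the format and
>> paths are placeholders, and df is your streaming DataFrame):
>>
>> val query = df.writeStream
>>   .format("parquet")                        // built-in file sink
>>   .option("path", "/my/path")               // data files land here
>>   .option("checkpointLocation", "/my/chk")  // offsets and commit tracking
>>   .start()
>>
>> // After a few batches, /my/path/_spark_metadata holds the JSON log that
>> // lists exactly which files belong to committed batches.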
>>
>> There is no requirement that a sink use this kind of update log.
>> This is just how we get better transactional semantics than HDFS is
>> providing.  If your sink supports transactions natively you should just use
>> those instead.  We pass a unique ID to the sink method addBatch so that you
>> can make sure you don't commit the same transaction more than once.
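>>
>> To make that concrete, here is a rough sketch of an idempotent sink that
>> uses the batch ID (this is not the real FileStreamSink; it assumes your
>> target system can remember which batch IDs it has committed, and the two
>> helpers at the bottom are hypothetical):
>>
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.execution.streaming.Sink
>>
>> class IdempotentSink extends Sink {
>>   override def addBatch(batchId: Long, data: DataFrame): Unit = {
>>     // Spark may re-offer the last batch after a restart, so skipping
>>     // already-committed batch IDs is what gives exactly-once output.
>>     if (!alreadyCommitted(batchId)) {
>>       // Write the rows and record batchId in the same transaction so the
>>       // data and the commit marker can never get out of sync.
>>       commitWithBatchId(batchId, data)
>>     }
>>   }
>>
>>   // Hypothetical helpers backed by the target system's own transactions.
>>   private def alreadyCommitted(batchId: Long): Boolean = ???
>>   private def commitWithBatchId(batchId: Long, data: DataFrame): Unit = ???
>> }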
>>
>> On Tue, Feb 7, 2017 at 3:29 PM, Sam Elamin <hussam.ela...@gmail.com>
>> wrote:
>>
>>> Hi Michael
>>>
>>> If that's the case, for the example below, where should I first be reading
>>> these JSON log files? I'm assuming somewhere between df and query?
>>>
>>>
>>> val df = spark
>>>   .readStream
>>>   .option("tableReferenceSource", tableName)
>>>   .load()
>>>
>>> setUpGoogle(spark.sqlContext)
>>>
>>> val query = df
>>>   .writeStream
>>>   .option("tableReferenceSink", tableName2)
>>>   .option("checkpointLocation", "checkpoint")
>>>   .start()
>>>
>>>
>>> On Tue, Feb 7, 2017 at 7:24 PM, Michael Armbrust <mich...@databricks.com
>>> > wrote:
>>>
>>>> Read the JSON log of files that is in `/your/path/_spark_metadata` and
>>>> only read files that are present in that log (ignore anything else).
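>>>>
>>>> Roughly like this, as a sketch only: it assumes the v1 log format (each
>>>> log file starts with a version line and then holds one JSON entry per
>>>> line with a "path" field) and that your data files are JSON; check
>>>> FileStreamSinkLog for the exact schema.
>>>>
>>>> import org.apache.spark.sql.functions.get_json_object
>>>> import spark.implicits._
>>>>
>>>> // Collect the data files that belong to committed batches.
>>>> val committedFiles = spark.read
>>>>   .text("/your/path/_spark_metadata")
>>>>   .filter(!$"value".startsWith("v"))                       // skip the version header lines
>>>>   .select(get_json_object($"value", "$.path").as("path"))  // each entry is a JSON file status
>>>>   .as[String]
>>>>   .collect()
>>>>
>>>> // Only read files that are present in the log; ignore anything else.
>>>> val valid = spark.read.json(committedFiles: _*)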
>>>>
>>>> On Tue, Feb 7, 2017 at 1:16 PM, Sam Elamin <hussam.ela...@gmail.com>
>>>> wrote:
>>>>
>>>>> Ah I see, OK, so it's probably the retry that's causing it.
>>>>>
>>>>> So when you say I'll have to take this into account, how do I best do
>>>>> that? My sink will have to know what that extra file was. And I was under
>>>>> the impression Spark would automagically know this because of the
>>>>> checkpoint directory set when you created the writeStream.
>>>>>
>>>>> If that's not the case then how would I go about ensuring no
>>>>> duplicates?
>>>>>
>>>>>
>>>>> Thanks again for the awesome support!
>>>>>
>>>>> Regards
>>>>> Sam
>>>>> On Tue, 7 Feb 2017 at 18:05, Michael Armbrust <mich...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Sorry, I think I was a little unclear.  There are two things at play
>>>>>> here.
>>>>>>
>>>>>>  - Exactly-once semantics with file output: Spark writes out extra
>>>>>> metadata on which files are valid to ensure that failures don't cause us
>>>>>> to "double count" any of the input.  Spark 2.0+ detects this info
>>>>>> automatically when you use the DataFrame reader (spark.read...), as in
>>>>>> the sketch below.  There may be extra files, but they will be ignored.
>>>>>> If you are consuming the output with another system you'll have to take
>>>>>> this into account.
>>>>>>  - Retries: right now we always retry the last batch when restarting.
>>>>>> This is safe/correct because of the above, but we could also optimize
>>>>>> this away by tracking more information about batch progress.
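>>>>>>
>>>>>> For example (path and format are placeholders):
>>>>>>
>>>>>> // Spark notices /my/path/_spark_metadata and automatically ignores any
>>>>>> // orphaned files from failed attempts:
>>>>>> val output = spark.read.parquet("/my/path")
>>>>>>
>>>>>> // An external consumer that simply lists the directory would instead
>>>>>> // need to filter its file list against the metadata log itself.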
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 12:25 PM, Sam Elamin <hussam.ela...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hmm, OK, I understand that, but the job runs for a good few minutes
>>>>>> before I kill it, so there should not be any jobs left: I can see in
>>>>>> the log that it's now polling for new changes, and the latest offset is
>>>>>> the right one.
>>>>>>
>>>>>> After I kill it and relaunch, it picks up that same file?
>>>>>>
>>>>>>
>>>>>> Sorry if I misunderstood you
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 5:20 PM, Michael Armbrust <
>>>>>> mich...@databricks.com> wrote:
>>>>>>
>>>>>> It is always possible that there will be extra jobs from failed
>>>>>> batches.  However, for the file sink, only one set of files will make
>>>>>> it into the _spark_metadata directory log.  This is how we get atomic
>>>>>> commits even when there are files in more than one directory.  When
>>>>>> reading the files with Spark, we'll detect this directory and use it
>>>>>> instead of listStatus to find the list of valid files.
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 9:05 AM, Sam Elamin <hussam.ela...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> On another note, when it comes to checkpointing on structured
>>>>>> streaming
>>>>>>
>>>>>> I noticed that if I have a stream running off S3 and I kill the process,
>>>>>> the next time the process starts running it duplicates the last record
>>>>>> inserted. Is that normal?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> So say I have streaming enabled on one folder "test" which only has
>>>>>> two files, "update1" and "update 2", then I kill the Spark job using
>>>>>> Ctrl+C.  When I rerun the stream it picks up "update 2" again.
>>>>>>
>>>>>> Is this normal? Isn't Ctrl+C a failure?
>>>>>>
>>>>>> I would expect checkpointing to know that "update 2" was already
>>>>>> processed.
>>>>>>
>>>>>> Regards
>>>>>> Sam
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 4:58 PM, Sam Elamin <hussam.ela...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Thanks Michael!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust <
>>>>>> mich...@databricks.com> wrote:
>>>>>>
>>>>>> Here's a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>>>>>>
>>>>>> We should add this soon.
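>>>>>>
>>>>>> Once that lands, it should look roughly like the sketch below (the
>>>>>> schema, column names and paths are placeholders, and the exact API may
>>>>>> still change): deduplicate in append mode with a watermark so the
>>>>>> dedup state doesn't grow forever.
>>>>>>
>>>>>> import org.apache.spark.sql.types._
>>>>>>
>>>>>> val schema = new StructType()
>>>>>>   .add("eventId", StringType)
>>>>>>   .add("eventTime", TimestampType)
>>>>>>
>>>>>> val deduped = spark.readStream
>>>>>>   .schema(schema)                            // file sources need a schema
>>>>>>   .json("s3a://my-bucket/updates")           // placeholder input path
>>>>>>   .withWatermark("eventTime", "10 minutes")  // bound the dedup state
>>>>>>   .dropDuplicates("eventId")                 // keep one row per eventId
>>>>>>
>>>>>> deduped.writeStream
>>>>>>   .format("parquet")
>>>>>>   .option("path", "/my/output")
>>>>>>   .option("checkpointLocation", "/my/checkpoint")
>>>>>>   .start()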
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin <hussam.ela...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hi All
>>>>>>
>>>>>> When I read a stream off S3 and try to drop duplicates, I get the
>>>>>> following error:
>>>>>>
>>>>>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>>>>>> Append output mode not supported when there are streaming aggregations on
>>>>>> streaming DataFrames/DataSets;;
>>>>>>
>>>>>>
>>>>>> What's strange is that if I use the batch "spark.read.json", it works.
>>>>>>
>>>>>> Can I assume you can't drop duplicates in Structured Streaming?
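>>>>>>
>>>>>> For reference, this is roughly the pattern that triggers it (the paths
>>>>>> and mySchema are placeholders for my actual setup):
>>>>>>
>>>>>> spark.readStream
>>>>>>   .schema(mySchema)                 // placeholder schema for the JSON files
>>>>>>   .json("s3a://my-bucket/updates")  // placeholder input path
>>>>>>   .dropDuplicates("eventId")        // treated as a streaming aggregation
>>>>>>   .writeStream                      // default append output mode -> error
>>>>>>   .option("checkpointLocation", "checkpoint")
>>>>>>   .start()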
>>>>>>
>>>>>> Regards
>>>>>> Sam
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>>
>
