Hi Micheal

If thats the case for the below example, where should i be reading these
json log files first? im assuming sometime between df and query?


val df = spark
    .readStream
    .option("tableReferenceSource",tableName)
    .load()
setUpGoogle(spark.sqlContext)

val query = df
  .writeStream
  .option("tableReferenceSink",tableName2)
  .option("checkpointLocation","checkpoint")
  .start()


On Tue, Feb 7, 2017 at 7:24 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Read the JSON log of files that is in `/your/path/_spark_metadata` and
> only read files that are present in that log (ignore anything else).
>
> On Tue, Feb 7, 2017 at 1:16 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
>> Ah I see ok so probably it's the retry that's causing it
>>
>> So when you say I'll have to take this into account, how do I best do
>> that? My sink will have to know what was that extra file. And i was under
>> the impression spark would automagically know this because of the
>> checkpoint directory set when you created the writestream
>>
>> If that's not the case then how would I go about ensuring no duplicates?
>>
>>
>> Thanks again for the awesome support!
>>
>> Regards
>> Sam
>> On Tue, 7 Feb 2017 at 18:05, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> Sorry, I think I was a little unclear.  There are two things at play
>>> here.
>>>
>>>  - Exactly-once semantics with file output: spark writes out extra
>>> metadata on which files are valid to ensure that failures don't cause us to
>>> "double count" any of the input.  Spark 2.0+ detects this info
>>> automatically when you use dataframe reader (spark.read...). There may be
>>> extra files, but they will be ignored. If you are consuming the output with
>>> another system you'll have to take this into account.
>>>  - Retries: right now we always retry the last batch when restarting.
>>> This is safe/correct because of the above, but we could also optimize this
>>> away by tracking more information about batch progress.
>>>
>>> On Tue, Feb 7, 2017 at 12:25 PM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>> Hmm ok I understand that but the job is running for a good few mins
>>> before I kill it so there should not be any jobs left because I can see in
>>> the log that its now polling for new changes, the latest offset is the
>>> right one
>>>
>>> After I kill it and relaunch it picks up that same file?
>>>
>>>
>>> Sorry if I misunderstood you
>>>
>>> On Tue, Feb 7, 2017 at 5:20 PM, Michael Armbrust <mich...@databricks.com
>>> > wrote:
>>>
>>> It is always possible that there will be extra jobs from failed batches.
>>> However, for the file sink, only one set of files will make it into
>>> _spark_metadata directory log.  This is how we get atomic commits even when
>>> there are files in more than one directory.  When reading the files with
>>> Spark, we'll detect this directory and use it instead of listStatus to find
>>> the list of valid files.
>>>
>>> On Tue, Feb 7, 2017 at 9:05 AM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>> On another note, when it comes to checkpointing on structured streaming
>>>
>>> I noticed if I have  a stream running off s3 and I kill the process. The
>>> next time the process starts running it dulplicates the last record
>>> inserted. is that normal?
>>>
>>>
>>>
>>>
>>> So say I have streaming enabled on one folder "test" which only has two
>>> files "update1" and "update 2", then I kill the spark job using Ctrl+C.
>>> When I rerun the stream it picks up "update 2" again
>>>
>>> Is this normal? isnt ctrl+c a failure?
>>>
>>> I would expect checkpointing to know that update 2 was already processed
>>>
>>> Regards
>>> Sam
>>>
>>> On Tue, Feb 7, 2017 at 4:58 PM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>> Thanks Micheal!
>>>
>>>
>>>
>>> On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust <mich...@databricks.com
>>> > wrote:
>>>
>>> Here a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>>>
>>> We should add this soon.
>>>
>>> On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>> Hi All
>>>
>>> When trying to read a stream off S3 and I try and drop duplicates I get
>>> the following error:
>>>
>>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>>> Append output mode not supported when there are streaming aggregations on
>>> streaming DataFrames/DataSets;;
>>>
>>>
>>> Whats strange if I use the batch "spark.read.json", it works
>>>
>>> Can I assume you cant drop duplicates in structured streaming
>>>
>>> Regards
>>> Sam
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>

Reply via email to