[GitHub] [spark] wenxuanguan commented on issue #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming

2019-09-12 Thread GitBox
wenxuanguan commented on issue #25618: [SPARK-28908][SS]Implement Kafka EOS 
sink for Structured Streaming
URL: https://github.com/apache/spark/pull/25618#issuecomment-530713693
 
 
   @gaborgsomogyi @HeartSaVioR Thanks for your reply.
   About the transaction timeout, as described above and in the [mailing list thread](http://apache-spark-developers-list.1001551.n3.nabble.com/SS-Kafka-EOS-transaction-timeout-solution-td27797.html), I think the only options available at present are:
   1. Lose data when the transaction times out.
   2. Resend data when we detect that the transaction has timed out.
   3. Set `transaction.timeout.ms` to a larger value, e.g. `Integer.MAX_VALUE`.
   
   The first option is rejected since it leads to data loss. The second is rejected since it is not consistent with exactly-once semantics.
   The last one seems workable. Since committing a transaction is a lightweight action, if a transaction still has not been recovered after `Integer.MAX_VALUE` ms, we can assume a fatal error in the Kafka cluster and that the transaction cannot be recovered. In that situation we can provide a way to resend the data when the user wants to.
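   For illustration only, here is a minimal Scala sketch (not code from this PR) of option 3: a transactional `KafkaProducer` whose `transaction.timeout.ms` is raised to a very large value. The transactional id scheme is made up, and the broker must allow such a value via `transaction.max.timeout.ms`, otherwise `initTransactions()` fails.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object ProducerTimeoutSketch extends App {
  // A transactional producer whose transaction timeout is raised so that an
  // interrupted transaction can still be committed after a long outage.
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "spark-eos-sink-query1-task0") // hypothetical id scheme
  props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, Int.MaxValue.toString)      // the "larger value"

  val producer = new KafkaProducer[String, String](props)
  producer.initTransactions() // fails if the broker's transaction.max.timeout.ms is smaller
  producer.beginTransaction()
  producer.send(new ProducerRecord("topic", "key", "value"))
  producer.commitTransaction()
  producer.close()
}
```

   With the built-in Kafka sink, producer properties are usually passed through as writer options prefixed with `kafka.` (e.g. `kafka.transaction.timeout.ms`); whether this PR exposes the transactional settings the same way is an assumption here.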
   
   @gaborgsomogyi About a new Kafka API for handling transactions in a distributed system: as @HeartSaVioR mentioned above, the producer transaction API is not provided only for Kafka Streams, and I also think we should be able to adapt it to Spark/Flink/Hive.
   





[GitHub] [spark] wenxuanguan commented on issue #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming

2019-08-31 Thread GitBox
wenxuanguan commented on issue #25618: [SPARK-28908][SS]Implement Kafka EOS 
sink for Structured Streaming
URL: https://github.com/apache/spark/pull/25618#issuecomment-526809684
 
 
   @HeartSaVioR @gaborgsomogyi Thanks for your advice. I have started a discussion on the [mailing list](http://apache-spark-developers-list.1001551.n3.nabble.com/SS-Kafka-EOS-transaction-timeout-solution-td27797.html) and am looking forward to your input.
   





[GitHub] [spark] wenxuanguan commented on issue #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming

2019-08-30 Thread GitBox
wenxuanguan commented on issue #25618: [SPARK-28908][SS]Implement Kafka EOS 
sink for Structured Streaming
URL: https://github.com/apache/spark/pull/25618#issuecomment-526481232
 
 
   > Spark doesn't have 2PC semantics natively, as you've seen in the DSv2 API - if I understand correctly, Spark's HDFS sink doesn't leverage 2PC.
   > 
   > Previously it used a temporary directory - all tasks wrote into that directory, and the driver moved the directory to its final destination only when all tasks had succeeded. This leveraged the fact that "rename" is atomic, so it didn't provide "exactly-once" if the underlying filesystem doesn't support atomic renaming.
   > 
   > Now it leverages metadata - all tasks write files and pass the list of written file paths to the driver. Once the driver has received the lists from all tasks, it writes the overall list of files to the metadata. So exactly-once for HDFS is only guaranteed when the output is read by "Spark", which is aware of the metadata information.
   
   Sorry for the late reply.
   In my understanding, that is essentially the 2PC procedure.
   In the voting phase every task writes its data and returns a commit message to the driver. In the commit phase, once all tasks have completed successfully, the driver commits the job (the rename), or aborts the job if any task failed to commit or the job-level commit failed.
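   To make the mapping to 2PC concrete, here is a toy, self-contained Scala sketch of that flow. The trait and method names are invented for illustration; they are not Spark's DSv2 interfaces, although the shape is the same (tasks return commit messages, the driver either commits or aborts the job).

```scala
import scala.util.{Success, Try}

// Toy model of the two-phase flow described above; names are made up and are
// NOT Spark's DSv2 API.
final case class TaskCommitMessage(taskId: Int, writtenFiles: Seq[String])

trait TaskWriter {
  // Voting phase: the task writes its data and "votes" by returning a commit message.
  def write(): TaskCommitMessage
}

trait JobCommitter {
  // Commit phase: make the output visible (e.g. rename, or write the metadata file).
  def commit(messages: Seq[TaskCommitMessage]): Unit
  // Abort: clean up partial output so it never becomes visible.
  def abort(messages: Seq[TaskCommitMessage]): Unit
}

object JobRunner {
  def runJob(writers: Seq[TaskWriter], committer: JobCommitter): Unit = {
    val votes = writers.map(w => Try(w.write()))
    if (votes.forall(_.isSuccess)) {
      committer.commit(votes.map(_.get))                       // all tasks succeeded: commit the job
    } else {
      committer.abort(votes.collect { case Success(m) => m })  // any failure: abort
    }
  }
}
```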





[GitHub] [spark] wenxuanguan commented on issue #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming

2019-08-29 Thread GitBox
wenxuanguan commented on issue #25618: [SPARK-28908][SS]Implement Kafka EOS 
sink for Structured Streaming
URL: https://github.com/apache/spark/pull/25618#issuecomment-526422493
 
 
   > Before reviewing the design, I may need to say: you are encouraged to at least mention it if you borrow code from somewhere, so that we can be sure there is no license issue, and even if there is no license issue, the original authors at least get credit.
   > 
   > 
https://github.com/apache/spark/pull/25618/files#diff-c1e1dbc4a986c69ef54e1eebe880d4e9
   > 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
   
   @HeartSaVioR Thanks for your advice.
   Indeed, I referred to Flink's FlinkKafkaProducer.java for resuming a transaction, since there is no Kafka API that supports this. I will credit it in the code comments and in the design sketch.
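   For reference, a rough Scala sketch of the reflection trick Flink's FlinkKafkaProducer uses to "resume" a transaction after a restart: it overwrites the producer's internal producer id and epoch so that a transaction opened before the failure can still be committed. The internal field names used below (`transactionManager`, `producerIdAndEpoch`, `producerId`, `epoch`) are private kafka-clients details that vary between client versions, so treat them as assumptions rather than a stable API.

```scala
import org.apache.kafka.clients.producer.KafkaProducer

object ResumeTransactionSketch {

  // Reflection helpers; the fields accessed below are kafka-clients internals.
  private def getField(obj: AnyRef, name: String): AnyRef = {
    val f = obj.getClass.getDeclaredField(name)
    f.setAccessible(true)
    f.get(obj)
  }

  private def setField(obj: AnyRef, name: String, value: Any): Unit = {
    val f = obj.getClass.getDeclaredField(name)
    f.setAccessible(true)
    f.set(obj, value)
  }

  /** Restore the producer id / epoch recorded in the checkpoint so that
    * commitTransaction() can finish the transaction opened before the failure. */
  def resumeTransaction(producer: KafkaProducer[Array[Byte], Array[Byte]],
                        producerId: Long,
                        epoch: Short): Unit = {
    val transactionManager = getField(producer, "transactionManager")
    val producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch")
    setField(producerIdAndEpoch, "producerId", producerId)
    setField(producerIdAndEpoch, "epoch", epoch)
    // The real Flink code additionally drives the TransactionManager's internal
    // state machine to IN_TRANSACTION before commitTransaction() is called.
  }
}
```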





[GitHub] [spark] wenxuanguan commented on issue #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming

2019-08-29 Thread GitBox
wenxuanguan commented on issue #25618: [SPARK-28908][SS]Implement Kafka EOS 
sink for Structured Streaming
URL: https://github.com/apache/spark/pull/25618#issuecomment-526421834
 
 
   > By reading the doc without super deep understanding I've found this in the 
caveats section:
   > 
   > ```
   > If the job stays failed for more than 60 seconds (the default value of the
   > configuration transaction.timeout.ms) before ResumeTransaction, the data sent to the
   > Kafka cluster will be discarded and lead to data loss. So we set transaction.timeout.ms
   > to 900000 ms, the default value of transaction.max.timeout.ms in the Kafka cluster,
   > to reduce the risk of data loss if the user has not defined it.
   > ```
   > 
   > The `to reduce the risk of data loss` part disturbs me a bit, is it 
exactly-once then or not?
   > @HeartSaVioR AFAIK you've proposed exactly once SPIP before but there were 
concerns.
   
   @gaborgsomogyi @HeartSaVioR Thanks for your reply about the config `transaction.timeout.ms` and the data loss.
   The common scenario is that the producer fails to commit a transaction for some reason, such as a Kafka broker going down, and the Spark job fails as a result. After the Kafka broker recovers and the job is restarted, the transaction is resumed. So as long as the time between the failure being fixed and the job being restarted (by a job attempt or manually) does not exceed `transaction.timeout.ms`, no data is lost.
   The producer default for `transaction.timeout.ms` is 60 seconds, so to leave enough time to fix a failure we raise it to 900000 ms, the default value of the broker-side limit `transaction.max.timeout.ms`, when the user has not set it. We cannot default to anything higher, because the request fails if the producer's `transaction.timeout.ms` is larger than the broker's limit.
   I think this is what we can do in code, and we also warn the user in the documentation. There are further ways to reduce the risk, such as increasing `transaction.timeout.ms`, but that is up to the user. So if the user has defined `transaction.timeout.ms`, we just check that it is large enough.
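   As a rough illustration of that defaulting and validation rule (not code from this PR; the object name, constants, and message are made up):

```scala
// Hypothetical helper illustrating the rule described above.
object TransactionTimeoutCheck {
  // Kafka defaults: producer transaction.timeout.ms = 60 s,
  // broker transaction.max.timeout.ms = 900000 ms (15 minutes).
  val ProducerDefaultMs: Long = 60L * 1000
  val BrokerDefaultMaxMs: Long = 900L * 1000

  /** Returns the transaction.timeout.ms to use: default to the broker-side
    * maximum when the user did not set it, otherwise require a value large
    * enough to survive the expected recovery window. */
  def resolve(userValueMs: Option[Long], minRecoveryWindowMs: Long): Long = userValueMs match {
    case None => BrokerDefaultMaxMs
    case Some(v) =>
      require(v >= minRecoveryWindowMs,
        s"transaction.timeout.ms = $v ms is too small; expected at least $minRecoveryWindowMs ms")
      v
  }
}
```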
   

