Hi Naveen! I assume you are using Hadoop 2.7+? That version supports truncating files, so you should not see a ".valid-length" file.
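The difference between truncating a file and writing a ".valid-length" file can be pictured with the toy, in-memory Java model below. It is a sketch under stated assumptions, not RollingSink code: `PartFile`, `restore` and `readableBytes` are hypothetical names, a byte array stands in for an HDFS part file, and the real on-disk name and encoding of the ".valid-length" file are not modeled. It only shows that, on recovery, the bytes written after the last successful checkpoint are either cut off (file systems with truncate support) or left in the file, in which case a reader has to stop at the recorded valid length.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.OptionalLong;

/** Hypothetical in-memory model of part-file recovery; not Flink's RollingSink. */
public class ValidLengthSketch {

    /** Stand-in for an in-progress part file (hypothetical type). */
    static final class PartFile {
        byte[] data;
        // Set only when the file system cannot truncate (the ".valid-length" case).
        OptionalLong validLength = OptionalLong.empty();

        PartFile(byte[] data) {
            this.data = data;
        }
    }

    /**
     * Called on recovery with the file length recorded at the last successful checkpoint.
     * With truncate support the extra bytes are cut off; without it they stay in the
     * file and only the valid length is recorded next to it.
     */
    static void restore(PartFile file, long checkpointedLength, boolean fsSupportsTruncate) {
        if (fsSupportsTruncate) {
            file.data = Arrays.copyOf(file.data, (int) checkpointedLength);
        } else {
            file.validLength = OptionalLong.of(checkpointedLength);
        }
    }

    /** Bytes a downstream consumer should read; anything past the valid length is a duplicate. */
    static byte[] readableBytes(PartFile file) {
        long limit = Math.min(file.validLength.orElse(file.data.length), file.data.length);
        return Arrays.copyOf(file.data, (int) limit);
    }

    public static void main(String[] args) {
        byte[] written = "r1\nr2\nr3\n".getBytes(StandardCharsets.UTF_8);
        long checkpointed = 6; // "r1\nr2\n" was covered by the last successful checkpoint

        for (boolean truncate : new boolean[] {true, false}) {
            PartFile file = new PartFile(written.clone());
            restore(file, checkpointed, truncate);
            System.out.printf("truncate=%b: %d bytes on disk, %d bytes valid%n",
                    truncate, file.data.length, readableBytes(file).length);
        }
    }
}
```

Without truncate support, the model's part file still holds all 9 bytes written before the failure, but only the first 6 are valid; a consumer that ignores the valid length would read the record "r3" a second time once it is replayed.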
The fix you mentioned is part of later Flink releases (such as 1.0.3).

Stephan

On Mon, May 16, 2016 at 11:46 PM, Madhire, Naveen <naveen.madh...@capitalone.com> wrote:
> Thanks Fabian. Actually, I don't see a .valid-length suffix file in the output HDFS folder.
> Can you please tell me how I would debug this issue, or do you suggest anything else to solve this duplicates problem?
>
> Thank you.
>
> From: Fabian Hueske <fhue...@gmail.com>
> Reply-To: "user@flink.apache.org" <user@flink.apache.org>
> Date: Saturday, May 14, 2016 at 4:10 AM
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Flink recovery
>
> The behavior of the RollingSink depends on the capabilities of the file system.
> If the file system does not support truncating files (as with older HDFS versions), an additional file with a .valid-length suffix is written to indicate how much of the file is valid.
> All records/data that come after the valid length are duplicates.
> Please refer to the JavaDocs of the RollingSink for details [1].
>
> If the .valid-length file does not solve the problem, you might have found a bug and we should have a closer look at the problem.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html
>
> 2016-05-14 4:17 GMT+02:00 Madhire, Naveen <naveen.madh...@capitalone.com>:
>
>> Thanks Fabian. Yes, I am seeing a few records more than once in the output.
>> I am running the job, canceling it from the dashboard, and running it again, using a different HDFS output path each time. I was thinking that when I cancel the job, it's not doing a clean cancel.
>> Is there anything else I have to use to get exactly-once output?
>>
>> I am using a simple pipeline: read from Kafka, apply transformations, and write to a rolling file sink.
>>
>> Thanks,
>> Naveen
>>
>> From: Fabian Hueske <fhue...@gmail.com>
>> Reply-To: "user@flink.apache.org" <user@flink.apache.org>
>> Date: Friday, May 13, 2016 at 4:26 PM
>> To: "user@flink.apache.org" <user@flink.apache.org>
>> Subject: Re: Flink recovery
>>
>> Hi Naveen,
>>
>> the RollingSink supports exactly-once output, so you should be good.
>>
>> Did you see events being emitted multiple times (which should not happen with the RollingSink) or being processed multiple times within the Flink program (which might happen, as explained before)?
>>
>> Best, Fabian
>>
>> 2016-05-13 23:19 GMT+02:00 Madhire, Naveen <naveen.madh...@capitalone.com>:
>>
>>> Thank you Fabian.
>>>
>>> I am using the HDFS rolling sink. This should support exactly-once output in case of failures, shouldn't it? I am following the documentation below:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/fault_tolerance.html#fault-tolerance-guarantees-of-data-sources-and-sinks
>>>
>>> If not, what other sinks can I use to get exactly-once output? Getting exactly-once output is critical for our use case.
>>>
>>> Thanks,
>>> Naveen
>>>
>>> From: Fabian Hueske <fhue...@gmail.com>
>>> Reply-To: "user@flink.apache.org" <user@flink.apache.org>
>>> Date: Friday, May 13, 2016 at 4:13 PM
>>> To: "user@flink.apache.org" <user@flink.apache.org>
>>> Subject: Re: Flink recovery
>>>
>>> Hi,
>>>
>>> Flink's exactly-once semantics do not mean that events are processed exactly once, but that events contribute exactly once to the state of an operator, such as a counter.
>>> Roughly, the mechanism works as follows:
>>> - Flink periodically injects checkpoint markers into the data stream. This happens synchronously across all sources.
>>> - When an operator has received a checkpoint marker on all of its inputs, it checkpoints its state and forwards the marker.
>>> - When the marker has been received by all sinks, the distributed checkpoint is recorded as successful.
>>>
>>> In case of a failure, the state of all operators is reset to the last successful checkpoint, and the sources are reset to the position at which the marker was injected.
>>> Hence, some events are sent to the operators a second time, but the operators' state was reset as well. So the repeated events contribute exactly once to the state of an operator.
>>>
>>> Note that you need a SinkFunction that supports Flink's checkpointing mechanism to achieve exactly-once output. Otherwise, results might be emitted multiple times.
>>>
>>> Cheers, Fabian
>>>
>>> 2016-05-13 22:58 GMT+02:00 Madhire, Naveen <naveen.madh...@capitalone.com>:
>>>
>>>> I checked JIRA, and it looks like FLINK-2111 should address the issue I am facing. I am canceling the job from the dashboard.
>>>>
>>>> I am using a Kafka source and the HDFS rolling sink.
>>>>
>>>> https://issues.apache.org/jira/browse/FLINK-2111
>>>>
>>>> Is this JIRA issue part of Flink 1.0.0?
>>>>
>>>> Thanks,
>>>> Naveen
>>>>
>>>> From: "Madhire, Venkat Naveen Kumar Reddy" <naveen.madh...@capitalone.com>
>>>> Reply-To: "user@flink.apache.org" <user@flink.apache.org>
>>>> Date: Friday, May 13, 2016 at 10:58 AM
>>>> To: "user@flink.apache.org" <user@flink.apache.org>
>>>> Subject: Flink recovery
>>>>
>>>> Hi,
>>>>
>>>> We are trying to test the recovery mechanism of Flink with a Kafka source and an HDFS sink during failures.
>>>>
>>>> I killed the job after it had processed some messages and restarted the same job. Some of the messages are processed more than once, which does not follow exactly-once semantics.
>>>>
>>>> We are also using the checkpointing mechanism and saving the state checkpoints to HDFS.
>>>> Below is the checkpoint code:
>>>>
>>>> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>>>> import org.apache.flink.streaming.api.CheckpointingMode;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>
>>>> StreamExecutionEnvironment envStream = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> envStream.enableCheckpointing(11); // checkpoint interval in milliseconds
>>>> envStream.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>> envStream.getCheckpointConfig().setCheckpointTimeout(60000); // abort a checkpoint after 60 s
>>>> envStream.getCheckpointConfig().setMaxConcurrentCheckpoints(4);
>>>> envStream.setStateBackend(new FsStateBackend("hdfs://ipaddr/mount/cp/checkpoint/")); // checkpoints stored in HDFS
>>>>
>>>> One thing I've noticed is that lowering the checkpoint interval reduces the number of messages processed more than once, and 11 ms is the lowest interval I can use.
>>>>
>>>> Is there anything else I should try to get exactly-once message processing?
>>>>
>>>> I am using Flink 1.0.0 and Kafka 0.8.
>>>>
>>>> Thank you.
>>>>
>>>> ------------------------------
>>>>
>>>> The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
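The checkpoint mechanism described in this thread, and why a sink that ignores checkpoints can still emit duplicates, can be illustrated with the simplified, single-threaded Java model below. Assumptions: one source, one counting operator and one append-only sink in a single chain, so checkpoint markers need no alignment and a checkpoint completes as soon as the marker has passed; `run` and `Result` are hypothetical names, and nothing here is Flink API. After a simulated failure, the source offset and the counter roll back to the last checkpoint, so the final count is exact, while the sink output keeps the records emitted since that checkpoint.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of checkpoint-based recovery (hypothetical, not Flink code). */
public class CheckpointReplaySketch {

    /** Outcome of one run: operator state after recovery plus everything the sink emitted. */
    static final class Result {
        final long finalCount;
        final List<String> sinkOutput;

        Result(long finalCount, List<String> sinkOutput) {
            this.finalCount = finalCount;
            this.sinkOutput = sinkOutput;
        }
    }

    /**
     * Processes all events, completing a checkpoint after every checkpointEvery events,
     * and fails once when the source is about to emit the event at index failAt.
     */
    static Result run(List<String> events, int checkpointEvery, int failAt) {
        List<String> sinkOutput = new ArrayList<>(); // never rolled back: output already written
        int offset = 0;   // source position (think of a Kafka offset)
        long count = 0;   // operator state: number of events seen
        int checkpointedOffset = 0;
        long checkpointedCount = 0;
        boolean failed = false;

        while (offset < events.size()) {
            if (!failed && offset == failAt) {
                // Failure: reset source position and operator state to the last successful checkpoint.
                failed = true;
                offset = checkpointedOffset;
                count = checkpointedCount;
                continue;
            }
            count++;                            // the event contributes to operator state
            sinkOutput.add(events.get(offset)); // and is emitted by a sink that ignores checkpoints
            offset++;
            if (offset % checkpointEvery == 0) {
                // Simplification: the marker passes the single chain instantly, so the checkpoint completes here.
                checkpointedOffset = offset;
                checkpointedCount = count;
            }
        }
        return new Result(count, sinkOutput);
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("e0", "e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9");
        for (int interval : new int[] {4, 2, 1}) {
            Result r = run(events, interval, 7);
            System.out.printf("checkpoint every %d events: count=%d, sink records=%d, duplicates=%d%n",
                    interval, r.finalCount, r.sinkOutput.size(), r.sinkOutput.size() - events.size());
        }
    }
}
```

With a failure just before event e7, the model reports 3, 1 and 0 duplicated sink records for checkpoint intervals of 4, 2 and 1 events, while the count is 10 in every case. The duplicates are exactly the events processed since the last completed checkpoint, which matches the observation in the thread that a shorter checkpoint interval reduces duplicates. In a real job, where checkpoints take time to complete, a shorter interval can narrow that window but is unlikely to close it; a sink that takes part in checkpointing is what keeps the replayed records out of the output.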