Hi, Sagar.

> 1. It solves the issue partially meaning files which have finished 
> checkpointing don't show .pending status but the files which were in progress 
>     when the program exists are still in .pending state.

Ans: 

Yea, Make the program exists and in that time if a checkpoint does not finished 
will lead the status keeps in .pending state then. Under the normal 
circumstances, the programs that running in the production env will never be 
stoped or existed if everything is fine.

> 2. Ideally, writer should work with default settings correct ? Meaning we 
> don't have to explicitly set these parameters to make it work. 
>     Is this assumption correct ?

Ans: 

Yes. Writer should work with default settings correct.
Yes. We do not have to explicitly set these parameters to make it work.
Yes. Assumption correct indeed.

However, you know, flink is a real time streaming framework, so under normal 
circumstances,you don't really go to use the default settings when it comes to 
a specific business. Especially together work with offline end(Like hadoop 
mapreduce). In this case, you need to tell the offline end when time a bucket 
is close and when time the data for the specify bucket is ready. So, you can 
take a look on https://issues.apache.org/jira/browse/FLINK-9609 
<https://issues.apache.org/jira/browse/FLINK-9609>.

Cheers
Zhangminglei


> 在 2018年6月23日,上午8:23,sagar loke <sagar...@gmail.com> 写道:
> 
> Hi Zhangminglei,
> 
> Thanks for the reply.
> 
> 1. It solves the issue partially meaning files which have finished 
> checkpointing don't show .pending status but the files which were in progress 
>     when the program exists are still in .pending state.
> 
> 2. Ideally, writer should work with default settings correct ? Meaning we 
> don't have to explicitly set these parameters to make it work. 
>     Is this assumption correct ?
> 
> Thanks,
> Sagar
> 
> On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838...@163.com 
> <mailto:18717838...@163.com>> wrote:
> Hi, Sagar. Please use the below code and you will find the part files status 
> from _part-0-107.in-progress   <> to _part-0-107.pending and finally to 
> part-0-107. [For example], you need to run the program for a while. However, 
> we need set some parameters, like the following. Moreover, 
> enableCheckpointing IS also needed. I know why you always see the .pending 
> file since the below parameters default value is 60 seconds even though you 
> set the enableCheckpoint. So, that is why you can not see the finished file 
> status until 60 seconds passed.
> 
> Attached is the ending on my end, and you will see what you want! 
> 
> Please let me know if you still have the problem.
> 
> Cheers
> Zhangminglei
> 
> setInactiveBucketCheckInterval(2000)
> .setInactiveBucketThreshold(2000);
> 
> public class TestOrc {
>    public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>       env.setParallelism(1);
>       env.enableCheckpointing(1000);
>       env.setStateBackend(new MemoryStateBackend());
> 
>       String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>       String path = "hdfs://10.199.196.0:9000/data/hive/man <>";
> 
>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
> 
>       bucketingSink
>          .setWriter(new OrcFileWriter<>(orcSchemaString))
>          .setInactiveBucketCheckInterval(2000)
>          .setInactiveBucketThreshold(2000);
> 
>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
> 
>       dataStream.addSink(bucketingSink);
> 
>       env.execute();
>    }
> 
>    public static class ManGenerator implements SourceFunction<Row> {
> 
>       @Override
>       public void run(SourceContext<Row> ctx) throws Exception {
>          for (int i = 0; i < 2147483000; i++) {
>             Row row = new Row(3);
>             row.setField(0, "Sagar");
>             row.setField(1, 26 + i);
>             row.setField(2, false);
>             ctx.collect(row);
>          }
>       }
> 
>       @Override
>       public void cancel() {
> 
>       }
>    }
> }
> <filestatus.jpg>
> 
> 
> 
>> 在 2018年6月22日,上午11:14,sagar loke <sagar...@gmail.com 
>> <mailto:sagar...@gmail.com>> 写道:
>> 
>> Sure, we can solve it together :)
>> 
>> Are you able to reproduce it ?
>> 
>> Thanks,
>> Sagar
>> 
>> On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <18717838...@163.com 
>> <mailto:18717838...@163.com>> wrote:
>> Sagar, flush will be called when do a checkpoint. Please see 
>> 
>> bucketState.currentFileValidLength = bucketState.writer.flush();
>> 
>> 
>> @Override
>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>    Preconditions.checkNotNull(restoredBucketStates, "The operator has not 
>> been properly initialized.");
>> 
>>    restoredBucketStates.clear();
>> 
>>    synchronized (state.bucketStates) {
>>       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>> 
>>       for (Map.Entry<String, BucketState<T>> bucketStateEntry : 
>> state.bucketStates.entrySet()) {
>>          BucketState<T> bucketState = bucketStateEntry.getValue();
>> 
>>          if (bucketState.isWriterOpen) {
>>             bucketState.currentFileValidLength = bucketState.writer.flush();
>>          }
>> 
>>          synchronized (bucketState.pendingFilesPerCheckpoint) {
>>             
>> bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), 
>> bucketState.pendingFiles);
>>          }
>>          bucketState.pendingFiles = new ArrayList<>();
>>       }
>>       restoredBucketStates.add(state);
>> 
>>       if (LOG.isDebugEnabled()) {
>>          LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), 
>> subtaskIdx, state);
>>       }
>>    }
>> 
>> 
>>> 在 2018年6月22日,上午10:21,sagar loke <sagar...@gmail.com 
>>> <mailto:sagar...@gmail.com>> 写道:
>>> 
>>> Thanks for replying. 
>>> 
>>> Yes, I tried with different values of checkpoint eg. 20, 100, 5000. 
>>> 
>>> env.enablecheckpointing(100);
>>> 
>>> But in all the cases, I still see .pending state. 
>>> 
>>> Not sure if it’s related to flush() method from OrcFileWriter ? Which might 
>>> not be getting called somehow ?
>>> 
>>> Thanks,
>>> Sagar
>>> 
>>> On Thu, Jun 21, 2018 at 7:17 PM zhangminglei <18717838...@163.com 
>>> <mailto:18717838...@163.com>> wrote:
>>> Hi,Sagar
>>> 
>>> Please take a look at BucketingSink, It says that a file would keep 
>>> .pending status if you DO NOT do a checkpoint. Doc says,  when a checkpoint 
>>> is successful the currently pending file will be removed to {@code 
>>> finished}. 
>>> Take a try again. I think you should call the below method and see what 
>>> would happen on it. Anyway, I will also try that and see whether it works. 
>>> Please let me know if you still meet error.
>>> 
>>>  env.enableCheckpointing(200);
>>> 
>>> /**
>>>  * The suffix for {@code pending} part files. These are closed files that 
>>> we are
>>>  * not currently writing to (inactive or reached {@link #batchSize}), but 
>>> which
>>>  * were not yet confirmed by a checkpoint.
>>>  */
>>> private static final String DEFAULT_PENDING_SUFFIX = ".pending";
>>> <p>Part files can be in one of three states: {@code in-progress}, {@code 
>>> pending} or {@code finished}.
>>> * The reason for this is how the sink works together with the checkpointing 
>>> mechanism to provide exactly-once
>>> * semantics and fault-tolerance. The part file that is currently being 
>>> written to is {@code in-progress}. Once
>>> * a part file is closed for writing it becomes {@code pending}. When a 
>>> checkpoint is successful the currently
>>> * pending files will be moved to {@code finished}.
>>> 
>>> Cheers
>>> Zhangminglei
>>> 
>>> 
>>> 
>>>> 在 2018年6月22日,上午4:46,sagar loke <sagar...@gmail.com 
>>>> <mailto:sagar...@gmail.com>> 写道:
>>>> 
>>>> Thanks Zhangminglei for quick response.
>>>> 
>>>> I tried the above code and I am seeing another issue where the files 
>>>> created on hdfs are always in .pending state.
>>>> 
>>>> Let me know if you can reproduce it ?
>>>> 
>>>> Thanks,
>>>> Sagar
>>>> 
>>>> On Thu, Jun 21, 2018 at 3:20 AM, zhangminglei <18717838...@163.com 
>>>> <mailto:18717838...@163.com>> wrote:
>>>> Hi, Sagar
>>>> 
>>>> I did a local test for that and it seems works fine for me. PR will be 
>>>> updated for [FLINK-9407] 
>>>> 
>>>> I will update the newest code to PR soon and below is the example I was 
>>>> using for my test. You can check it again. Hopes you can enjoy it!
>>>> 
>>>> Cheers
>>>> Zhangminglei.
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import 
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>>> import org.apache.flink.types.Row;
>>>> 
>>>> public class TestOrc {
>>>>    public static void main(String[] args) throws Exception {
>>>>       StreamExecutionEnvironment env = 
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>       env.setParallelism(1);
>>>> 
>>>>       String orcSchemaString = 
>>>> "struct<name:string,age:int,married:boolean>";
>>>>       String path = "hdfs://10.199.196.0:9000/data/hive/man <>";
>>>> 
>>>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>>> 
>>>>       bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString));
>>>> 
>>>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>>> 
>>>>       dataStream.addSink(bucketingSink);
>>>> 
>>>>       env.execute();
>>>>    }
>>>> 
>>>>    public static class ManGenerator implements SourceFunction<Row> {
>>>> 
>>>>       @Override
>>>>       public void run(SourceContext<Row> ctx) throws Exception {
>>>>          for (int i = 0; i < 3; i++) {
>>>>             Row row = new Row(3);
>>>>             row.setField(0, "Sagar");
>>>>             row.setField(1, 26 + i);
>>>>             row.setField(2, false);
>>>>             ctx.collect(row);
>>>>          }
>>>>       }
>>>> 
>>>>       @Override
>>>>       public void cancel() {
>>>> 
>>>>       }
>>>>    }
>>>> }
>>>> 
>>>> 
>>>>> 在 2018年6月21日,上午1:47,sagar loke <sagar...@gmail.com 
>>>>> <mailto:sagar...@gmail.com>> 写道:
>>>>> 
>>>>> Hi Zhangminglei,
>>>>> 
>>>>> Question about  https://issues.apache.org/jira/browse/FLINK-9407 
>>>>> <https://issues.apache.org/jira/browse/FLINK-9407> 
>>>>> 
>>>>> I tried to use the code from PR and run it on local hdfs cluster to write 
>>>>> some ORC data.
>>>>> 
>>>>> But somehow this code is failing with following error:
>>>>> 
>>>>>  
>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>>>>>  Failed to CREATE_FILE /tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in 
>>>>> <http://part-0-0.in/>-progress for DFSClient_NONMAPREDUCE_73219864_36 on 
>>>>> 127.0.0.1 because this file lease is currently owned by 
>>>>> DFSClient_NONMAPREDUCE_-1374584007_36 on 127.0.0.1
>>>>>   at 
>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2500)
>>>>> 
>>>>> I understand that this error is related to Hadoop but somehow I get this 
>>>>> error only when executing the code from this PR.
>>>>> 
>>>>> I had created very crude way to write ORC file to HDFS as per follows. 
>>>>> Below code works alright and does not throw above error.
>>>>> 
>>>>> import org.apache.flink.streaming.connectors.fs.Writer;
>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>> import org.apache.hadoop.fs.Path;
>>>>> import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
>>>>> import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
>>>>> import org.apache.orc.OrcFile;
>>>>> import org.apache.orc.TypeDescription;
>>>>> import org.apache.hadoop.conf.Configuration;
>>>>> 
>>>>> import java.io.IOException;
>>>>> 
>>>>> public class FlinkOrcWriterV1<T> implements 
>>>>> org.apache.flink.streaming.connectors.fs.Writer<T> {
>>>>> 
>>>>>     private transient org.apache.orc.Writer orcWriter;
>>>>>     String schema;
>>>>>     TypeDescription typeDescriptionschema;//"struct<x:int,y:int>"
>>>>>     String basePath;
>>>>> 
>>>>>     public FlinkOrcWriterV1(String schema) {
>>>>>         this.schema = schema;
>>>>>         this.typeDescriptionschema = TypeDescription.fromString(schema);
>>>>>     }
>>>>> 
>>>>>     @Override
>>>>>     public void open(FileSystem fs, Path path) throws IOException {
>>>>>         Configuration conf = new Configuration();
>>>>>         orcWriter = OrcFile.createWriter(new 
>>>>> Path("hdfs://localhost:9000/tmp/hivedata3/ <>"),
>>>>>                     OrcFile.writerOptions(conf)
>>>>>                         .setSchema(typeDescriptionschema));
>>>>>     }
>>>>> 
>>>>>     @Override
>>>>>     public long flush() throws IOException {
>>>>>         return orcWriter.writeIntermediateFooter();
>>>>>     }
>>>>> 
>>>>>     @Override
>>>>>     public long getPos() throws IOException {
>>>>>         return orcWriter.getRawDataSize();
>>>>>     }
>>>>> 
>>>>>     @Override
>>>>>     public void close() throws IOException {
>>>>>         orcWriter.close();
>>>>>     }
>>>>> 
>>>>>     @Override
>>>>>     public void write(T element) throws IOException {
>>>>>         VectorizedRowBatch batch = 
>>>>> typeDescriptionschema.createRowBatch(10);
>>>>>         LongColumnVector x = (LongColumnVector) batch.cols[0];
>>>>>         LongColumnVector y = (LongColumnVector) batch.cols[1];
>>>>>         for(int r=0; r < 10; ++r) {
>>>>>             int row = batch.size++;
>>>>>             x.vector[row] = r;
>>>>>             y.vector[row] = r * 3;
>>>>>             // If the batch is full, write it out and start over.
>>>>>             if (batch.size == batch.getMaxSize()) {
>>>>>                 orcWriter.addRowBatch(batch);
>>>>>                 batch.reset();
>>>>>             }
>>>>>         }
>>>>>         if (batch.size != 0) {
>>>>>             orcWriter.addRowBatch(batch);
>>>>>             batch.reset();
>>>>>         }
>>>>>     }
>>>>> 
>>>>>     @Override
>>>>>     public FlinkOrcWriterV1<T> duplicate() {
>>>>>         return new FlinkOrcWriterV1<>(schema);
>>>>>     }
>>>>> }
>>>>> 
>>>>> 
>>>>> 
>>>>> Not sure, if the error is related to any of the hadoop dependencies or 
>>>>> something else ?
>>>>> 
>>>>> Can you please look into it and let me know if you can reproduce it on 
>>>>> your end too ?
>>>>> 
>>>>> By the way, following are my dependencies in my project: 
>>>>> 
>>>>> dependencies {
>>>>>     compile 'org.apache.flink:flink-java:1.4.2'
>>>>>     compile 'org.apache.flink:flink-runtime_2.11:1.4.2'
>>>>>     compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2'
>>>>>     compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2'
>>>>>     compile 'org.apache.flink:flink-connector-elasticsearch5_2.11:1.4.2'
>>>>>     compile 'io.confluent:kafka-avro-serializer:3.3.0'
>>>>>     compile 'org.apache.flink:flink-avro:1.4.2'
>>>>>     compile group: 'org.apache.kafka', name: 'kafka_2.11', version: 
>>>>> '1.1.0'
>>>>>     compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2', 
>>>>> version: '1.4.2'
>>>>>     compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2'
>>>>>     compile group: 'org.apache.flink', name: 'flink-jdbc', version: 
>>>>> '1.4.2'
>>>>>     compile group: 'org.apache.flink', name: 'flink-table_2.11', version: 
>>>>> '1.4.2'
>>>>>     compile group: 'org.apache.orc', name: 'orc-core', version: '1.5.1'
>>>>>     compile group: 'org.apache.parquet', name: 'parquet-avro', version: 
>>>>> '1.10.0'
>>>>>     compile group: 'org.apache.parquet', name: 'parquet-common', version: 
>>>>> '1.10.0'
>>>>>     compile group: 'org.apache.flink', name: 'flink-orc_2.11', version: 
>>>>> '1.4.2'
>>>>>     testCompile group: 'junit', name: 'junit', version: '4.12'
>>>>> }
>>>>> 
>>>>> 
>>>>> -- 
>>>>> Thanks,
>>>>> Sagar.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> -- 
>>>> Regards,
>>>> SAGAR.
>>> 
>>> -- 
>>> Cheers,
>>> Sagar
>> 
>> -- 
>> Cheers,
>> Sagar
> 
> 
> 
> 
> -- 
> Regards,
> SAGAR.

Reply via email to