Re: Simple stdout sink for testing Table API?

2018-06-23 Thread Hequn Cheng
Hi chrisr,

It seems there are no "single line" ways to solve your problem. To print
results on screen, you can use the DataStream.print() / DataSet.print()
method, and to limit the output you can add a FilterFunction. The code
looks like:

Table projection1 = customers
    .select("id,last_name")
    .filter("last_name !== 'foobar'");

tableEnv.toAppendStream(projection1, Row.class)
    .filter(new FilterFunction<Row>() {
        int count = 0;

        @Override
        public boolean filter(Row value) throws Exception {
            // Pass through only the first 10 rows seen by this (single) filter instance.
            return count++ < 10;
        }
    })
    .setParallelism(1)
    .print();
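For the batch path that Hequn also mentions (DataSet.print()), a similar limited printout is possible with DataSet.first(n). This is a hedged sketch and assumes the program uses a BatchTableEnvironment rather than the streaming setup above; tableEnv, projection1, and Row are reused from the thread:

// Convert the table to a DataSet<Row>, keep only the first 10 rows, and print them.
tableEnv.toDataSet(projection1, Row.class)
    .first(10)
    .print();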


Hope this helps,
Hequn

On Sun, Jun 24, 2018 at 7:05 AM, chrisr123  wrote:

> Is there a simple way to output the first few rows of a Flink table to stdout
> when developing an application? I just want to see the first 10-20 rows on
> screen during development to make sure my logic is correct.
> There doesn't seem to be anything like print(10) in the API to see the first
> n rows.
> Here is a simple sample program, but I am writing to a CSV table sink for
> testing right now.
>
> // Get Customers
> String customersPath = "input/customers.csv";
> // id,first_name,last_name,email,address,city,state,zip
> CsvTableSource customersTableSource = CsvTableSource.builder()
> .path(customersPath)
> .ignoreFirstLine()
> .fieldDelimiter(",")
> .field("id", Types.INT())
> .field("first_name", Types.STRING())
> .field("last_name", Types.STRING())
> .field("email", Types.STRING())
> .field("address", Types.STRING())
> .field("city", Types.STRING())
> .field("state", Types.STRING())
> .field("zip", Types.STRING())
> .build();
>
>
> // Register our table source
> tableEnv.registerTableSource("customers", customersTableSource);
> Table customers = tableEnv.scan("customers");
>
>
> // Perform Operations
> // SELECT id,last_name
> // FROM customers
> Table projection1 = customers
> .select("id,last_name")
> .filter("last_name !== 'foobar'");
>
>
> // Write to Sinks
> int parallelism = 1;
> TableSink sink = new CsvTableSink("output/customers_out.csv", ",",
> parallelism, WriteMode.OVERWRITE);
> projection1.writeToSink(sink);
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

2018-06-23 Thread zhangminglei
Hi, Rinat

I tried the situation you described and it works fine for me. The partCounter
is incremented as we would hope; when a new part file is created, I did not see
any duplicate part index. Here is my code for that, you can take a look.
In my case, the max index of the part files is part-0-683PartSuffix; apart from
that, everything stays in _part-0-684PartSuffix.pending,
_part-0-685PartSuffix.pending and so on, since the checkpoint has not finished yet.

Cheers
Minglei.

import java.util.UUID;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class TestSuffix {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        String outputPath = params.getRequired("outputPath");

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        sEnv.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
        sEnv.enableCheckpointing(200);
        sEnv.setParallelism(1);

        BucketingSink<Tuple4<Integer, String, String, Integer>> sink =
            new BucketingSink<Tuple4<Integer, String, String, Integer>>(outputPath)
                .setInactiveBucketThreshold(1000)
                .setInactiveBucketCheckInterval(1000)
                .setPartSuffix("PartSuffix")
                .setBatchSize(500);

        sEnv.addSource(new DataGenerator())
            .keyBy(0)
            .map(new CountUpRichMap())
            .addSink(sink);

        sEnv.execute();
    }

    public static class CountUpRichMap
            extends RichMapFunction<Tuple3<Integer, String, String>, Tuple4<Integer, String, String, Integer>> {

        private ValueState<Integer> counter;

        @Override
        public void open(Configuration parameters) throws Exception {
            counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Types.INT));
        }

        @Override
        public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, String> value) throws Exception {
            Integer counterValue = counter.value();
            if (counterValue == null) {
                counterValue = 0;
            }
            counter.update(counterValue + 1);
            return Tuple4.of(value.f0, value.f1, value.f2, counterValue);
        }
    }

    public static class DataGenerator implements SourceFunction<Tuple3<Integer, String, String>> {

        public DataGenerator() {
        }

        @Override
        public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
            for (int i = 0; i < 1; i++) {
                ctx.collect(Tuple3.of(i % 10, UUID.randomUUID().toString(), "Some payloads.."));
            }
        }

        @Override
        public void cancel() {
        }
    }
}




> On 16 Jun 2018, at 10:21 PM, Rinat wrote:
> 
> Hi mates, since the 1.5 release, BucketingSink has the ability to configure the
> suffix of the part file. It's very useful when it's necessary to set a specific
> file extension.
> 
> While using it, I've found an issue: when a new part file is created, it
> has the same part index as the index of the just-closed file.
> So, when Flink tries to move it into its final state, we get a
> FileAlreadyExistsException.
> 
> This problem is related to the following code.
> Here we try to find the max index of a part file that doesn't exist in the
> bucket directory; the problem is that the partSuffix is not involved in the
> path assembly. This means that the path never exists
> and the partCounter is never incremented.
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> while (fs.exists(partPath) ||
>   fs.exists(getPendingPathFor(partPath)) ||
>   fs.exists(getInProgressPathFor(partPath))) {
>bucketState.partCounter++;
>partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> }
> 
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> 
> Before creating the writer, we append the partSuffix here, but it should
> already have been appended, before the index checks:
> if (partSuffix != null) {
>partPath = partPath.suffix(partSuffix);
> }
> I’ll create an issue and try to submit a fix
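A sketch of the change Rinat describes: apply the suffix to the candidate path before the existence checks, so the counter actually advances past part files that already exist. It reuses the fields from the quoted snippet and is illustrative only; the actual patch in Rinat's PR may differ.

Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
if (partSuffix != null) {
    // Include the suffix *before* checking for existing files.
    partPath = partPath.suffix(partSuffix);
}
while (fs.exists(partPath) ||
        fs.exists(getPendingPathFor(partPath)) ||
        fs.exists(getInProgressPathFor(partPath))) {
    bucketState.partCounter++;
    partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
    if (partSuffix != null) {
        partPath = partPath.suffix(partSuffix);
    }
}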
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 



Simple stdout sink for testing Table API?

2018-06-23 Thread chrisr123
Is there a simple way to output the first few rows of a Flink table to stdout
when developing an application? I just want to see the first 10-20 rows on
screen during development to make sure my logic is correct.
There doesn't seem to be anything like print(10) in the API to see the first
n rows.
Here is a simple sample program, but I am writing to a CSV table sink for
testing right now.

// Get Customers
String customersPath = "input/customers.csv";
// id,first_name,last_name,email,address,city,state,zip
CsvTableSource customersTableSource = CsvTableSource.builder()
.path(customersPath)
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id", Types.INT())
.field("first_name", Types.STRING())
.field("last_name", Types.STRING())
.field("email", Types.STRING())
.field("address", Types.STRING())
.field("city", Types.STRING())
.field("state", Types.STRING())
.field("zip", Types.STRING())
.build();


// Register our table source
tableEnv.registerTableSource("customers", customersTableSource);
Table customers = tableEnv.scan("customers");


// Perform Operations
// SELECT id,last_name
// FROM customers
Table projection1 = customers
.select("id,last_name")
.filter("last_name !== 'foobar'");


// Write to Sinks
int parallelism = 1;
TableSink sink = new CsvTableSink("output/customers_out.csv", ",",
parallelism, WriteMode.OVERWRITE);
projection1.writeToSink(sink);



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Custom Watermarks with Flink

2018-06-23 Thread Wyatt Frelot
I originally posted to Stack Overflow because I was trying to figure out how
to do this in Flink.

Wondering how to implement something along these lines:
(1) *write up:  *
https://drive.google.com/file/d/0Bw69DO1tid2_SzVVendtUV9WMVdIUXptQ1hHSl9KNjAyMTBn/view?usp=drivesdk
(2) *original post: *
https://stackoverflow.com/questions/50949083/custom-watermarks-with-apache-flink
-- 
Wyatt J. Frelot


Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

2018-06-23 Thread Rinat
Hi mates, could anyone please have a look at my PR, which fixes the issue of
incorrect indexing in the BucketingSink component?

Thx

> On 18 Jun 2018, at 10:55, Rinat  wrote:
> 
> I've created a JIRA issue, https://issues.apache.org/jira/browse/FLINK-9603,
> and added a proposal with a PR.
> 
> Thx
> 
>> On 16 Jun 2018, at 17:21, Rinat wrote:
>> 
>> Hi mates, since the 1.5 release, BucketingSink has the ability to configure the
>> suffix of the part file. It's very useful when it's necessary to set a specific
>> file extension.
>> 
>> While using it, I've found an issue: when a new part file is created, it
>> has the same part index as the index of the just-closed file.
>> So, when Flink tries to move it into its final state, we get a
>> FileAlreadyExistsException.
>> 
>> This problem is related to the following code.
>> Here we try to find the max index of a part file that doesn't exist in the
>> bucket directory; the problem is that the partSuffix is not involved in the
>> path assembly. This means that the path never exists
>> and the partCounter is never incremented.
>> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
>> bucketState.partCounter);
>> while (fs.exists(partPath) ||
>>   fs.exists(getPendingPathFor(partPath)) ||
>>   fs.exists(getInProgressPathFor(partPath))) {
>>bucketState.partCounter++;
>>partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
>> bucketState.partCounter);
>> }
>> 
>> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
>> 
>> Before creating the writer, we append the partSuffix here, but it should
>> already have been appended, before the index checks:
>> if (partSuffix != null) {
>>partPath = partPath.suffix(partSuffix);
>> }
>> I’ll create an issue and try to submit a fix
>> 
>> Sincerely yours,
>> Rinat Sharipov
>> Software Engineer at 1DMP CORE Team
>> 
>> email: r.shari...@cleverdata.ru 
>> mobile: +7 (925) 416-37-26
>> 
>> CleverDATA
>> make your data clever
>> 
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: Stream Join With Early firings

2018-06-23 Thread Johannes Schulte
Thanks Fabian! This seems to be the way to go

On Tue, Jun 19, 2018 at 12:18 PM Fabian Hueske  wrote:

> Hi Johannes,
>
> You are right. You should approach the problem with the semantics that you
> need before thinking about optimizations such as state size.
>
> The Table API / SQL offers (in v1.5.0) two types of joins:
> 1) Windowed joins where each record joins with records in a time-range of
> the other stream "(A.ts BETWEEN B.ts - 1 hour AND B.ts + 1 hour)"
> 2) Non-windowed joins, which support arbitrary join predicates but which
> fully materialize both inputs. As you mentioned, you can use idle state
> retention to remove records from state that have not been accessed for a
> certain time.
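For reference, a hedged sketch of both options in Flink 1.5's Table API / SQL. The table names ("Orders", "Positions"), field names, rowtime attributes, and retention times below are placeholder assumptions for illustration; they are not taken from this thread.

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class JoinSketches {

    // 1) Time-windowed join: state on both sides is cleaned up automatically
    //    once the time bounds have passed.
    static Table windowedJoin(StreamTableEnvironment tEnv) {
        return tEnv.sqlQuery(
            "SELECT o.orderId, p.amount " +
            "FROM Orders o, Positions p " +
            "WHERE o.orderId = p.orderId " +
            "AND p.rowtime BETWEEN o.rowtime - INTERVAL '1' HOUR " +
            "               AND o.rowtime + INTERVAL '1' HOUR");
    }

    // 2) Non-windowed join: both inputs are fully materialized, but idle state
    //    retention drops state for keys that have not been accessed for a while.
    static void unboundedJoin(StreamTableEnvironment tEnv) {
        StreamQueryConfig qConf = tEnv.queryConfig();
        qConf.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

        Table joined = tEnv.sqlQuery(
            "SELECT o.orderId, p.amount " +
            "FROM Orders o JOIN Positions p ON o.orderId = p.orderId");

        // Updates on either side produce add/retract messages downstream.
        tEnv.toRetractStream(joined, Row.class, qConf).print();
    }
}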
>
> Best, Fabian
>
> 2018-06-18 11:09 GMT+02:00 Johannes Schulte :
>
>> Hi Fabian,
>>
>> thanks for the hints, though I somehow got the feeling that I am on the
>> wrong track given how much code I would need to write for implementing a
>> "blueprint" usecase.
>>
>> Would a join be simpler using the Table API? In the end it's the
>> classical Order & OrderPosition example, where the output is an
>> upsert stream. Would I get the expected behaviour (output elements on every
>> update on either side of the input stream)? I realize that my session
>> window approach wasn't driven by the requirements but by operational
>> aspects (state size), so using a concept like idle state retention time
>> would be a more natural fit.
>>
>> Thanks,
>>
>> Johannes
>>
>> On Mon, Jun 18, 2018 at 9:57 AM Fabian Hueske  wrote:
>>
>>> Hi Johannes,
>>>
>>> EventTimeSessionWindows [1] use the EventTimeTrigger [2] as default
>>> trigger (see EventTimeSessionWindows.getDefaultTrigger()).
>>>
>>> I would take the EventTimeTrigger and extend it with early firing
>>> functionality.
>>> However, there are a few things to consider:
>>> * you need to be aware that session windows can be merged, i.e., two
>>> session windows A, B with gap 10 (A = [20,25), B = [37,45)) will be merged
>>> when a record at 32 is received.
>>> * windows store all records in a list. For every firing, you need to
>>> iterate the full list and also track which records you have already joined to
>>> avoid duplicates. Maybe you can migrate records from the window state into
>>> custom state defined in a ProcessWindowFunction.
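To make the suggestion concrete, here is a rough sketch of an EventTimeTrigger-like trigger with an early firing on every element. The class is illustrative only (not code from this thread), and the merging and duplicate-handling concerns Fabian lists above still apply.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EarlyFiringEventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        // Keep the regular end-of-window firing registered.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // Early firing: evaluate the window for every arriving element.
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // Final firing when the watermark passes the end of the (merged) window.
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // Re-register the end-of-window timer for the merged window.
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

It would be plugged in via .window(EventTimeSessionWindows.withGap(...)).trigger(new EarlyFiringEventTimeTrigger()) in the join quoted below.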
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>>
>>>
>>> 2018-06-13 13:43 GMT+02:00 Johannes Schulte 
>>> :
>>>
 Hi,

 I am joining two streams with a session window and want to emit a
 joined (early) result for every element arriving on one of the streams.

 Currently the code looks like this:

 s1.join(s2)
 .where(s1.id).equalTo(s2.id)
 .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
 // trigger(?)
 .apply(...custom code..)

 What I am missing is the right trigger, à la "withEarlyFiring" - do I
 need to implement my own trigger for this, and if yes, what kind of
 functionality must be present to not break the session window semantics?

 Thanks in advance,

 Johannes


>>>
>


Re: A question about Kryo and Window State

2018-06-23 Thread Vishal Santoshi
Actually, yes. I have a job already running with "FieldSerializer" in
production.  Any insights will be appreciated.

On Sat, Jun 23, 2018 at 7:39 AM, Vishal Santoshi 
wrote:

> Thanks.
>
> On Thu, Jun 21, 2018 at 4:34 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Vishal,
>>
>> Kryo has a serializer called `CompatibleFieldSerializer` that allows for
>> simple backward compatibility changes, such as adding non-optional fields /
>> removing fields.
>>
>> If using the KryoSerializer is a must, then a good thing to do is to
>> register Kryo's `CompatibleFieldSerializer` as the serializer to be used
>> for that specific type.
>> By default, Kryo doesn’t use the `CompatibleFieldSerializer`, so you
>> would have to explicitly register this.
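A minimal sketch of that registration (the state class mirrors Vishal's Class A example further down in this thread; the wrapper class and call site are assumptions for illustration):

import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RegisterCompatibleSerializer {

    // The state class from Vishal's example below.
    public static class A {
        long a;
        long b;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use Kryo's CompatibleFieldSerializer for this type instead of the
        // default FieldSerializer, so fields can later be added or removed.
        env.getConfig().registerTypeWithKryoSerializer(A.class, CompatibleFieldSerializer.class);
        // ... define and execute the job as usual ...
    }
}

As Gordon notes, this only helps if the registration is in place from the very beginning, before any state has been written with the default FieldSerializer.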
>>
>> The issue is, I think it wouldn’t be possible to use the
>> `CompatibleFieldSerializer` _after_ the bytes were already written with the
>> default, non-compatible version (the `FieldSerializer`).
>> So, AFAIK, this should only work if your Kryo state type was registered
>> with the `CompatibleFieldSerializer` from the very beginning.
>> There could be a workaround, but unfortunately that would require
>> changing some code in the `KryoSerializer`.
>> If you require this because your job is already running in production and
>> data was already written by the `FieldSerializer`, please let me know and
>> I’ll go into more details about this.
>>
>> Cheers,
>> Gordon
>>
>> On 21 June 2018 at 10:14:15 AM, Fabian Hueske (fhue...@gmail.com) wrote:
>>
>> Hi Vishal,
>>
>> In general, Kryo serializers are not very upgrade friendly.
>> Serializer compatibility [1] might be right approach here, but Gordon (in
>> CC) might know more about this.
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility
>>
>> 2018-06-18 12:06 GMT+02:00 Vishal Santoshi :
>>
>>> Any more insight?
>>>
>>> On Wed, Jun 13, 2018, 3:34 PM Vishal Santoshi 
>>> wrote:
>>>
 Any ideas on the standard way ( or any roundabout way ) of doing a
 version upgrade that is backward compatible?
 The @FieldSerializer.Optional("0") annotation actually does ignore the field (
 even if reset ), giving it the default value if Kryo is used. It has to do
 with how the FieldSerializer behaves. There is another serializer (
 Composite, I believe ) that allows for such backward-compatible changes.


 I know some work is being done in 1.6 to allow for the above use case, and I
 think Google Dataflow does provide some avenues.

 Thanks much

 Vishal



 On Tue, Jun 12, 2018 at 11:30 PM, Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> I have a running pipe with Window State in a class say
>
> Class A{
>  long a;
> }
>
> It uses the default KryoSerializer
>
> I want to add a field to
>
> Class A {
>   long a;
>   long b;
> }
>
> I need to suspend with SP and resume with the new version of Class A
>
>
> Is there a definite way to do this. I tried
>
> Class A {
>   long a;
>@FieldSerializer.Optional("0")
>   long b;
> }
>
> but that seems to default to 0 , even when the Aggregation is putting
> in values.
>
> Could somebody give pointers as to how to solve this
>
> Thanks a ton.
>
>
>
>

>>
>


Re: A question about Kryo and Window State

2018-06-23 Thread Vishal Santoshi
Thanks.

On Thu, Jun 21, 2018 at 4:34 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Vishal,
>
> Kryo has a serializer called `CompatibleFieldSerializer` that allows for
> simple backward compatibility changes, such as adding non-optional fields /
> removing fields.
>
> If using the KryoSerializer is a must, then a good thing to do is to
> register Kryo's `CompatibleFieldSerializer` as the serializer to be used
> for that specific type.
> By default, Kryo doesn’t use the `CompatibleFieldSerializer`, so you would
> have to explicitly register this.
>
> The issue is, I think it wouldn’t be possible to use the
> `CompatibleFieldSerializer` _after_ the bytes were already written with the
> default, non-compatible version (the `FieldSerializer`).
> So, AFAIK, this should only work if your Kryo state type was registered
> with the `CompatibleFieldSerializer` from the very beginning.
> There could be a workaround, but unfortunately that would require changing
> some code in the `KryoSerializer`.
> If you require this because your job is already running in production and
> data was already written by the `FieldSerializer`, please let me know and
> I’ll go into more details about this.
>
> Cheers,
> Gordon
>
> On 21 June 2018 at 10:14:15 AM, Fabian Hueske (fhue...@gmail.com) wrote:
>
> Hi Vishal,
>
> In general, Kryo serializers are not very upgrade friendly.
> Serializer compatibility [1] might be right approach here, but Gordon (in
> CC) might know more about this.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility
>
> 2018-06-18 12:06 GMT+02:00 Vishal Santoshi :
>
>> Any more insight?
>>
>> On Wed, Jun 13, 2018, 3:34 PM Vishal Santoshi 
>> wrote:
>>
>>> Any ideas on the standard way ( or any roundabout way ) of doing a
>>> version upgrade that is backward compatible?
>>> The @FieldSerializer.Optional("0") annotation actually does ignore the field (
>>> even if reset ), giving it the default value if Kryo is used. It has to do
>>> with how the FieldSerializer behaves. There is another serializer (
>>> Composite, I believe ) that allows for such backward-compatible changes.
>>>
>>>
>>> I know some work is being done in 1.6 to allow for the above use case, and I
>>> think Google Dataflow does provide some avenues.
>>>
>>> Thanks much
>>>
>>> Vishal
>>>
>>>
>>>
>>> On Tue, Jun 12, 2018 at 11:30 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 I have a running pipe with Window State in a class say

 Class A{
  long a;
 }

 It uses the default KryoSerializer

 I want to add a field to

 Class A {
   long a;
   long b;
 }

 I need to suspend with SP and resume with the new version of Class A


 Is there a definite way to do this. I tried

 Class A {
   long a;
@FieldSerializer.Optional("0")
   long b;
 }

 but that seems to default to 0 , even when the Aggregation is putting
 in values.

 Could somebody give pointers as to how to solve this

 Thanks a ton.




>>>
>


Few question about upgrade from 1.4 to 1.5 flink ( some very basic )

2018-06-23 Thread Vishal Santoshi
1.
Has anyone done a rolling upgrade from 1.4 to 1.5? I am not sure we can. It
seems that the JM cannot recover jobs, failing with this exception:

Caused by: java.io.InvalidClassException:
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
local class incompatible: stream classdesc serialVersionUID =
-647384516034982626, local class serialVersionUID = 2




2.
Does a savepoint taken on 1.4 resume on 1.5 ( pretty basic, but no harm in asking )?



3.
https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html#update-configuration-for-reworked-job-deployment
The taskmanager.numberOfTaskSlots setting: what would be the desired value in a
standalone ( non Mesos/YARN ) cluster?


4. I suspended all jobs and installed 1.5 on the JM ( the TMs are still
running 1.4 ). The JM refuses to start with:

Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]: 2018-06-23
11:34:23 ERROR JobManager:116 - Failed to recover job
454cd84a519f3b50e88bcb378d8a1330.

Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]:
java.lang.InstantiationError: org.apache.flink.runtime.blob.BlobKey

Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]: at
sun.reflect.GeneratedSerializationConstructorAccessor51.newInstance(Unknown
Source)

Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]: at
java.lang.reflect.Constructor.newInstance(Constructor.java:423)

Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]: at
java.io.ObjectStreamClass.newInstance(ObjectStreamClass.java:1079)

Jun
.



Any feedback would be highly appreciated...


Re: [Flink-9407] Question about proposed ORC Sink !

2018-06-23 Thread zhangminglei
Yes, it should be "exit". Thanks to Ted Yu. Exactly right!

Cheers
Zhangminglei

> On 23 Jun 2018, at 12:40 PM, Ted Yu wrote:
> 
> For #1, the word exist should be exit, right ?
> Thanks
> 
>  Original message 
> From: zhangminglei <18717838...@163.com>
> Date: 6/23/18 10:12 AM (GMT+08:00)
> To: sagar loke 
> Cc: dev , user 
> Subject: Re: [Flink-9407] Question about proposed ORC Sink !
> 
> Hi, Sagar.
> 
>> 1. It solves the issue partially, meaning files which have finished
>> checkpointing don't show .pending status, but the files which were in
>> progress when the program exits are still in .pending state.
> 
> Ans:
> 
> Yes. If the program exits while a checkpoint has not finished, those files
> will stay in the .pending state. Under normal circumstances, programs running
> in a production environment will never be stopped or exited if everything is
> fine.
> 
>> 2. Ideally, the writer should work with default settings, correct? Meaning we
>> don't have to explicitly set these parameters to make it work.
>> Is this assumption correct?
> 
> Ans:
> 
> Yes, the writer should work correctly with default settings.
> Yes, we do not have to explicitly set these parameters to make it work.
> Yes, that assumption is indeed correct.
> 
> However, Flink is a real-time streaming framework, so under normal
> circumstances you don't really use the default settings when it comes to a
> specific business case, especially when working together with an offline end
> (like Hadoop MapReduce). In that case, you need to tell the offline end when a
> bucket is closed and when the data for a specific bucket is ready. So, you can
> take a look at https://issues.apache.org/jira/browse/FLINK-9609.
> 
> Cheers
> Zhangminglei
> 
> 
>> On 23 Jun 2018, at 8:23 AM, sagar loke wrote:
>> 
>> Hi Zhangminglei,
>> 
>> Thanks for the reply.
>> 
>> 1. It solves the issue partially, meaning files which have finished
>> checkpointing don't show .pending status, but the files which were in
>> progress when the program exits are still in .pending state.
>> 
>> 2. Ideally, the writer should work with default settings, correct? Meaning we
>> don't have to explicitly set these parameters to make it work.
>> Is this assumption correct?
>> 
>> Thanks,
>> Sagar
>> 
>> On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838...@163.com> wrote:
>> Hi, Sagar. Please use the code below and you will see the part file status go
>> from _part-0-107.in-progress to _part-0-107.pending and finally to
>> part-0-107 (for example); you need to run the program for a while. However,
>> we need to set some parameters, like the following. Moreover,
>> enableCheckpointing is also needed. I know why you always see the .pending
>> file: the default value of the parameters below is 60 seconds, even though you
>> enabled checkpointing. That is why you cannot see the finished file status
>> until 60 seconds have passed.
>> 
>> Attached is the result on my end, and you will see what you want!
>> 
>> Please let me know if you still have the problem.
>> 
>> Cheers
>> Zhangminglei
>> 
>> setInactiveBucketCheckInterval(2000)
>> .setInactiveBucketThreshold(2000);
>> 
>> public class TestOrc {
>>    public static void main(String[] args) throws Exception {
>>       StreamExecutionEnvironment env =
>>          StreamExecutionEnvironment.getExecutionEnvironment();
>>       env.setParallelism(1);
>>       env.enableCheckpointing(1000);
>>       env.setStateBackend(new MemoryStateBackend());
>> 
>>       String orcSchemaString = "struct";
>>       String path = "hdfs://10.199.196.0:9000/data/hive/man";
>> 
>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>> 
>>       bucketingSink
>>          .setWriter(new OrcFileWriter<>(orcSchemaString))
>>          .setInactiveBucketCheckInterval(2000)
>>          .setInactiveBucketThreshold(2000);
>> 
>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>> 
>>       dataStream.addSink(bucketingSink);
>> 
>>       env.execute();
>>    }
>> 
>>    public static class ManGenerator implements SourceFunction<Row> {
>> 
>>       @Override
>>       public void run(SourceContext<Row> ctx) throws Exception {
>>          for (int i = 0; i < 2147483000; i++) {
>>             Row row = new Row(3);
>>             row.setField(0, "Sagar");
>>             row.setField(1, 26 + i);
>>             row.setField(2, false);
>>             ctx.collect(row);
>>          }
>>       }
>> 
>>       @Override
>>       public void cancel() {
>>       }
>>    }
>> }
>> 
>> 
>> 
>> 
>>> On 22 Jun 2018, at 11:14 AM, sagar loke wrote:
>>> 
>>> Sure, we can solve it together :)
>>> 
>>> Are you able to reproduce it ?
>>> 
>>> Thanks,
>>> Sagar
>>> 
>>> On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <18717838...@163.com> wrote: