[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-24 Thread glaksh100
Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204942574
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,68 @@ public void run() {

 						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
 					long recordBatchSizeBytes = 0L;
-					long averageRecordSizeBytes = 0L;
-
 					for (UserRecord record : fetchedRecords) {
 						recordBatchSizeBytes += record.getData().remaining();
 						deserializeRecordForCollectionAndUpdateState(record);
 					}
 
-					if (useAdaptiveReads && !fetchedRecords.isEmpty()) {
-						averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size();
-						maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-					}
-
 					nextShardItr = getRecordsResult.getNextShardIterator();
+
+					long processingEndTimeNanos = System.nanoTime();
+
+					long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+					long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
+					adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes);
+					processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
 				}
 			}
 		} catch (Throwable t) {
 			fetcherRef.stopWithError(t);
 		}
 	}
 
+	/**
+	 * Adjusts loop timing to match target frequency if specified.
+	 * @param processingStartTimeNanos The start time of the run loop "work"
+	 * @param processingEndTimeNanos The end time of the run loop "work"
+	 * @return The System.nanoTime() after the sleep (if any)
+	 * @throws InterruptedException
+	 */
+	protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
+		throws InterruptedException {
+		long endTimeNanos = processingEndTimeNanos;
+		if (fetchIntervalMillis != 0) {
+			long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
+			long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
+			if (sleepTimeMillis > 0) {
+				Thread.sleep(sleepTimeMillis);
+				endTimeNanos = System.nanoTime();
+			}
+		}
+		return endTimeNanos;
+	}
+
+	/**
+	 * Calculates how many records to read each time through the loop based on a target throughput
+	 * and the measured frequency of the loop.
+	 * @param runLoopTimeNanos The total time of one pass through the loop
+	 * @param numRecords The number of records of the last read operation
+	 * @param recordBatchSizeBytes The total batch size of the last read operation
+	 */
+	protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes) {
+		if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 0) {
+			long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
+			// Adjust number of records to fetch from the shard depending on current average record size
+			// to optimize 2 Mb / sec read limits
+			double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
+			double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+			maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
+			// Ensure the value does not exceed the default maximum records per getRecords call
+			maxNumberOfRecordsPerFetch = Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+		}
+		return maxNumberOfRecordsPerFetch;
--- End diff --

Oops, thanks for catching. Updated to use the return value and also to use 
a local variable in the 

[GitHub] flink pull request #6409: Flink 9899.kinesis connector metrics

2018-07-24 Thread glaksh100
GitHub user glaksh100 opened a pull request:

https://github.com/apache/flink/pull/6409

Flink 9899.kinesis connector metrics

## What is the purpose of the change

The purpose of this change is to add metrics to the `ShardConsumer` to get more observability into the performance of the Kinesis connector, including the enhancements introduced in [FLINK-9897](https://issues.apache.org/jira/browse/FLINK-9897).

**Important** - https://github.com/apache/flink/pull/6408 has to be merged **before** this change.

## Brief change log
All metrics are added as gauges. The following per-shard metrics are added (a sketch of how such gauges could be registered follows the list):
- sleepTimeMillis
- maxNumberOfRecordsPerFetch
- numberOfAggregatedRecordsPerFetch
- numberOfDeaggregatedRecordsPerFetch
- bytesRequestedPerFetch
- averageRecordSizeBytes
- runLoopTimeNanos
- loopFrequencyHz
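
Below is a minimal, hypothetical sketch of how per-shard gauges like these could be wired up through Flink's `MetricGroup#gauge` API; the class and field names are illustrative and not taken from this PR.

```java
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

/** Illustrative holder for per-shard values, exposed as gauges (not the PR's actual class). */
public class ShardMetricsSketch {

	private volatile double loopFrequencyHz;
	private volatile long averageRecordSizeBytes;
	private volatile int maxNumberOfRecordsPerFetch;

	public ShardMetricsSketch(MetricGroup shardMetricGroup) {
		// Each gauge simply reads the latest value written by the shard consumer's run loop.
		shardMetricGroup.gauge("loopFrequencyHz", (Gauge<Double>) () -> loopFrequencyHz);
		shardMetricGroup.gauge("averageRecordSizeBytes", (Gauge<Long>) () -> averageRecordSizeBytes);
		shardMetricGroup.gauge("maxNumberOfRecordsPerFetch", (Gauge<Integer>) () -> maxNumberOfRecordsPerFetch);
	}

	// The run loop would call these setters once per pass.
	public void setLoopFrequencyHz(double hz) { this.loopFrequencyHz = hz; }
	public void setAverageRecordSizeBytes(long bytes) { this.averageRecordSizeBytes = bytes; }
	public void setMaxNumberOfRecordsPerFetch(int max) { this.maxNumberOfRecordsPerFetch = max; }
}
```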

## Verifying this change

This change is already covered by existing tests, such as: 
`ShardConsumerTest`, `KinesisDataFetcherTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lyft/flink FLINK-9899.KinesisConnectorMetrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6409.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6409


commit f333781a7c4f1a10b6120a962ff211e023bafaab
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T18:44:08Z

[FLINK-9897] Make adaptive reads depend on run loop time instead of fetch 
interval millis

Remove unused method

commit f51703177df9afcdba3778909b1e9d8b7fa4bf46
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T18:44:08Z

[FLINK-9897] Make adaptive reads depend on run loop time instead of fetch 
interval millis

commit d493097d09c6223383282ed90648853715b197ce
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T21:13:53Z

[FLINK-9899] Add more ShardConsumer metrics

Checkstyle fix




---


[GitHub] flink pull request #6408: [FLINK-9897] Make adaptive reads depend on run loo...

2018-07-24 Thread glaksh100
GitHub user glaksh100 opened a pull request:

https://github.com/apache/flink/pull/6408

[FLINK-9897] Make adaptive reads depend on run loop time instead of fetchIntervalMillis

## What is the purpose of the change
[FLINK-9692](https://github.com/apache/flink/pull/6300) introduced the feature of adapting `maxNumberOfRecordsPerFetch` based on the average size of Kinesis records. That PR assumed a maximum of `1/fetchIntervalMillis` reads/second. However, when the run loop of the `ShardConsumer` takes more than `fetchIntervalMillis` to process records, `maxNumberOfRecordsPerFetch` is still sub-optimal. The purpose of this change is to make adaptive reads more efficient by using the actual run loop frequency to determine the number of reads/second and, from that, `maxNumberOfRecordsPerFetch`. The change also refactors the run loop to be more modular.
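
To make the intended math concrete, here is a small worked example; the numbers are illustrative only, and the 2 MB/sec constant stands in for `KINESIS_SHARD_BYTES_PER_SECOND_LIMIT`.

```java
/** Worked example only; not code from the PR. */
public class AdaptiveReadMathExample {
	public static void main(String[] args) {
		long runLoopTimeNanos = 500_000_000L;                             // one pass takes 500 ms
		long averageRecordSizeBytes = 10 * 1024L;                         // records average 10 KB
		double loopFrequencyHz = 1_000_000_000.0d / runLoopTimeNanos;     // 2.0 passes per second
		double bytesPerRead = (2.0 * 1024 * 1024) / loopFrequencyHz;      // ~1 MB allowed per fetch
		int maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
		System.out.println(maxNumberOfRecordsPerFetch);                   // prints 102
	}
}
```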


## Brief change log

  - `processingStartTimeNanos` records start time of loop
  -  `processingEndTimeNanos` records end time of loop
  -  `adjustRunLoopFrequency()` adjusts end time depending on 
`sleepTimeMillis` (if any).
  -  `runLoopTimeNanos` records actual run loop time.
  -  `adaptRecordsToRead` calculates `maxNumberOfRecordsPerFetch` based on 
`runLoopTimeNanos`
  - Unused method `getAdaptiveMaxRecordsPerFetch` is removed.

## Verifying this change

This change is already covered by existing tests, such as `ShardConsumerTest`.
This has also been tested against a stream with the following configuration:
```
Number of Shards: 512
Parallelism: 128
```

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lyft/flink FLINK-9897.AdaptiveReadsRunLoop

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6408.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6408


commit 786556b9a9a509051a14772fbbd282db73e65252
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T18:44:08Z

[FLINK-9897] Make adaptive reads depend on run loop time instead of fetch 
interval millis




---


[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...

2018-07-13 Thread glaksh100
Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/6300
  
@StephanEwen I have run this on the following set up:
```
Number of shards on Kinesis stream: 384
Number of task slots: 384 / 192 / 96
Throughput achieved per shard (with adaptive reads) : 1.95 Mb/sec /  1.75 
Mb/sec / 1.6 Mb/sec
```



---


[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...

2018-07-12 Thread glaksh100
Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/6300
  
The idea here is that `maxNumberOfRecordsPerFetch` should never be set to a value that fetches more data than the read limit (2 Mb/sec), based on the math here:
```
2 Mb/sec / (averageRecordSizeBytes * # reads/sec)
```
At least, that's the intent - let me know if that makes sense or if there is something amiss about the approach here. If there is a way in which `maxNumberOfRecordsPerFetch` is set such that the limit is exceeded, then yes, it will still be throttled by Kinesis.
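
As a sanity check of that math (illustrative numbers only, not taken from the PR), a 200 ms fetch interval and 5 KB records give a cap that stays just under the 2 MB/sec per-shard limit:

```java
/** Worked example only; not code from the PR. */
public class ReadLimitCheckExample {
	public static void main(String[] args) {
		long shardBytesPerSecondLimit = 2L * 1024 * 1024;   // 2 MB/sec per-shard read limit
		long averageRecordSizeBytes = 5 * 1024L;            // assume 5 KB records
		long fetchIntervalMillis = 200L;                    // 5 getRecords calls per second
		long readsPerSecond = 1000L / fetchIntervalMillis;  // 5
		int maxRecordsPerFetch = (int) (shardBytesPerSecondLimit / (averageRecordSizeBytes * readsPerSecond)); // 81
		long bytesPerSecond = maxRecordsPerFetch * averageRecordSizeBytes * readsPerSecond;
		System.out.println(bytesPerSecond <= shardBytesPerSecondLimit);   // true: 2,073,600 <= 2,097,152
	}
}
```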


---


[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...

2018-07-12 Thread glaksh100
Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202227845
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr
 	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
 		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
 	}
+
+	/**
+	 * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
+	 * to optimize 2 Mb / sec read limits.
+	 *
+	 * @param averageRecordSizeBytes
+	 * @return adaptedMaxRecordsPerFetch
+	 */
+
+	protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
+		int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
+		if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
+			adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis));
+
+			// Ensure the value is not more than 1L
+			adaptedMaxRecordsPerFetch = adaptedMaxRecordsPerFetch <= ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ?
--- End diff --

Changed.


---


[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...

2018-07-12 Thread glaksh100
Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202227834
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -134,6 +134,10 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 	/** The interval between each attempt to discover new shards. */
 	public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
 
+	/** The config to turn on adaptive reads from a shard. */
+	public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.use.adaptive.reads";
--- End diff --

Changed.


---


[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...

2018-07-12 Thread glaksh100
Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202156901
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr
 	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
 		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
 	}
+
+	/**
+	 * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
+	 * to optimize 2 Mb / sec read limits.
+	 *
+	 * @param averageRecordSizeBytes
+	 * @return adaptedMaxRecordsPerFetch
+	 */
+
+	private int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
--- End diff --

Makes sense. Done.


---


[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...

2018-07-10 Thread glaksh100
Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/6300
  
@fhueske @tzulitai @tweise  Can you please take a look when you have a 
chance?


---


[GitHub] flink pull request #6300: [FLINK-9692] Adaptive reads from Kinesis

2018-07-10 Thread glaksh100
GitHub user glaksh100 opened a pull request:

https://github.com/apache/flink/pull/6300

[FLINK-9692] Adaptive reads from Kinesis

## What is the purpose of the change

The purpose of this change is to provide an option to the Kinesis connector 
to optimize the amount of data (in bytes) read from Kinesis. The Kinesis 
connector currently has a [constant 
value](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213)
 set for `maxNumberOfRecordsPerFetch` that it can fetch from a single Kinesis 
`getRecords` call. However, in most realtime scenarios, the average size of the 
Kinesis record (in bytes) is not constant.
The idea here is to adapt the Kinesis connector to identify the average record size of the previously fetched batch prior to making the next `getRecords` call, so that the `maxNumberOfRecordsPerFetch` parameter can be tuned as high as possible without exceeding the 2 Mb/sec [per shard limit](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html).

This feature can be set using a 
[ConsumerConfigConstants](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java)
 flag that defaults to false. 
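
For context, here is a minimal sketch of how a job could opt in to the new flag. The property key is the `SHARD_USE_ADAPTIVE_READS` constant added in this PR; the stream name, region, schema, and the rest of the job are placeholders, not part of this change.

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class AdaptiveReadsExample {
	public static void main(String[] args) throws Exception {
		Properties consumerConfig = new Properties();
		consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
		// Opt in to adaptive reads; the default is false.
		consumerConfig.put(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		FlinkKinesisConsumer<String> consumer =
			new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), consumerConfig);
		env.addSource(consumer).print();
		env.execute("Adaptive Kinesis reads");
	}
}
```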


## Brief change log
  - With an initial value for `maxNumberOfRecordsPerFetch`, the average size of a record in the returned batch of records is calculated
  - `maxNumberOfRecordsPerFetch` is then set to `2 Mb/sec / (averageRecordSizeBytes * 1000 / fetchIntervalMillis)` to maximize throughput in each `getRecords` call
  - This feature is turned on/off using a boolean in `ConsumerConfigConstants` - `SHARD_USE_ADAPTIVE_READS`
  - `DEFAULT_SHARD_USE_ADAPTIVE_READS` is set to `false`

## Verifying this change
This change added tests and can be verified as follows:
  - Added a 
`testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads` test method 
to `ShardConsumerTest`

## Does this pull request potentially affect one of the following parts:
  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation
  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/glaksh100/flink 
FLINK-9692.adaptiveKinesisReads

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6300.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6300


commit 0c29017d6d1e98359d3093aaaecc54338324e57e
Author: Lakshmi Gururaja Rao 
Date:   2018-07-10T18:40:02Z

[FLINK-9692] Adaptive reads from Kinesis




---


[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

2018-05-09 Thread glaksh100
Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/5860
  
@fhueske  I gave it some thought and your suggestion makes sense to me. I 
have extended `checkForInactiveBuckets` to include the rollover check. I have 
also updated Javadocs in a few places:
- Added a note in the top-level JavaDocs reflecting the updated functionality of `checkForInactiveBuckets()`
- Updated JavaDocs for both `setBatchRolloverInterval()` and 
`setInactiveBucketThreshold()`
- Updated JavaDoc for `checkForInactiveBuckets()`

Let me know if the updates make sense and thank you for reviewing!



---


[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

2018-05-04 Thread glaksh100
Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/5860
  
Thanks for reviewing @fhueske  @aljoscha and @kl0u ! I have addressed the 
latest review comments. Can you PTAL (again) ?


---


[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

2018-05-04 Thread glaksh100
Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r186223237
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -473,6 +482,15 @@ private boolean shouldRoll(BucketState<T> bucketState) throws IOException {
 				subtaskIndex,
 				writePosition,
 				batchSize);
+		} else {
+			long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
--- End diff --

Updated the method signature of `shouldRoll` to include the `currentProcessingTime`.


---


[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

2018-05-04 Thread glaksh100
Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r186223099
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -908,6 +929,20 @@ private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pe
 		return this;
 	}
 
+	/**
+	 * Sets the roll over interval in milliseconds.
+	 *
+	 *
+	 * When a bucket part file is older than the roll over interval, a new bucket part file is
+	 * started and the old one is closed. The name of the bucket file depends on the {@link Bucketer}.
+	 *
+	 * @param batchRolloverInterval The roll over interval in milliseconds
+	 */
+	public BucketingSink<T> setBatchRolloverInterval(long batchRolloverInterval) {
+		this.batchRolloverInterval = batchRolloverInterval;
+		return this;
--- End diff --

Added a check for `batchRolloverInterval` to be a positive non-zero value.
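
A minimal sketch of what such a guard could look like (assumed, not copied from the PR; Flink's `Preconditions` utility and the class name are used here purely for illustration):

```java
import org.apache.flink.util.Preconditions;

/** Illustrative stand-in for the setter; in the PR the real field lives in BucketingSink. */
public class RolloverIntervalGuardSketch {

	private long batchRolloverInterval = Long.MAX_VALUE; // effectively "never roll on time"

	public RolloverIntervalGuardSketch setBatchRolloverInterval(long batchRolloverInterval) {
		// Reject zero or negative intervals instead of silently accepting them.
		Preconditions.checkArgument(batchRolloverInterval > 0,
			"batchRolloverInterval must be a positive value");
		this.batchRolloverInterval = batchRolloverInterval;
		return this;
	}
}
```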


---


[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

2018-05-02 Thread glaksh100
Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/5860
  
@fhueske Can you PTAL and merge this PR? 


---


[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

2018-04-25 Thread glaksh100
Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/5860
  
@fhueske Thank you for reviewing. I have incorporated the changes, including an update to the [documentation](https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html) on the website.

Travis seems to be failing on a test case that appears unrelated:
```
java.lang.AssertionError: This program execution should have failed.
	at org.junit.Assert.fail(Assert.java:88)
	at org.apache.flink.test.misc.SuccessAfterNetworkBuffersFailureITCase.testSuccessfulProgramAfterFailure(SuccessAfterNetworkBuffersFailureITCase.java:75)
```


---


[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

2018-04-17 Thread glaksh100
Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/5860
  
@aljoscha @fhueske Can you please take a look? 


---


[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

2018-04-16 Thread glaksh100
GitHub user glaksh100 opened a pull request:

https://github.com/apache/flink/pull/5860

[FLINK-9138][filesystem-connectors] Implement time based rollover in 
BucketingSink

## What is the purpose of the change

This pull request enables a time-based rollover of the part file in the BucketingSink. This is particularly applicable when write throughput is low, and it helps data become available for consumption at a fixed interval.

## Brief change log
  - Add a `batchRolloverInterval` field with a setter 
  - Track a `firstWrittenToTime` for the bucket state
  - Check for `currentProcessingTime` - `firstWrittenToTime` > `batchRolloverInterval` and roll over if true (see the sketch after this list)
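
A minimal sketch of the check described in the last item; the names follow the PR description, and the surrounding `BucketingSink` state handling is omitted.

```java
/** Illustrative helper only; in the PR this logic lives inside BucketingSink's shouldRoll path. */
public final class RolloverCheckSketch {

	private RolloverCheckSketch() {
	}

	/** Returns true once the part file has been open longer than the configured interval. */
	public static boolean shouldRollOnTime(long currentProcessingTime, long firstWrittenToTime, long batchRolloverInterval) {
		return currentProcessingTime - firstWrittenToTime > batchRolloverInterval;
	}
}
```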

## Verifying this change

This change added tests and can be verified as follows:

  - Added a `testRolloverInterval` test method to the `BucketingSinkTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/glaksh100/flink 
FLINK-9138.bucketingSinkRolloverInterval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5860.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5860


commit fee3ba293f4db4ad2d39b4ac0f3993711da9bda6
Author: Lakshmi Gururaja Rao 
Date:   2018-04-16T23:31:49Z

[FLINK-9138] Implement time based rollover of part file in BucketingSink




---