[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596276#comment-16596276 ]

ASF GitHub Bot commented on FLINK-9962:
---

asfgit closed pull request #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index af1349d6665..79ed08e9d41 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -70,7 +70,8 @@ stored. The sink can be further configured by specifying a custom bucketer, writ
 By default the bucketing sink will split by the current system time when elements arrive and will
 use the datetime pattern `"yyyy-MM-dd--HH"` to name the buckets. This pattern is passed to
-`SimpleDateFormat` with the current system time to form a bucket path. A new bucket will be created
+`DateTimeFormatter` with the current system time and JVM's default timezone to form a bucket path.
+Users can also specify a timezone for the bucketer to format bucket path. A new bucket will be created
 whenever a new date is encountered. For example, if you have a pattern that contains minutes as the
 finest granularity you will get a new bucket every minute.
 Each bucket is itself a directory that contains several part files: each parallel instance of the sink will create its own part file and
@@ -105,7 +106,7 @@ Example:
 DataStream<Tuple2<IntWritable,Text>> input = ...;

 BucketingSink<String> sink = new BucketingSink<String>("/base/path");
-sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
+sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
 sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
 sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
 sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
@@ -119,7 +120,7 @@ input.addSink(sink);
 val input: DataStream[Tuple2[IntWritable, Text]] = ...

 val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
+sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
 sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
 sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
 sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
index b7035fe8ab4..d549eda3062 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
@@ -19,13 +19,15 @@
 package org.apache.flink.streaming.connectors.fs.bucketing;

 import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.flink.util.Preconditions;

 import org.apache.hadoop.fs.Path;

 import java.io.IOException;
 import java.io.ObjectInputStream;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;

 /**
  * A {@link Bucketer} that assigns to buckets based on current system time.
@@ -38,8 +40,8 @@
  * is determined based on the current system time and the user provided format string.
  *
  *
- * {@link SimpleDateFormat} is used to derive a date string from the current system time and
- * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
+ * {@link DateTimeFormatter} is used to derive a date string from the current system time and
+ * the date format string with a timezone. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
  * files will have a granularity of hours.
  *
  *
@@ -61,44 +63,67 @@
     private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";

     private final String formatString;
+    private final ZoneId zoneId;

-    private transient SimpleDateFormat dateFormatter;
+    private transient DateTimeFormatter dateTimeFormatter;

     /**
-     * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
+     * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"} using JVM's default timezone.
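The diff above keeps `formatString` and `zoneId` as ordinary fields and marks the formatter `transient`. Below is a minimal, Flink-free sketch of that state layout. It assumes that plain Java serialization is a fair stand-in for how the sink is shipped to tasks. The class `ZonedBucketSketch` and its methods are hypothetical, not Flink API. `ZoneId` is `Serializable` and `DateTimeFormatter` is not, so the formatter is rebuilt lazily on first use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch, not Flink's DateTimeBucketer: only the field layout mirrors the diff.
class ZonedBucketSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String formatString;  // e.g. "yyyy-MM-dd--HHmm"
    private final ZoneId zoneId;        // ZoneId is Serializable, so it travels with the object
    private transient DateTimeFormatter dateTimeFormatter; // DateTimeFormatter is not Serializable

    ZonedBucketSketch(String formatString, ZoneId zoneId) {
        this.formatString = formatString;
        this.zoneId = zoneId;
    }

    /** Bucket name for a processing time given in epoch milliseconds. */
    String bucketFor(long epochMillis) {
        if (dateTimeFormatter == null) {
            // Created lazily, so a deserialized copy (where the field is null) rebuilds it.
            dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        }
        return dateTimeFormatter.format(Instant.ofEpochMilli(epochMillis));
    }

    /** Serializes and deserializes a copy, standing in for shipping the sink to a task. */
    static ZonedBucketSketch roundTrip(ZonedBucketSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ZonedBucketSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ZonedBucketSketch la =
                new ZonedBucketSketch("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles"));
        System.out.println(la.bucketFor(1533363082011L));            // formatter created here
        System.out.println(roundTrip(la).bucketFor(1533363082011L)); // rebuilt on the copy
    }
}
```

Both lines print `2018-08-03--2311`. The timestamp is 06:11 UTC on 2018-08-04, which is 23:11 the previous evening in Los Angeles during daylight saving time.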
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16595256#comment-16595256 ]

ASF GitHub Bot commented on FLINK-9962:
---

bowenli86 commented on issue #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#issuecomment-416655819

@kl0u

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> allow users to specify TimeZone in DateTimeBucketer
> ---
>
> Key: FLINK-9962
> URL: https://issues.apache.org/jira/browse/FLINK-9962
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Affects Versions: 1.5.1, 1.6.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently {{DateTimeBucketer}} will return a bucket path by using local
> timezone. We should add a {{timezone}} constructor param to allow users to
> specify a timezone.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587826#comment-16587826 ]

ASF GitHub Bot commented on FLINK-9962:
---

bowenli86 commented on issue #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#issuecomment-414771922

We'll lose a bit of test coverage, but I think removing that test is fine.
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587301#comment-16587301 ]

ASF GitHub Bot commented on FLINK-9962:
---

kl0u commented on a change in pull request #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#discussion_r211538279

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
##
@@ -59,32 +60,56 @@
     private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";

     private final String formatString;
+    private final ZoneId zoneId;

-    private transient SimpleDateFormat dateFormatter;
+    private transient DateTimeFormatter dateTimeFormatter;

     /**
-     * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
+     * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}.
      */
     public DateTimeBucketAssigner() {
         this(DEFAULT_FORMAT_STRING);
     }

     /**
-     * Creates a new {@code DateTimeBucketer} with the given date/time format string.
+     * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string.
      *
      * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
-     *                     the bucket path.
+     *                     the bucket id.
      */
     public DateTimeBucketAssigner(String formatString) {
+        this(formatString, ZoneId.systemDefault());
+    }
+
+    /**
+     * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.
+     *
+     * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
+     */
+    public DateTimeBucketAssigner(ZoneId zoneId) {
+        this(DEFAULT_FORMAT_STRING, zoneId);
+    }
+
+    /**
+     * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string using the given timezone.
+     *
+     * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
+     *                     the bucket path.
+     * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
+     */
+    public DateTimeBucketAssigner(String formatString, ZoneId zoneId) {
         this.formatString = formatString;
+        this.zoneId = zoneId;

Review comment:
Missing check for nulls:
```
this.formatString = Preconditions.checkNotNull(formatString);
this.zoneId = Preconditions.checkNotNull(zoneId);
```
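Every constructor in the diff delegates to the two-argument one, so the requested null checks only need to live there. Below is a small sketch of that layout. It uses `java.util.Objects.requireNonNull` as a stand-in for Flink's `Preconditions.checkNotNull`, and the class name `BucketAssignerCtorSketch` is hypothetical.

```java
import java.time.ZoneId;
import java.util.Objects;

// Hypothetical sketch of the constructor chain under review; not the Flink class itself.
class BucketAssignerCtorSketch {
    static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";

    final String formatString;
    final ZoneId zoneId;

    BucketAssignerCtorSketch() {
        this(DEFAULT_FORMAT_STRING);
    }

    BucketAssignerCtorSketch(String formatString) {
        // Keeps the previous behaviour: the JVM's default timezone.
        this(formatString, ZoneId.systemDefault());
    }

    BucketAssignerCtorSketch(ZoneId zoneId) {
        this(DEFAULT_FORMAT_STRING, zoneId);
    }

    BucketAssignerCtorSketch(String formatString, ZoneId zoneId) {
        // All constructors end here, so one pair of checks covers every entry point,
        // and a null fails at construction time rather than on the first record.
        this.formatString = Objects.requireNonNull(formatString, "formatString");
        this.zoneId = Objects.requireNonNull(zoneId, "zoneId");
    }

    public static void main(String[] args) {
        System.out.println(new BucketAssignerCtorSketch(ZoneId.of("UTC")).zoneId);
        try {
            // A bare null would not compile: it is ambiguous between the String and ZoneId overloads.
            new BucketAssignerCtorSketch((ZoneId) null);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A side effect of having one-argument `String` and `ZoneId` overloads is visible in `main`: callers passing a literal `null` must cast it to pick an overload.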
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587302#comment-16587302 ]

ASF GitHub Bot commented on FLINK-9962:
---

kl0u commented on a change in pull request #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#discussion_r211568018

## File path: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
##
@@ -38,8 +39,8 @@
  * is determined based on the current system time and the user provided format string.
  *
  *
- * {@link SimpleDateFormat} is used to derive a date string from the current system time and
- * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
+ * {@link DateTimeFormatter} is used to derive a date string from the current system time and

Review comment:
Same comments as for the `DateTimeBucketAssigner`.
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587303#comment-16587303 ]

ASF GitHub Bot commented on FLINK-9962:
---

kl0u commented on a change in pull request #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#discussion_r211567834

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
##
@@ -59,32 +60,56 @@
     private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";

     private final String formatString;
+    private final ZoneId zoneId;

-    private transient SimpleDateFormat dateFormatter;
+    private transient DateTimeFormatter dateTimeFormatter;

     /**
-     * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
+     * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}.
      */
     public DateTimeBucketAssigner() {
         this(DEFAULT_FORMAT_STRING);
     }

     /**
-     * Creates a new {@code DateTimeBucketer} with the given date/time format string.
+     * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string.
      *
      * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
-     *                     the bucket path.
+     *                     the bucket id.
      */
     public DateTimeBucketAssigner(String formatString) {
+        this(formatString, ZoneId.systemDefault());
+    }
+
+    /**
+     * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.
+     *
+     * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
+     */
+    public DateTimeBucketAssigner(ZoneId zoneId) {
+        this(DEFAULT_FORMAT_STRING, zoneId);
+    }
+
+    /**
+     * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string using the given timezone.
+     *
+     * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
+     *                     the bucket path.
+     * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
+     */
+    public DateTimeBucketAssigner(String formatString, ZoneId zoneId) {
         this.formatString = formatString;
+        this.zoneId = zoneId;
     }

     @Override
     public String getBucketId(IN element, BucketAssigner.Context context) {
-        if (dateFormatter == null) {
-            dateFormatter = new SimpleDateFormat(formatString);
+        if (dateTimeFormatter == null) {
+            dateTimeFormatter = DateTimeFormatter.ofPattern(formatString);
         }
-        return dateFormatter.format(new Date(context.currentProcessingTime()));
+
+        return dateTimeFormatter.format(
+            Instant.ofEpochMilli(context.currentProcessingTime()).atZone(zoneId).toLocalDateTime());

Review comment:
If I am not wrong, this method can become:
```
if (dateTimeFormatter == null) {
    dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
}
return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
```
This avoids a bit of object creation, which is important as this method is called for every incoming element.
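The two `getBucketId` bodies discussed here should produce the same string. Below is a standalone sketch that compares them. It assumes a fixed epoch-millisecond value in place of `context.currentProcessingTime()`, and the class and method names are hypothetical. The sketch also shows why a zone is needed at all: an `Instant` carries no year or hour fields, so a formatter without a zone rejects it. The formatter with an override zone still converts the instant in that zone on every call, so any allocation saving is probably small; this sketch does not measure it.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.UnsupportedTemporalTypeException;

// Hypothetical comparison of the two getBucketId bodies; names are illustrative only.
class FormatterVariantsSketch {
    static final String PATTERN = "yyyy-MM-dd--HH";

    /** PR version: convert to a LocalDateTime in the zone, then use a zone-less formatter. */
    static String viaLocalDateTime(DateTimeFormatter plain, long millis, ZoneId zone) {
        return plain.format(Instant.ofEpochMilli(millis).atZone(zone).toLocalDateTime());
    }

    /** Reviewer's version: the zone is bound to the formatter once; the Instant is formatted directly. */
    static String viaZonedFormatter(DateTimeFormatter zoned, long millis) {
        return zoned.format(Instant.ofEpochMilli(millis));
    }

    /** A formatter without a zone cannot print the year/hour fields of a bare Instant. */
    static boolean zonelessInstantFails(DateTimeFormatter plain, long millis) {
        try {
            plain.format(Instant.ofEpochMilli(millis));
            return false;
        } catch (UnsupportedTemporalTypeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long millis = 1533363082011L;
        ZoneId utc = ZoneId.of("UTC");
        DateTimeFormatter plain = DateTimeFormatter.ofPattern(PATTERN);
        DateTimeFormatter zoned = plain.withZone(utc); // new immutable formatter; 'plain' is unchanged

        System.out.println(viaLocalDateTime(plain, millis, utc)); // 2018-08-04--06
        System.out.println(viaZonedFormatter(zoned, millis));     // 2018-08-04--06
        System.out.println(zonelessInstantFails(plain, millis));  // true
    }
}
```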
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587198#comment-16587198 ]

ASF GitHub Bot commented on FLINK-9962:
---

kl0u edited a comment on issue #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#issuecomment-414608532

Hi @bowenli86 ! I am in the process of reviewing the PR.

There is no need to mock `ZoneId.systemDefault()`. We have a constructor where we can specify the `ZoneId`, so why not use it to provide a `ZoneId`? This means that the `testGetBucketPathWithDefaultTimezone` will become:
```
@Test
public void testGetBucketPathWithDefaultTimezone() {
    ZoneId utc = ZoneId.of("UTC");
    DateTimeBucketAssigner bucketAssigner = new DateTimeBucketAssigner(utc);
    assertEquals("2018-08-04--06", bucketAssigner.getBucketId(null, mockedContext));
}
```
What do you think?

And now that I see the tests, we do not even need that test, as we also have the `testGetBucketPathWithSpecifiedTimezone` ;)
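Passing the zone explicitly, as suggested, makes the expected bucket independent of the machine that runs the test. The first revision of `testGetBucketPathWithSpecifiedTimezone` used `TimeZone.getTimeZone("PST")`, and porting it to `ZoneId` needs one extra step. `ZoneId.of` rejects the three-letter ID `PST` with a `ZoneRulesException`. It needs either a region ID such as `America/Los_Angeles` or the `ZoneId.SHORT_IDS` alias map. Below is a plain-Java sketch; `bucketId` is a hypothetical helper standing in for the real bucket assigner.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.zone.ZoneRulesException;

// Hypothetical helper standing in for getBucketId/getBucketPath; not Flink API.
class ZoneIdTestSketch {
    static final long TEST_TIME_IN_MILLIS = 1533363082011L;

    static String bucketId(long millis, ZoneId zone) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd--HH")
                .withZone(zone)
                .format(Instant.ofEpochMilli(millis));
    }

    /** Returns true if ZoneId.of rejects the given ID. */
    static boolean rejected(String id) {
        try {
            ZoneId.of(id);
            return false;
        } catch (ZoneRulesException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Explicit zones give the same result on every machine, with no static mocking.
        System.out.println(bucketId(TEST_TIME_IN_MILLIS, ZoneId.of("UTC")));                 // 2018-08-04--06
        System.out.println(bucketId(TEST_TIME_IN_MILLIS, ZoneId.of("America/Los_Angeles"))); // 2018-08-03--23

        // "PST" works with java.util.TimeZone, but ZoneId.of needs the alias map for it.
        System.out.println(rejected("PST"));                    // true
        System.out.println(ZoneId.of("PST", ZoneId.SHORT_IDS)); // America/Los_Angeles
    }
}
```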
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587197#comment-16587197 ]

ASF GitHub Bot commented on FLINK-9962:
---

kl0u commented on issue #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#issuecomment-414608532

Hi @bowenli86 ! I am in the process of reviewing the PR.

There is no need to mock `ZoneId.systemDefault()`. We have a constructor where we can specify the `ZoneId`, so why not use it to provide a `ZoneId`? This means that the `testGetBucketPathWithDefaultTimezone` will become:
```
@Test
public void testGetBucketPathWithDefaultTimezone() {
    ZoneId utc = ZoneId.of("UTC");
    DateTimeBucketAssigner bucketAssigner = new DateTimeBucketAssigner(utc);
    assertEquals("2018-08-04--06", bucketAssigner.getBucketId(null, mockedContext));
}
```
What do you think?
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16586727#comment-16586727 ]

ASF GitHub Bot commented on FLINK-9962:
---

bowenli86 commented on issue #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#issuecomment-414513166

@kl0u a couple of things that may need attention:

- PowerMock needs to stay in order to mock the static method `ZoneId.systemDefault()`. I didn't find a better way to mock a static method with Mockito other than using PowerMockito. I believe what we have now should be fine.
- `SimpleDateFormat` doesn't accept Java 8's new `LocalDateTime`, so the formatter has to change to `DateTimeFormatter`.
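The second point is easy to check directly. `SimpleDateFormat.format(Object)` accepts only a `java.util.Date` or a `Number` (epoch milliseconds) and throws `IllegalArgumentException` for anything else, including a `LocalDateTime`. `DateTimeFormatter` formats `java.time` types natively. Below is a small sketch; the class name `LegacyFormatterSketch` is hypothetical.

```java
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical demo class illustrating why the PR switched formatters.
class LegacyFormatterSketch {
    static final String PATTERN = "yyyy-MM-dd--HH";

    /** SimpleDateFormat accepts Date or Number; any other argument is rejected. */
    static boolean simpleDateFormatRejects(Object value) {
        try {
            new SimpleDateFormat(PATTERN).format(value);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    static String formatWithDateTimeFormatter(LocalDateTime value) {
        return DateTimeFormatter.ofPattern(PATTERN).format(value);
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2018, 8, 4, 6, 11, 22);
        System.out.println(simpleDateFormatRejects(t));     // true
        System.out.println(formatWithDateTimeFormatter(t)); // 2018-08-04--06
    }
}
```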
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585553#comment-16585553 ]

ASF GitHub Bot commented on FLINK-9962:
---

kl0u commented on issue #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#issuecomment-414226276

Perfect! Thanks a lot @bowenli86 !
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585512#comment-16585512 ]

ASF GitHub Bot commented on FLINK-9962:
---

bowenli86 commented on issue #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#issuecomment-414217627

@kl0u I'll try to make it this week. If I can't, feel free to work on it next week.
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584676#comment-16584676 ]

ASF GitHub Bot commented on FLINK-9962:
---

kl0u commented on issue #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#issuecomment-414036401

Hi @bowenli86 ! Are you planning to continue working on this issue? If not, I could work on that next week ;)
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569404#comment-16569404 ]

ASF GitHub Bot commented on FLINK-9962:
---

kl0u commented on a change in pull request #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#discussion_r207728642

## File path: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketerTest.java
##
@@ -0,0 +1,49 @@
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.streaming.connectors.fs.Clock;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for {@link DateTimeBucketer}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DateTimeBucketer.class)
+public class DateTimeBucketerTest {
+    private static final long TEST_TIME_IN_MILLIS = 1533363082011L;
+    private static final Path TEST_PATH = new Path("test");
+
+    @Test
+    public void testGetBucketPathWithDefaultTimezone() {
+        TimeZone utc = TimeZone.getTimeZone("UTC");
+        PowerMockito.mockStatic(TimeZone.class);
+        when(TimeZone.getDefault()).thenReturn(utc);
+
+        DateTimeBucketer bucketer = new DateTimeBucketer();
+
+        Clock clock = mock(Clock.class);
+        when(clock.currentTimeMillis()).thenReturn(TEST_TIME_IN_MILLIS);
+
+        assertEquals(new Path("test/2018-08-04--06"), bucketer.getBucketPath(clock, TEST_PATH, null));
+    }
+
+    @Test
+    public void testGetBucketPathWithSpecifiedTimezone() {
+        Clock clock = mock(Clock.class);
+        when(clock.currentTimeMillis()).thenReturn(TEST_TIME_IN_MILLIS);
+        DateTimeBucketer bucketer = new DateTimeBucketer(TimeZone.getTimeZone("PST"));
+
+        assertEquals(new Path("test/2018-08-03--23"), bucketer.getBucketPath(clock, TEST_PATH, null));
+    }
+}

Review comment:
We do not need PowerMock at all in these tests. We can remove all the mocks and add our own clock.
```
new Clock() {
    @Override
    public long currentTimeMillis() {
        return TEST_TIME_IN_MILLIS;
    }
}
```
In general I am against mocking classes (where it can be avoided), as it makes the code harder to maintain.
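The hand-written clock suggested in the review removes the need for any mocking framework. Below is a self-contained sketch. It assumes a single-method clock interface like the connector's `Clock`, redeclared here as the hypothetical `SketchClock` so the snippet compiles without Flink. A hypothetical `bucketPath` helper stands in for `DateTimeBucketer.getBucketPath`.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical stand-in for the connector's Clock: one method returning epoch millis. */
interface SketchClock {
    long currentTimeMillis();
}

class FixedClockSketch {
    static final long TEST_TIME_IN_MILLIS = 1533363082011L;

    /** Hypothetical stand-in for getBucketPath(clock, basePath, element). */
    static String bucketPath(SketchClock clock, String basePath, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(zone);
        return basePath + "/" + formatter.format(Instant.ofEpochMilli(clock.currentTimeMillis()));
    }

    public static void main(String[] args) {
        // Anonymous class, as in the review comment: no Mockito, no PowerMock runner.
        SketchClock clock = new SketchClock() {
            @Override
            public long currentTimeMillis() {
                return TEST_TIME_IN_MILLIS;
            }
        };
        System.out.println(bucketPath(clock, "test", ZoneId.of("UTC"))); // test/2018-08-04--06

        // The interface has a single abstract method, so a lambda is an equivalent fixed clock.
        SketchClock lambda = () -> TEST_TIME_IN_MILLIS;
        System.out.println(bucketPath(lambda, "test", ZoneId.of("America/Los_Angeles"))); // test/2018-08-03--23
    }
}
```

If the real `Clock` interface also declares a single abstract method, the lambda form should work there too; that is an assumption this thread does not confirm.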
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569402#comment-16569402 ] ASF GitHub Bot commented on FLINK-9962: --- kl0u commented on a change in pull request #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer URL: https://github.com/apache/flink/pull/6492#discussion_r207728642 ## File path: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketerTest.java ## @@ -0,0 +1,49 @@ +package org.apache.flink.streaming.connectors.fs.bucketing; + +import org.apache.flink.streaming.connectors.fs.Clock; + +import org.apache.hadoop.fs.Path; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for {@link DateTimeBucketer}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(DateTimeBucketer.class) +public class DateTimeBucketerTest { + private static final long TEST_TIME_IN_MILLIS = 1533363082011L; + private static final Path TEST_PATH = new Path("test"); + + @Test + public void testGetBucketPathWithDefaultTimezone() { + TimeZone utc = TimeZone.getTimeZone("UTC"); + PowerMockito.mockStatic(TimeZone.class); + when(TimeZone.getDefault()).thenReturn(utc); + + DateTimeBucketer bucketer = new DateTimeBucketer(); + + Clock clock = mock(Clock.class); + when(clock.currentTimeMillis()).thenReturn(TEST_TIME_IN_MILLIS); + + assertEquals(new Path("test/2018-08-04--06"), bucketer.getBucketPath(clock, TEST_PATH, null)); + } + + @Test + public void testGetBucketPathWithSpecifiedTimezone() { + Clock clock = mock(Clock.class); + when(clock.currentTimeMillis()).thenReturn(TEST_TIME_IN_MILLIS); + DateTimeBucketer bucketer = new DateTimeBucketer(TimeZone.getTimeZone("PST")); + + assertEquals(new Path("test/2018-08-03--23"), bucketer.getBucketPath(clock, TEST_PATH, null)); + } +} Review comment: We do not need Powermock at all in these tests. We can remove all the mocks and add our own clock. ``` new Clock() { @Override public long currentTimeMillis() { return TEST_TIME_IN_MILLIS; } } ``` In general I am against mocking classes (if possible to avoid it) as this code maintainability more tricky. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> allow users to specify TimeZone in DateTimeBucketer
> ---
>
> Key: FLINK-9962
> URL: https://issues.apache.org/jira/browse/FLINK-9962
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Affects Versions: 1.5.1, 1.6.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently {{DateTimeBucketer}} will return a bucket path by using local
> timezone. We should add a {{timezone}} constructor param to allow users to
> specify a timezone.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
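kl0u's suggestion of a hand-written clock instead of PowerMock can be sketched as a self-contained program. To run without Flink on the classpath, this sketch declares a local `Clock` interface with the single `currentTimeMillis()` method that the review snippet overrides (a stand-in for Flink's `org.apache.flink.streaming.connectors.fs.Clock`), plus a hypothetical `bucketName` helper that formats the clock's time with `java.time.format.DateTimeFormatter` in an explicit zone. Passing the zone explicitly also removes the need to mock `TimeZone.getDefault()` for the default-timezone case. It illustrates the approach; it is not the test that was eventually merged.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class FixedClockBucketSketch {

    /** Local stand-in mirroring the single method of the connector's Clock interface. */
    interface Clock {
        long currentTimeMillis();
    }

    static final long TEST_TIME_IN_MILLIS = 1533363082011L; // 2018-08-04T06:11:22.011Z

    /** Hypothetical helper: formats the clock's current time in the given zone. */
    static String bucketName(Clock clock, String pattern, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
        return formatter.format(Instant.ofEpochMilli(clock.currentTimeMillis()));
    }

    public static void main(String[] args) {
        // A hand-written fixed clock replaces mock(Clock.class) + when(...): no PowerMock runner needed.
        Clock clock = new Clock() {
            @Override
            public long currentTimeMillis() {
                return TEST_TIME_IN_MILLIS;
            }
        };

        System.out.println(bucketName(clock, "yyyy-MM-dd--HH", ZoneId.of("UTC")));                 // 2018-08-04--06
        System.out.println(bucketName(clock, "yyyy-MM-dd--HH", ZoneId.of("America/Los_Angeles"))); // 2018-08-03--23
    }
}
```

The two expected values match the ones asserted in the PR's original test (`"PST"` resolves to the `America/Los_Angeles` rules, which are on daylight time, UTC-7, in August).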
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569343#comment-16569343 ]

ASF GitHub Bot commented on FLINK-9962:
---

yanghua commented on a change in pull request #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#discussion_r207723967

## File path: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketerTest.java
## @@ -0,0 +1,49 @@
+package org.apache.flink.streaming.connectors.fs.bucketing;

Review comment:
Oh, it seems you forgot the Apache License header.
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569342#comment-16569342 ]

ASF GitHub Bot commented on FLINK-9962:
---

yanghua commented on a change in pull request #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#discussion_r207723921

## File path: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketerTest.java
## @@ -0,0 +1,49 @@
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.streaming.connectors.fs.Clock;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for {@link DateTimeBucketer}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DateTimeBucketer.class)
+public class DateTimeBucketerTest {

Review comment:
I think we need at least a test case for specifying a custom formatString.
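A test for a custom format string, as yanghua requests, could be table-driven over a few patterns at the same fixed instant. The sketch below is hypothetical and self-contained: the `format` helper stands in for the bucketer's formatting step and does not construct Flink's `DateTimeBucketer`; the patterns and zones are illustrative choices, and the expected strings follow from the instant 2018-08-04T06:11:22Z.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class CustomFormatBucketSketch {

    static final long TEST_TIME_IN_MILLIS = 1533363082011L; // 2018-08-04T06:11:22.011Z

    // Hypothetical helper standing in for the bucketer's "format current time" step.
    static String format(String pattern, ZoneId zone, long millis) {
        return DateTimeFormatter.ofPattern(pattern).withZone(zone).format(Instant.ofEpochMilli(millis));
    }

    public static void main(String[] args) {
        String[][] cases = {
            // pattern,           zone,                  expected bucket name
            {"yyyy-MM-dd--HHmm", "UTC",                 "2018-08-04--0611"},
            {"yyyy/MM/dd/HH",    "UTC",                 "2018/08/04/06"},   // '/' yields nested directories
            {"yyyyMMdd",         "America/Los_Angeles", "20180803"},
        };
        for (String[] c : cases) {
            String actual = format(c[0], ZoneId.of(c[1]), TEST_TIME_IN_MILLIS);
            if (!actual.equals(c[2])) {
                throw new AssertionError(c[0] + " in " + c[1] + ": expected " + c[2] + " but got " + actual);
            }
        }
        System.out.println("all custom-format cases passed");
    }
}
```

Including one non-UTC zone in the table keeps the format-string test from silently depending on the JVM's default zone.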
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569322#comment-16569322 ]

ASF GitHub Bot commented on FLINK-9962:
---

bowenli86 opened a new pull request #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492

## What is the purpose of the change

allow users to specify TimeZone in DateTimeBucketer

## Brief change log

- add TimeZone as constructor param to DateTimeBucketer
- add unit tests

## Verifying this change

This change added tests and can be verified as follows:

- add unit tests in DateTimeBucketerTest

## Does this pull request potentially affect one of the following parts:

- The S3 file system connector: (yes)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs)
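The change log's first item (a time-zone constructor parameter) can be sketched with a simplified, hypothetical `ZoneAwareBucketer`. It is not Flink's `DateTimeBucketer`: it returns plain `String` paths instead of Hadoop `Path`s and takes the current time as an argument instead of a `Clock`. It assumes `java.time.ZoneId` rather than `java.util.TimeZone` (a `TimeZone` can be converted with `TimeZone.toZoneId()`), and it keeps the `DateTimeFormatter` transient because that class is not `Serializable`, rebuilding it in `readObject` after deserialization.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Objects;

/** Hypothetical, simplified bucketer; not Flink's DateTimeBucketer. */
public class ZoneAwareBucketer implements Serializable {

    private static final long serialVersionUID = 1L;
    private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";

    private final String formatString;
    private final ZoneId zoneId; // ZoneId is Serializable, so it travels with the object

    // DateTimeFormatter is not Serializable, so it is rebuilt after deserialization.
    private transient DateTimeFormatter formatter;

    /** Uses the default zone of the JVM that constructs this object. */
    public ZoneAwareBucketer() {
        this(DEFAULT_FORMAT_STRING, ZoneId.systemDefault());
    }

    public ZoneAwareBucketer(ZoneId zoneId) {
        this(DEFAULT_FORMAT_STRING, zoneId);
    }

    public ZoneAwareBucketer(String formatString, ZoneId zoneId) {
        this.formatString = Objects.requireNonNull(formatString, "formatString");
        this.zoneId = Objects.requireNonNull(zoneId, "zoneId");
        this.formatter = buildFormatter();
    }

    private DateTimeFormatter buildFormatter() {
        return DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.formatter = buildFormatter();
    }

    /** Returns basePath + "/" + the formatted time, e.g. "test/2018-08-04--06" in UTC. */
    public String getBucketPath(long currentTimeMillis, String basePath) {
        return basePath + "/" + formatter.format(Instant.ofEpochMilli(currentTimeMillis));
    }
}
```

In this sketch the no-argument constructor captures `ZoneId.systemDefault()` of whichever JVM constructs the object, and that zone is then serialized with it; passing a zone explicitly avoids depending on where construction happens.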
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569067#comment-16569067 ]

Bowen Li commented on FLINK-9962:
-

That should work. Thanks Kostas
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565586#comment-16565586 ]

Kostas Kloudas commented on FLINK-9962:
---

[~phoenixjiangnan] I agree that for *responsiveness* it would be nice to also be able to roll on event time. This is actually the reason why the {{RollingPolicy}} interface of the new {{StreamingFileSink}} has a method called {{shouldRollOnProcessingTime}}. The idea was to add a {{shouldRollOnEventTime}} in the future, but this requires a bit more work, also at the operator level. In general, a motivating example for the new {{StreamingFileSink}} was to be able to do "windowing" but with files.

That said, being able to roll on event time is only a matter of *responsiveness*, not correctness. In the new {{StreamingFileSink}}, you can always use the {{Bucketer}} to assign an incoming element to the correct bucket (like the {{WindowAssigner}}), and use the inactivity interval of the default rolling policy to close the "window", i.e. the in-progress file, when no more data is arriving for that window.

Does this fit your need?
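Kostas's idea of closing a bucket's in-progress file once no more data arrives for it can be illustrated with a small, hypothetical tracker. This is not Flink's `RollingPolicy` or default rolling policy API; the class and method names are invented for illustration, and the caller is assumed to pass in the current processing time both when writing and when checking for inactivity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of inactivity-based closing; not Flink's RollingPolicy API. */
public class InactivityRollSketch {

    private final long inactivityIntervalMillis;
    // bucket id -> processing time of the last element written to that bucket's in-progress file
    private final Map<String, Long> lastWrite = new HashMap<>();

    public InactivityRollSketch(long inactivityIntervalMillis) {
        this.inactivityIntervalMillis = inactivityIntervalMillis;
    }

    /** Records that an element was written to the bucket's in-progress file at nowMillis. */
    public void onElement(String bucketId, long nowMillis) {
        lastWrite.put(bucketId, nowMillis);
    }

    /**
     * Returns (sorted) and forgets every bucket that received no data for at least the
     * inactivity interval, i.e. the "windows" whose in-progress files can be closed.
     */
    public List<String> closeInactive(long nowMillis) {
        List<String> closed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastWrite.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= inactivityIntervalMillis) {
                closed.add(entry.getKey());
                it.remove();
            }
        }
        Collections.sort(closed); // deterministic order for callers and tests
        return closed;
    }

    public static void main(String[] args) {
        InactivityRollSketch roller = new InactivityRollSketch(60_000L);
        roller.onElement("2018-08-03--23", 0L);
        roller.onElement("2018-08-04--00", 50_000L);
        System.out.println(roller.closeInactive(60_000L));  // [2018-08-03--23]
        System.out.println(roller.closeInactive(110_000L)); // [2018-08-04--00]
    }
}
```

As the comment notes, this only affects how soon a file is closed (responsiveness); which bucket an element lands in is decided separately by the bucket assignment.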
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564337#comment-16564337 ]

Bowen Li commented on FLINK-9962:
-

[~fhueske] Sounds good.

Besides, after reading the code, it also seems to me that both BucketingSink and StreamingFileSink roll their bucket paths based only on system time (processing time), not event time. This causes problems especially when users backfill data and want directory structures that correspond to the timestamps of their historical data. What do you think of this use case? Is there any workaround, or any plan to support event-time bucket path rolling?
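The backfill problem Bowen describes can be made concrete by comparing a clock-driven bucket assignment with one derived from the element's own timestamp, which is the kind of per-element assignment Kostas suggests doing in a `Bucketer`. Everything here is hypothetical: the `Event` type, the method names, the replay date, and the UTC `yyyy-MM-dd--HH` pattern are illustrative assumptions, not Flink's `Bucketer` interface.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical comparison of processing-time vs event-time bucket assignment. */
public class EventTimeBucketSketch {

    /** Hypothetical element type carrying its own event timestamp. */
    static final class Event {
        final long eventTimeMillis;
        final String payload;

        Event(long eventTimeMillis, String payload) {
            this.eventTimeMillis = eventTimeMillis;
            this.payload = payload;
        }
    }

    // DateTimeFormatter is immutable and thread-safe, so one shared instance is fine.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneId.of("UTC"));

    /** Clock-driven assignment: ignores the element and uses the time it is processed. */
    static String processingTimeBucket(Event ignored, long nowMillis) {
        return FORMAT.format(Instant.ofEpochMilli(nowMillis));
    }

    /** Event-time assignment: derives the bucket from the element's own timestamp. */
    static String eventTimeBucket(Event event) {
        return FORMAT.format(Instant.ofEpochMilli(event.eventTimeMillis));
    }

    public static void main(String[] args) {
        Event historical = new Event(1533363082011L, "backfilled record"); // 2018-08-04T06:11:22Z
        long replayTime = Instant.parse("2018-09-01T12:00:00Z").toEpochMilli();

        System.out.println(processingTimeBucket(historical, replayTime)); // 2018-09-01--12 (replay date)
        System.out.println(eventTimeBucket(historical));                  // 2018-08-04--06 (original date)
    }
}
```

With the clock-driven variant, a replayed record lands in the directory for the day it was reprocessed; the event-time variant places it under the hour it originally happened, independent of when the backfill runs.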
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16563591#comment-16563591 ]

Fabian Hueske commented on FLINK-9962:
--

Hi [~phoenixjiangnan], this sounds like a good idea to me. The file sink was recently reworked. Flink 1.6.0 will ship with {{StreamingFileSink}} which also supports bucketers like {{DateTimeBucketer}}.
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16558900#comment-16558900 ]

Bowen Li commented on FLINK-9962:
-

[~fhue...@gmail.com] what do you think of this change?