[ https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15387961#comment-15387961 ]
Josh Forman-Gornall commented on FLINK-4190:
--------------------------------------------

Thanks! Oh nice, this looks like a better solution for checking for bucket inactivity...

For the tests, is there any reason not to fold all of those tests into the new `BucketingSinkTest`? Currently there are four: BucketingSinkITCase, BucketingSinkFaultToleranceITCase, BucketingSinkFaultTolerance2ITCase, and BucketingSinkMultipleActiveBucketsCase.

> Generalise RollingSink to work with arbitrary buckets
> -----------------------------------------------------
>
>                 Key: FLINK-4190
>                 URL: https://issues.apache.org/jira/browse/FLINK-4190
>             Project: Flink
>          Issue Type: Improvement
>          Components: filesystem-connector, Streaming Connectors
>            Reporter: Josh Forman-Gornall
>            Assignee: Josh Forman-Gornall
>            Priority: Minor
>
> The current RollingSink implementation appears to be intended for writing to
> directories that are bucketed by system time (e.g. minutely) and to only be
> writing to one file within one bucket at any point in time. When the system
> time determines that the current bucket should be changed, the current bucket
> and file are closed and a new bucket and file are created. The sink cannot be
> used for the more general problem of writing to arbitrary buckets, perhaps
> determined by an attribute on the element/tuple being processed.
> There are three limitations which prevent the existing sink from being used
> for more general problems:
> - Only bucketing by the current system time is supported, and not by e.g. an
> attribute of the element being processed by the sink.
> - Whenever the sink sees a change in the bucket being written to, it flushes
> the file and moves on to the new bucket. Therefore the sink cannot have more
> than one bucket/file open at a time. Additionally the checkpointing mechanics
> only support saving the state of one active bucket and file.
> - The sink determines that it should 'close' an active bucket and file when
> the bucket path changes. We need another way to determine when a bucket has
> become inactive and needs to be closed.
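
For illustration only, here is a minimal sketch of the idea in the description: pick the bucket from an attribute of the element, keep several buckets open at once, and close a bucket after it has gone without writes for some threshold. It is plain Java and is not the RollingSink or BucketingSink code from the patch. The class name `MultiBucketTracker`, its methods, and the inactivity threshold are all hypothetical. Timestamps are passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch; not the Flink RollingSink or BucketingSink API. */
class MultiBucketTracker<T> {
    // Maps an element to a bucket id, e.g. derived from one of its attributes.
    private final Function<T, String> bucketer;
    // A bucket is closed once it has received no writes for at least this long.
    private final long inactivityThresholdMs;
    // Last write time per open bucket; several buckets can be open at once.
    private final Map<String, Long> lastWriteMs = new HashMap<>();

    MultiBucketTracker(Function<T, String> bucketer, long inactivityThresholdMs) {
        this.bucketer = bucketer;
        this.inactivityThresholdMs = inactivityThresholdMs;
    }

    /** Records a write at time nowMs and returns the bucket the element maps to. */
    String write(T element, long nowMs) {
        String bucket = bucketer.apply(element);
        lastWriteMs.put(bucket, nowMs);
        return bucket;
    }

    /** Removes and returns every bucket that has been idle for the threshold or longer. */
    List<String> closeInactive(long nowMs) {
        List<String> closed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastWriteMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() >= inactivityThresholdMs) {
                closed.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        return closed;
    }

    /** Number of buckets currently considered open. */
    int openBucketCount() {
        return lastWriteMs.size();
    }
}
```

A real sink would also have to flush and close the underlying file for each bucket returned by `closeInactive`, and include every open bucket in its checkpointed state. The sketch leaves both out.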