[jira] [Commented] (FLINK-13864) StreamingFileSink: Allow inherited classes to extend StreamingFileSink correctly
[ https://issues.apache.org/jira/browse/FLINK-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16917708#comment-16917708 ]

Kailash Hassan Dayanand commented on FLINK-13864:
-------------------------------------------------

Hello [~kkl0u],

Thanks for your response. I wanted to include some code snippets to make the discussion easier. This is my current extended class:

    public class ExtendedtStreamingFileSink extends StreamingFileSink {

        public static StreamingFileSink.BulkFormatBuilder forBulkFormat(
                Path basePath, Factory writerFactory) {
            return new ExtendedtStreamingFileSink.ExtendedBulkFormatBuilder(
                    basePath, writerFactory, new DateTimeBucketAssigner());
        }

        public static class ExtendedBulkFormatBuilder extends StreamingFileSink.BulkFormatBuilder {

            public ExtendedtStreamingFileSink.ExtendedBulkFormatBuilder withBucketAssigner(
                    BucketAssigner assigner) {
                return new ExtendedtStreamingFileSink.ExtendedBulkFormatBuilder(
                        super.basePath,
                        super.writerFactory,
                        (BucketAssigner) Preconditions.checkNotNull(assigner),
                        this.bucketCheckInterval,
                        new DefaultBucketFactoryImpl());
            }

            // ... similar overrides for the other builder methods: withBucketCheckInterval, ...

            // Constructors
            public ExtendedBulkFormatBuilder(
                    Path basePath, Factory writerFactory, BucketAssigner assigner) {
                super(basePath, writerFactory, assigner);
            }

            private ExtendedBulkFormatBuilder(
                    Path basePath,
                    Factory writerFactory,
                    BucketAssigner assigner,
                    long bucketCheckInterval,
                    BucketFactory bucketFactory) {
                super(basePath, writerFactory, assigner, bucketCheckInterval, bucketFactory);
            }
        }
    }

Since I am using the private members of the StreamingFileSink.Builder class in the ExtendedBulkFormatBuilder class, I changed them to protected. This may not be strictly necessary, though: I could use the mechanism described here: [https://stackoverflow.com/questions/17164375/subclassing-a-java-builder-class], which may make this recommended change unnecessary.
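For readers following the discussion, the builder-subclassing mechanism from the linked Stack Overflow question is commonly implemented with a self-referencing type parameter. The following is only a minimal sketch of that idea: `BaseBuilder`, `ExtendedBuilder`, `self()`, `withMetricPrefix` and `describe()` are hypothetical names, not Flink classes or methods, and unlike the builders in the snippet above this sketch mutates the builder in place instead of returning a new instance.

```java
// Sketch only: hypothetical stand-ins, not Flink's StreamingFileSink builders.
final class SelfTypedBuilderSketch {

    /** Base builder whose fluent setters return the concrete subclass type T. */
    abstract static class BaseBuilder<T extends BaseBuilder<T>> {
        private String basePath = "/tmp";
        private long bucketCheckInterval = 60_000L;

        /** Each concrete builder returns itself, so the base class needs no cast. */
        protected abstract T self();

        T withBasePath(String basePath) {
            this.basePath = basePath;
            return self();
        }

        T withBucketCheckInterval(long intervalMillis) {
            this.bucketCheckInterval = intervalMillis;
            return self();
        }

        String describe() {
            return basePath + "@" + bucketCheckInterval;
        }
    }

    /** Subclass that adds one option without re-declaring the inherited setters. */
    static final class ExtendedBuilder extends BaseBuilder<ExtendedBuilder> {
        private String metricPrefix = "";

        @Override
        protected ExtendedBuilder self() {
            return this;
        }

        ExtendedBuilder withMetricPrefix(String metricPrefix) {
            this.metricPrefix = metricPrefix;
            return this;
        }

        @Override
        String describe() {
            return metricPrefix + ":" + super.describe();
        }
    }

    public static void main(String[] args) {
        // Inherited setters keep the subclass type, so the chain can mix
        // base-class and subclass methods in any order.
        ExtendedBuilder builder = new ExtendedBuilder()
                .withBucketCheckInterval(5_000L)
                .withMetricPrefix("sink")
                .withBasePath("/data");
        System.out.println(builder.describe());
    }
}
```

With this shape the subclass never touches the base builder's private fields, which is why the change to protected members might be avoidable.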
> StreamingFileSink: Allow inherited classes to extend StreamingFileSink correctly
> --------------------------------------------------------------------------------
>
> Key: FLINK-13864
> URL: https://issues.apache.org/jira/browse/FLINK-13864
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Reporter: Kailash Hassan Dayanand
> Priority: Minor
>
> Currently the StreamingFileSink can't be extended correctly because there are a few issues with the PR [https://github.com/apache/flink/pull/8469] that was merged for FLINK-12539 [https://issues.apache.org/jira/browse/FLINK-12539].
> Mailing list discussion:
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201908.mbox/%3CCACGLQUAxXjr2mBOf-6hbXcwmWoH5ib_0YEy-Vyjj%3DEPyQ25Qiw%40mail.gmail.com%3E]

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-13864) StreamingFileSink: Allow inherited classes to extend StreamingFileSink correctly
Kailash Hassan Dayanand created FLINK-13864:
--------------------------------------------

Summary: StreamingFileSink: Allow inherited classes to extend StreamingFileSink correctly
Key: FLINK-13864
URL: https://issues.apache.org/jira/browse/FLINK-13864
Project: Flink
Issue Type: Improvement
Components: Connectors / FileSystem
Reporter: Kailash Hassan Dayanand

Currently the StreamingFileSink can't be extended correctly because there are a few issues with the PR [https://github.com/apache/flink/pull/8469] that was merged for FLINK-12539 [https://issues.apache.org/jira/browse/FLINK-12539].
Mailing list discussion:
[http://mail-archives.apache.org/mod_mbox/flink-dev/201908.mbox/%3CCACGLQUAxXjr2mBOf-6hbXcwmWoH5ib_0YEy-Vyjj%3DEPyQ25Qiw%40mail.gmail.com%3E]
[jira] [Created] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases
Kailash Hassan Dayanand created FLINK-12539:
--------------------------------------------

Summary: StreamingFileSink: Make the class extendable to customize for different usecases
Key: FLINK-12539
URL: https://issues.apache.org/jira/browse/FLINK-12539
Project: Flink
Issue Type: New Feature
Components: Connectors / FileSystem
Reporter: Kailash Hassan Dayanand
Assignee: Kailash Hassan Dayanand

Currently the StreamingFileSink uses the Builder pattern, and the actual constructor of StreamingFileSink is private. This makes it hard to extend the class and build a customized sink on top of it (for example, to add new metrics). I propose making this constructor protected, and doing the same for the Builder.
Discussion is here:
[http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E]

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12149) Support Proto for Streaming File Sink
[ https://issues.apache.org/jira/browse/FLINK-12149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16838775#comment-16838775 ]

Kailash Hassan Dayanand commented on FLINK-12149:
-------------------------------------------------

[~yanghua] I have already done some work on this here: [https://github.com/kailashhd/flink/commit/10fcbd41dbaa49959d61d3ab08c0ac29da554ab2]. I can clean it up and send a PR for review in a day or so.

> Support Proto for Streaming File Sink
> -------------------------------------
>
> Key: FLINK-12149
> URL: https://issues.apache.org/jira/browse/FLINK-12149
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / FileSystem
> Reporter: Kailash Hassan Dayanand
> Assignee: vinoyang
> Priority: Major
>
> Currently Flink supports ParquetAvroWriters, documented here: [https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.html].
> Supporting a ProtoParquetWriter within Flink would be a good addition as well.
> Parquet already supports ProtoParquetWriter here:
> [https://github.com/apache/parquet-mr/blob/master/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java].
> We can extend the classes from there and make them available in the appropriate format to support ProtoParquet writing.
[jira] [Created] (FLINK-12149) Support Proto for Streaming File Sink
Kailash Hassan Dayanand created FLINK-12149:
--------------------------------------------

Summary: Support Proto for Streaming File Sink
Key: FLINK-12149
URL: https://issues.apache.org/jira/browse/FLINK-12149
Project: Flink
Issue Type: New Feature
Reporter: Kailash Hassan Dayanand

Currently Flink supports ParquetAvroWriters, documented here: [https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.html].
Supporting a ProtoParquetWriter within Flink would be a good addition as well.
Parquet already supports ProtoParquetWriter here:
[https://github.com/apache/parquet-mr/blob/master/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java].
We can extend the classes from there and make them available in the appropriate format to support ProtoParquet writing.
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442760#comment-16442760 ]

Kailash Hassan Dayanand commented on FLINK-8944:
------------------------------------------------

[Update]: There is currently a minor issue with the ListShards API in AWS which blocks these changes: [https://github.com/aws/aws-sdk-java/issues/1490] (see the last comment). One workaround would be to fetch the list of all shards every time, which is slightly inefficient. I will wait for some time for the AWS changes to land before submitting this.

> Use ListShards for shard discovery in the flink kinesis connector
> -----------------------------------------------------------------
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
> Issue Type: Improvement
> Reporter: Kailash Hassan Dayanand
> Priority: Minor
>
> Currently the DescribeStream AWS API, which is used to get the list of shards, has a restrictive rate limit on AWS (5 requests per second per account). This is problematic when running multiple Flink jobs on the same account, since each subtask calls DescribeStream. Changing this to ListShards will provide more flexibility on rate limits, as ListShards has a limit of 100 requests per second per data stream.
> More details on the mailing list. https://goo.gl/mRXjKh
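The workaround mentioned in the update (fetch every shard on each discovery pass and work out which ones are new on the client side) could look roughly like the sketch below. It makes no AWS SDK calls. `newShardsSince` is a hypothetical helper, and the sketch assumes shard IDs are zero-padded strings such as `shardId-000000000001` whose lexicographic order matches creation order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: client-side filtering of a full shard listing.
final class ShardDiscoverySketch {

    /**
     * Returns the shard IDs in allShardIds that sort after lastSeenShardId.
     * A null lastSeenShardId means nothing has been seen yet, so every shard is new.
     * Assumption: shard IDs compare lexicographically in creation order.
     */
    static List<String> newShardsSince(List<String> allShardIds, String lastSeenShardId) {
        List<String> fresh = new ArrayList<>();
        for (String shardId : allShardIds) {
            if (lastSeenShardId == null || shardId.compareTo(lastSeenShardId) > 0) {
                fresh.add(shardId);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        // Stand-in for the full listing that would be fetched on every pass.
        List<String> all = Arrays.asList(
                "shardId-000000000000", "shardId-000000000001", "shardId-000000000002");
        System.out.println(newShardsSince(all, "shardId-000000000000"));
    }
}
```

The inefficiency noted in the comment is visible here: the whole listing is transferred and scanned on every pass even when nothing has changed.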
[jira] [Updated] (FLINK-8945) Allow customization of the KinesisProxy
[ https://issues.apache.org/jira/browse/FLINK-8945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kailash Hassan Dayanand updated FLINK-8945:
------------------------------------------
Summary: Allow customization of the KinesisProxy (was: Allow customization of the KinesisProxy Interface)

> Allow customization of the KinesisProxy
> ---------------------------------------
>
> Key: FLINK-8945
> URL: https://issues.apache.org/jira/browse/FLINK-8945
> Project: Flink
> Issue Type: Improvement
> Reporter: Kailash Hassan Dayanand
> Priority: Minor
>
> Currently the KinesisProxy class here:
> [https://github.com/apache/flink/blob/310f3de62e52f1f977c217d918cc5aac79b87277/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L125]
> has a private constructor. This prevents extending the class and therefore prevents customizing shard discovery. I propose changing the constructor to protected.
> While creating a new implementation of KinesisProxyInterface is possible, I would like to keep using the existing implementations of getRecords and getShardIterator.
> This will be a temporary workaround until FLINK-8944 is submitted.
[jira] [Created] (FLINK-8945) Allow customization of the KinesisProxy Interface
Kailash Hassan Dayanand created FLINK-8945:
-------------------------------------------

Summary: Allow customization of the KinesisProxy Interface
Key: FLINK-8945
URL: https://issues.apache.org/jira/browse/FLINK-8945
Project: Flink
Issue Type: Improvement
Reporter: Kailash Hassan Dayanand

Currently the KinesisProxy class here:
[https://github.com/apache/flink/blob/310f3de62e52f1f977c217d918cc5aac79b87277/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L125]
has a private constructor. This prevents extending the class and therefore prevents customizing shard discovery. I propose changing the constructor to protected.
While creating a new implementation of KinesisProxyInterface is possible, I would like to keep using the existing implementations of getRecords and getShardIterator.
This will be a temporary workaround until FLINK-8944 is submitted.
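To illustrate why the constructor's visibility matters, here is a minimal sketch using hypothetical stand-in classes (`BaseProxy`, `CustomDiscoveryProxy`, `discoverShards`), not Flink's actual KinesisProxy or its method signatures. The subclass overrides only shard discovery and inherits the record-fetching method unchanged, which is the outcome the issue asks for.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: hypothetical stand-ins, not Flink's KinesisProxy.
final class ProxyExtensionSketch {
    public static void main(String[] args) {
        BaseProxy proxy = new CustomDiscoveryProxy("orders");
        System.out.println(proxy.discoverShards());
        System.out.println(proxy.getRecords("shard-1"));
    }
}

class BaseProxy {
    private final String streamName;

    // protected: were this constructor private, the super(streamName) call in
    // the top-level subclass below would not compile.
    protected BaseProxy(String streamName) {
        this.streamName = streamName;
    }

    /** Inherited unchanged by subclasses. */
    String getRecords(String shardId) {
        return "records from " + streamName + "/" + shardId;
    }

    /** The part a subclass wants to customize. */
    List<String> discoverShards() {
        return Arrays.asList("shard-0");
    }
}

class CustomDiscoveryProxy extends BaseProxy {
    CustomDiscoveryProxy(String streamName) {
        super(streamName);
    }

    @Override
    List<String> discoverShards() {
        // Custom discovery logic would go here; fixed values keep the sketch small.
        return Arrays.asList("shard-0", "shard-1");
    }
}
```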
[jira] [Created] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
Kailash Hassan Dayanand created FLINK-8944:
-------------------------------------------

Summary: Use ListShards for shard discovery in the flink kinesis connector
Key: FLINK-8944
URL: https://issues.apache.org/jira/browse/FLINK-8944
Project: Flink
Issue Type: Improvement
Reporter: Kailash Hassan Dayanand

Currently the DescribeStream AWS API, which is used to get the list of shards, has a restrictive rate limit on AWS (5 requests per second per account). This is problematic when running multiple Flink jobs on the same account, since each subtask calls DescribeStream. Changing this to ListShards will provide more flexibility on rate limits, as ListShards has a limit of 100 requests per second per data stream.
More details on the mailing list. https://goo.gl/mRXjKh
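A rough back-of-the-envelope calculation shows how quickly the account-wide limit is reached. The sketch below assumes every subtask issues exactly one discovery call per discovery interval and uses a 10 second interval purely as an illustrative value. It ignores pagination and retries, both of which would make the real request rate higher. The method names are hypothetical.

```java
// Sketch only: arithmetic on the rate limits quoted in the issue description.
final class ShardDiscoveryRateSketch {

    /** Requests per second issued by `subtasks` subtasks that each poll once per interval. */
    static double callsPerSecond(int subtasks, long discoveryIntervalMillis) {
        return subtasks * 1000.0 / discoveryIntervalMillis;
    }

    /** Largest number of subtasks whose combined polling stays within the given limit. */
    static long maxSubtasks(double limitPerSecond, long discoveryIntervalMillis) {
        return (long) Math.floor(limitPerSecond * discoveryIntervalMillis / 1000.0);
    }

    public static void main(String[] args) {
        long intervalMillis = 10_000L; // assumed discovery interval, for illustration

        // DescribeStream: 5 requests per second, shared by the whole account.
        System.out.println("DescribeStream, per account: " + maxSubtasks(5, intervalMillis));

        // ListShards: 100 requests per second, counted per data stream.
        System.out.println("ListShards, per stream: " + maxSubtasks(100, intervalMillis));
    }
}
```

Under these assumptions the DescribeStream budget is shared by every job on the account, while the ListShards budget applies to each stream separately, which is the flexibility the issue refers to.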
[jira] [Created] (FLINK-8936) Provide only AWS region or endpoint in line with the SDK
Kailash Hassan Dayanand created FLINK-8936:
-------------------------------------------

Summary: Provide only AWS region or endpoint in line with the SDK
Key: FLINK-8936
URL: https://issues.apache.org/jira/browse/FLINK-8936
Project: Flink
Issue Type: Improvement
Reporter: Kailash Hassan Dayanand

Since a recent upgrade of the aws-java-sdk [https://goo.gl/yGLRRG] (in 1.11.79), it is no longer possible to specify both the region and the endpoint in the Kinesis configuration. The earlier PR that introduced the endpoint option, FLINK-4197, also had comments about conflicting endpoints and regions, but this was acceptable at the time because the Java SDK did not enforce it.

Reason for this change: it makes it easier to connect to kinesalite in a development environment.
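One way to read the proposal is as a configuration check that rejects settings in which both values are present. The sketch below is an illustration under stated assumptions, not the connector's actual validation code: the key names `aws.region` and `aws.endpoint` and the `validate` helper are assumed for the example, and requiring at least one of the two is an extra assumption on top of what the issue states.

```java
import java.util.Properties;

// Sketch only: an "exactly one of region or endpoint" configuration check.
final class AwsConfigCheckSketch {
    // Assumed key names, used here for illustration.
    static final String REGION_KEY = "aws.region";
    static final String ENDPOINT_KEY = "aws.endpoint";

    /** Throws IllegalArgumentException unless exactly one of the two keys is set. */
    static void validate(Properties config) {
        boolean hasRegion = config.containsKey(REGION_KEY);
        boolean hasEndpoint = config.containsKey(ENDPOINT_KEY);
        if (hasRegion == hasEndpoint) {
            throw new IllegalArgumentException(
                    "Set exactly one of " + REGION_KEY + " or " + ENDPOINT_KEY);
        }
    }

    public static void main(String[] args) {
        // Endpoint-only configuration, as one might use against a local kinesalite.
        Properties local = new Properties();
        local.setProperty(ENDPOINT_KEY, "http://localhost:4567");
        validate(local);
        System.out.println("endpoint-only configuration accepted");
    }
}
```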
[jira] [Created] (FLINK-8888) Upgrade AWS SDK in flink-connector-kinesis
Kailash Hassan Dayanand created FLINK-8888:
-------------------------------------------

Summary: Upgrade AWS SDK in flink-connector-kinesis
Key: FLINK-8888
URL: https://issues.apache.org/jira/browse/FLINK-8888
Project: Flink
Issue Type: Improvement
Reporter: Kailash Hassan Dayanand

Bump the AWS Java SDK version to 1.11.272. Also evaluate the impact of this upgrade on the KCL and KPL versions.