[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341718#comment-16341718 ] Flavio Pompermaier commented on FLINK-8101: --- Don't worry Christophe, your PR looks much more accurate than mine; I spent just an hour trying to implement it and ran into some problems I was in doubt about how to solve. Looking at your PR, it seems you were able to find the right solutions. Moreover, this shows how much interest there is in this connector ;) The only two points I'm still in doubt about are: # Does it make sense to try to keep compatibility between the TransportClient-based versions and the RestClient-based ones, or is it better to start a new base ES connector? # Are the documents sent as plain JSON or as something faster (i.e. compressed binary)? Are we using the REST client to its full potential? > Elasticsearch 6.x support > - > > Key: FLINK-8101 > URL: https://issues.apache.org/jira/browse/FLINK-8101 > Project: Flink > Issue Type: New Feature > Components: ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Flavio Pompermaier >Priority: Major > Fix For: 1.5.0 > > > Recently, elasticsearch 6.0.0 was released: > https://www.elastic.co/blog/elasticsearch-6-0-0-released > The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
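For context on the second question: with the 6.x RestHighLevelClient, documents travel over HTTP as JSON payloads attached to bulk requests; there is no binary transport format, and compression is an HTTP-level client option rather than a different wire format. A minimal sketch, assuming the standard elasticsearch-rest-high-level-client 6.x API (the exact bulk() signature varies slightly across 6.x minor versions):

{code}
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class Es6BulkSketch {
    public static void main(String[] args) throws Exception {
        // The high-level client wraps the low-level REST client; requests travel over HTTP.
        RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")));

        // Each document is attached to the bulk request as a JSON payload.
        BulkRequest bulk = new BulkRequest();
        bulk.add(new IndexRequest("my-index", "my-type", "1")
            .source("{\"user\":\"flink\",\"count\":42}", XContentType.JSON));

        // 6.0-6.3 signature; later 6.x versions add an overload taking RequestOptions.
        BulkResponse response = client.bulk(bulk);
        System.out.println("took " + response.getTook() + ", failures: " + response.hasFailures());

        client.close();
    }
}
{code}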
[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341657#comment-16341657 ] Christophe Jolif commented on FLINK-8101: - Hi Flavio, it happens that I was working on the same thing at the same time! Anyway, I submitted my PR as well since it was ready. I have not had time to look at yours yet to see what might be different. > Elasticsearch 6.x support > - > > Key: FLINK-8101 > URL: https://issues.apache.org/jira/browse/FLINK-8101 > Project: Flink > Issue Type: New Feature > Components: ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Flavio Pompermaier >Priority: Major > Fix For: 1.5.0 > > > Recently, elasticsearch 6.0.0 was released: > https://www.elastic.co/blog/elasticsearch-6-0-0-released > The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341650#comment-16341650 ] ASF GitHub Bot commented on FLINK-8101: --- GitHub user cjolif opened a pull request: https://github.com/apache/flink/pull/5374 [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (TransportClient) and 6.x (RestHighLevelClient) support ## What is the purpose of the change *The purpose of this PR is to add Elasticsearch 6.X support on top of the RestHighLevelClient. Indeed, TransportClient is now deprecated and will be removed in 8.X. Also, hosted versions of Elasticsearch often forbid the use of TransportClient.* ## Brief change log * First, a set of changes is borrowed from #4675: * Add createRequestIndex method in ElasticsearchApiCallBridge * Add flink-connector-elasticsearch5.3 project * Add BulkProcessorIndexer in connector-elasticsearch5.3 to convert ActionRequest to DocWriteRequest * Then, on top of these changes and of being able to create a RestHighLevelClient instead of a TransportClient: * Add createClient method in ElasticsearchApiCallBridge. As TransportClient and RestClient have only the AutoCloseable interface in common, this is what the method returns. * Make ElasticsearchSinkBase agnostic to whether it is using a TransportClient or RestClient by adding a createBulkProcessorBuilder method on ElasticsearchApiCallBridge that the ElasticsearchSinkBase calls. Implement this method on all bridges. * Add flink-connector-elasticsearch6 project leveraging the REST client while all the other ones still use TransportClient. ## Verifying this change This change added tests and can be verified as follows: * The Elasticsearch test base has also been reworked a bit so it can be leveraged for testing the REST-client-based implementation. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): if you use the elasticsearch6 project, this adds dependencies on elasticsearch 6 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? 
docs & javadocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/cjolif/flink es53-es6 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5374.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5374 commit b6a2396b31ade29071c65efa72df9f8f1fab9af4 Author: zjureel Date: 2017-09-15T03:51:35Z [FLINK-7386] Change ElasticsearchApiCallBridge to abstract class and add createRequestIndex method commit 1e5b21a331dfaed50844e89986c313f5fc40bdbe Author: zjureel Date: 2017-09-15T03:55:16Z [FLINK-7386] add flink-connector-elasticsearch5.3 for elasticsearch5.3 and later versions commit 5a6e840c316095dd4f65f559405b19dcda7a1ca0 Author: zjureel Date: 2017-09-15T04:42:44Z [FLINK-7386] add test case for ES53 commit 574818f0f56f6a2b644e271a05a0796d90598aef Author: zjureel Date: 2017-09-15T05:33:43Z [FLINK-7386] add document for ES5.3 commit 14168825507ad98c49a63be8ceab23dc539ff977 Author: Christophe Jolif Date: 2018-01-25T21:31:57Z [FLINK-8101] Elasticsearch 6.X REST support > Elasticsearch 6.x support > - > > Key: FLINK-8101 > URL: https://issues.apache.org/jira/browse/FLINK-8101 > Project: Flink > Issue Type: New Feature > Components: ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Flavio Pompermaier >Priority: Major > Fix For: 1.5.0 > > > Recently, elasticsearch 6.0.0 was released: > https://www.elastic.co/blog/elasticsearch-6-0-0-released > The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...
GitHub user cjolif opened a pull request: https://github.com/apache/flink/pull/5374 [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (TransportClient) and 6.x (RestHighLevelClient) support ## What is the purpose of the change *The purpose of this PR is to add Elasticsearch 6.X support on top of the RestHighLevelClient. Indeed, TransportClient is now deprecated and will be removed in 8.X. Also, hosted versions of Elasticsearch often forbid the use of TransportClient.* ## Brief change log * First, a set of changes is borrowed from #4675: * Add createRequestIndex method in ElasticsearchApiCallBridge * Add flink-connector-elasticsearch5.3 project * Add BulkProcessorIndexer in connector-elasticsearch5.3 to convert ActionRequest to DocWriteRequest * Then, on top of these changes and of being able to create a RestHighLevelClient instead of a TransportClient: * Add createClient method in ElasticsearchApiCallBridge. As TransportClient and RestClient have only the AutoCloseable interface in common, this is what the method returns. * Make ElasticsearchSinkBase agnostic to whether it is using a TransportClient or RestClient by adding a createBulkProcessorBuilder method on ElasticsearchApiCallBridge that the ElasticsearchSinkBase calls. Implement this method on all bridges. * Add flink-connector-elasticsearch6 project leveraging the REST client while all the other ones still use TransportClient. ## Verifying this change This change added tests and can be verified as follows: * The Elasticsearch test base has also been reworked a bit so it can be leveraged for testing the REST-client-based implementation. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): if you use the elasticsearch6 project, this adds dependencies on elasticsearch 6 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs & javadocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/cjolif/flink es53-es6 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5374.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5374 commit b6a2396b31ade29071c65efa72df9f8f1fab9af4 Author: zjureel Date: 2017-09-15T03:51:35Z [FLINK-7386] Change ElasticsearchApiCallBridge to abstract class and add createRequestIndex method commit 1e5b21a331dfaed50844e89986c313f5fc40bdbe Author: zjureel Date: 2017-09-15T03:55:16Z [FLINK-7386] add flink-connector-elasticsearch5.3 for elasticsearch5.3 and later versions commit 5a6e840c316095dd4f65f559405b19dcda7a1ca0 Author: zjureel Date: 2017-09-15T04:42:44Z [FLINK-7386] add test case for ES53 commit 574818f0f56f6a2b644e271a05a0796d90598aef Author: zjureel Date: 2017-09-15T05:33:43Z [FLINK-7386] add document for ES5.3 commit 14168825507ad98c49a63be8ceab23dc539ff977 Author: Christophe Jolif Date: 2018-01-25T21:31:57Z [FLINK-8101] Elasticsearch 6.X REST support ---
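The bridge abstraction described in the PR can be outlined roughly as follows. This is an illustrative sketch only, with simplified hypothetical signatures, not the actual classes in #5374:

{code}
import java.io.Serializable;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkProcessor;

// Illustrative outline of the bridge described above; simplified hypothetical signatures.
abstract class ElasticsearchApiCallBridgeSketch implements Serializable {

    // TransportClient and RestHighLevelClient only share AutoCloseable,
    // so that is all the factory method can promise to return.
    abstract AutoCloseable createClient(Map<String, String> clientConfig);

    // The sink base never sees the concrete client type; each bridge knows how to
    // wire its own client into a BulkProcessor.
    abstract BulkProcessor.Builder createBulkProcessorBuilder(
            AutoCloseable client, BulkProcessor.Listener listener);
}
{code}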
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341638#comment-16341638 ] ASF GitHub Bot commented on FLINK-8479: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5342 Very interesting! Two things: 1. Can you make the Google doc publicly viewable? I cannot access it right now. 2. How does it handle event-time window joins of two streams where data in one stream always arrives much later than in the other? For example, we are joining streams A and B on a 10-minute event-time tumbling window (12:00-12:10, 12:10-12:20), and data in stream B always arrives 30 minutes later than the data in stream A. How does the operator handle that? Does it cache A's data until B's data arrives, do the join, and then remove A's data from the cache? (I haven't read the code in detail, just trying to get a general idea of the overall design) > Implement time-bounded inner join of streams as a TwoInputStreamOperator > > > Key: FLINK-8479 > URL: https://issues.apache.org/jira/browse/FLINK-8479 > Project: Flink > Issue Type: Sub-task >Reporter: Florian Schmidt >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5342: [FLINK-8479] Timebounded stream join
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5342 Very interesting! Two things: 1. Can you make the Google doc publicly viewable? I cannot access it right now. 2. How does it handle event-time window joins of two streams where data in one stream always arrives much later than in the other? For example, we are joining streams A and B on a 10-minute event-time tumbling window (12:00-12:10, 12:10-12:20), and data in stream B always arrives 30 minutes later than the data in stream A. How does the operator handle that? Does it cache A's data until B's data arrives, do the join, and then remove A's data from the cache? (I haven't read the code in detail, just trying to get a general idea of the overall design) ---
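The buffering idea the question alludes to can be sketched with Flink's public APIs. The following is not the operator from PR #5342, just a minimal illustration assuming two connected, keyed streams and a CoProcessFunction; the time-bound checks and the event-time cleanup timers are omitted:

{code}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: buffer records of stream A in keyed state and join them against
// later-arriving records of stream B. Bound checks and cleanup timers are omitted.
public class BufferingJoinSketch extends CoProcessFunction<Long, Long, String> {

    private transient ListState<Long> bufferedA;

    @Override
    public void open(Configuration parameters) {
        bufferedA = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-a", Long.class));
    }

    @Override
    public void processElement1(Long a, Context ctx, Collector<String> out) throws Exception {
        // Keep A's element until matching B elements arrive; a real implementation would
        // also register an event-time timer to evict it once the join bound has passed.
        bufferedA.add(a);
    }

    @Override
    public void processElement2(Long b, Context ctx, Collector<String> out) throws Exception {
        for (Long a : bufferedA.get()) {
            out.collect(a + "," + b); // emit joined pairs; timestamp-bound filtering omitted
        }
    }
}
{code}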
[jira] [Commented] (FLINK-8230) NPE in OrcRowInputFormat on nested structs
[ https://issues.apache.org/jira/browse/FLINK-8230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341536#comment-16341536 ] ASF GitHub Bot commented on FLINK-8230: --- GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/5373 [FLINK-8230] [orc] Fix NPEs when reading nested columns. ## What is the purpose of the change - fixes NPEs for null-valued structs, lists, and maps - fixes NPEs for repeating structs, lists, and maps ## Brief change log - renamed `OrcUtils` to `OrcBatchReader` - reimplement large parts of the `OrcBatchReader` - add tests for nested columns with nulls and repeating values - added a class to generate ORC files for tests ## Verifying this change - added tests to `OrcInputFormatTest` - changes have been verified by a user on a production dataset. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **n/a** You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink orcNPE Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5373.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5373 commit 97fd0c7687eff6f8b84516f4d8ab8268cab838b1 Author: Fabian Hueske Date: 2017-12-12T11:09:14Z [FLINK-8230] [orc] Fix NPEs when reading nested columns. - fixes NPEs for null-valued structs, lists, and maps - fixes NPEs for repeating structs, lists, and maps - adds test for deeply nested data with nulls - adds test for columns with repeating values > NPE in OrcRowInputFormat on nested structs > -- > > Key: FLINK-8230 > URL: https://issues.apache.org/jira/browse/FLINK-8230 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.4.0 >Reporter: Sebastian Klemke >Assignee: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > OrcRowInputFormat ignores isNull and isRepeating on nested struct columns. 
If > a struct column contains nulls, it tries to read struct fields, leading to > NPE in case of string fields: > {code} > java.lang.NullPointerException > at java.lang.String.checkBounds(String.java:384) > at java.lang.String.(String.java:462) > at > org.apache.flink.orc.OrcUtils.readNonNullBytesColumnAsString(OrcUtils.java:392) > at org.apache.flink.orc.OrcUtils.readField(OrcUtils.java:215) > at org.apache.flink.orc.OrcUtils.readStructColumn(OrcUtils.java:1203) > at org.apache.flink.orc.OrcUtils.readField(OrcUtils.java:252) > at > org.apache.flink.orc.OrcUtils.readNonNullStructColumn(OrcUtils.java:677) > at org.apache.flink.orc.OrcUtils.readField(OrcUtils.java:250) > at org.apache.flink.orc.OrcUtils.fillRows(OrcUtils.java:142) > at > org.apache.flink.orc.OrcRowInputFormat.ensureBatch(OrcRowInputFormat.java:334) > at > org.apache.flink.orc.OrcRowInputFormat.reachedEnd(OrcRowInputFormat.java:314) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:165) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5373: [FLINK-8230] [orc] Fix NPEs when reading nested co...
GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/5373 [FLINK-8230] [orc] Fix NPEs when reading nested columns. ## What is the purpose of the change - fixes NPEs for null-valued structs, lists, and maps - fixes NPEs for repeating structs, lists, and maps ## Brief change log - renamed `OrcUtils` to `OrcBatchReader` - reimplement large parts of the `OrcBatchReader` - add tests for nested columns with nulls and repeating values - added a class to generate ORC files for tests ## Verifying this change - added tests to `OrcInputFormatTest` - changes have been verified by a user on a production dataset. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **n/a** You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink orcNPE Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5373.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5373 commit 97fd0c7687eff6f8b84516f4d8ab8268cab838b1 Author: Fabian Hueske Date: 2017-12-12T11:09:14Z [FLINK-8230] [orc] Fix NPEs when reading nested columns. - fixes NPEs for null-valued structs, lists, and maps - fixes NPEs for repeating structs, lists, and maps - adds test for deeply nested data with nulls - adds test for columns with repeating values ---
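For reference, the flag handling the fix is about can be illustrated on a single string column. This is not the code from #5373; it only shows, assuming the Hive vectorized BytesColumnVector type that ORC batches are made of, why noNulls/isNull/isRepeating must be consulted before the value arrays are touched:

{code}
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;

// Illustration only: null-valued or repeating entries must never be dereferenced,
// otherwise the reader fails with the NPE shown in the issue's stack trace.
class OrcStringColumnSketch {

    static String[] readStringColumn(BytesColumnVector column, int numRows) {
        String[] result = new String[numRows];
        for (int i = 0; i < numRows; i++) {
            // With isRepeating, only entry 0 is populated and applies to every row.
            int idx = column.isRepeating ? 0 : i;
            if (!column.noNulls && column.isNull[idx]) {
                result[i] = null; // skip the value arrays entirely for null entries
            } else {
                result[i] = new String(
                        column.vector[idx], column.start[idx], column.length[idx],
                        StandardCharsets.UTF_8);
            }
        }
        return result;
    }
}
{code}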
[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
[ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341450#comment-16341450 ] Tzu-Li (Gordon) Tai commented on FLINK-8516: Posting the mailing list discussion thread of the issue here, for easier reference: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Kinesis-consumer-shard-skew-FLINK-8516-td20843.html > FlinkKinesisConsumer does not balance shards over subtasks > -- > > Key: FLINK-8516 > URL: https://issues.apache.org/jira/browse/FLINK-8516 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.4.0, 1.3.2, 1.5.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > > The hash code of the shard is used to distribute discovered shards over > subtasks round robin. This works as long as shard identifiers are sequential. > After shards are rebalanced in Kinesis, that may no longer be the case and > the distribution become skewed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
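The skew can be seen from the assignment scheme itself. The snippet below is a simplified illustration of the two strategies under discussion, not the connector's actual code (the real logic lives in the shard discovery path and differs in detail):

{code}
// Simplified illustration of hash-based vs. round-robin shard-to-subtask assignment.
class ShardAssignmentSketch {

    // Hash-based assignment: fine while shard identifiers hash uniformly over subtasks,
    // but after a reshard the surviving shard ids can cluster on a few subtasks.
    static boolean assignedByHash(String shardId, int subtaskIndex, int parallelism) {
        return Math.floorMod(shardId.hashCode(), parallelism) == subtaskIndex;
    }

    // Round-robin over a globally agreed discovery order spreads shards evenly,
    // at the price of every subtask having to derive the same order.
    static boolean assignedRoundRobin(int shardDiscoveryIndex, int subtaskIndex, int parallelism) {
        return shardDiscoveryIndex % parallelism == subtaskIndex;
    }
}
{code}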
[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
[ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341449#comment-16341449 ] Tzu-Li (Gordon) Tai commented on FLINK-8516: [~thw] done, you can now assign tickets to yourself! I'm also assuming you'd like to work on this issue, so I assigned it for you. > FlinkKinesisConsumer does not balance shards over subtasks > -- > > Key: FLINK-8516 > URL: https://issues.apache.org/jira/browse/FLINK-8516 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.4.0, 1.3.2, 1.5.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > > The hash code of the shard is used to distribute discovered shards over > subtasks round robin. This works as long as shard identifiers are sequential. > After shards are rebalanced in Kinesis, that may no longer be the case and > the distribution become skewed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
[ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai reassigned FLINK-8516: -- Assignee: Thomas Weise > FlinkKinesisConsumer does not balance shards over subtasks > -- > > Key: FLINK-8516 > URL: https://issues.apache.org/jira/browse/FLINK-8516 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.4.0, 1.3.2, 1.5.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > > The hash code of the shard is used to distribute discovered shards over > subtasks round robin. This works as long as shard identifiers are sequential. > After shards are rebalanced in Kinesis, that may no longer be the case and > the distribution become skewed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration (State TTL)
[ https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341433#comment-16341433 ] Bowen Li commented on FLINK-3089: - [~srichter] Right, we should implement it in both backends, and also give users a heads-up that TTL's heap implementation would increase their in-memory state size and that they should consider it in memory capacity planning. I actually did some research yesterday on how TTL should be implemented in memory. What you described is very similar to [how Redis implements TTL|https://redis.io/commands/expire#how-redis-expires-keys], and, of course, we need to tailor the strategy to Flink. How about this? Let me summarize all the above discussions and write up a Google doc, and then we can iterate on the design. > State API Should Support Data Expiration (State TTL) > > > Key: FLINK-3089 > URL: https://issues.apache.org/jira/browse/FLINK-3089 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Niels Basjes >Assignee: Bowen Li >Priority: Major > > In some use cases (web analytics) there is a need to have a state per visitor > on a website (i.e. keyBy(sessionid) ). > At some point the visitor simply leaves and no longer creates new events (so > a special 'end of session' event will not occur). > The only way to determine that a visitor has left is by choosing a timeout, > like "After 30 minutes of no events we consider the visitor 'gone'". > Only after this (chosen) timeout has expired should we discard this state. > In the Trigger part of Windows we can set a timer and close/discard this kind > of information. But that introduces the buffering effect of the window (which > in some scenarios is unwanted). > What I would like is to be able to set a timeout on a specific state which I > can update afterwards. > This makes it possible to create a map function that assigns the right value > and that discards the state automatically. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
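The "lazy" half of a Redis-style scheme can be sketched at the user-API level as storing a (value, timestamp) pair and treating stale entries as absent on read; the backend-level design discussed above would additionally need active cleanup. A minimal sketch, assuming Flink's ValueState API and a hypothetical wrapper class (not a Flink API):

{code}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.java.tuple.Tuple2;

// Lazy-expiration sketch: keep (value, lastUpdateTimestamp) and treat entries older than
// the TTL as absent on read. Active cleanup (timers or backend compaction) is the part
// the backend-level design still has to provide.
class TtlValueStateSketch {

    private final ValueState<Tuple2<String, Long>> state;
    private final long ttlMillis;

    TtlValueStateSketch(ValueState<Tuple2<String, Long>> state, long ttlMillis) {
        this.state = state;
        this.ttlMillis = ttlMillis;
    }

    String get(long now) throws Exception {
        Tuple2<String, Long> entry = state.value();
        if (entry == null || now - entry.f1 > ttlMillis) {
            state.clear(); // expired entries are dropped lazily, like Redis' passive expiry
            return null;
        }
        return entry.f0;
    }

    void put(String value, long now) throws Exception {
        state.update(Tuple2.of(value, now));
    }
}
{code}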
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341353#comment-16341353 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164168097 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. + */ +class Rowtime extends Descriptor { + + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { +timestampExtractor = Some(new ExistingField(fieldName)) +this + } + + /** +* Sets a built-in timestamp extractor that converts the assigned timestamp from +* a DataStream API record into the rowtime attribute. +* +* Note: This extractor only works in streaming environments. +*/ + def timestampFromDataStream(): Rowtime = { +timestampExtractor = Some(new StreamRecordTimestamp) +this + } + + /** +* Sets a custom timestamp extractor to be used for the rowtime attribute. +* +* @param extractor The [[TimestampExtractor]] to extract the rowtime attribute +* from the physical type. +*/ + def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = { +timestampExtractor = Some(extractor) +this + } + + /** +* Sets a built-in watermark strategy for ascending rowtime attributes. +* +* Emits a watermark of the maximum observed timestamp so far minus 1. +* Rows that have a timestamp equal to the max timestamp are not late. +*/ + def watermarkPeriodicAscending(): Rowtime = { +watermarkStrategy = Some(new AscendingTimestamps) +this + } + + /** +* Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded +* time interval. 
+* +* Emits watermarks which are the maximum observed timestamp minus the specified delay. +*/ + def watermarkPeriodicBounding(delay: Long): Rowtime = { --- End diff -- `periodicBoundedOOOWatermarks()` > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341357#comment-16341357 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164168781 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. + */ +class Rowtime extends Descriptor { + + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { +timestampExtractor = Some(new ExistingField(fieldName)) +this + } + + /** +* Sets a built-in timestamp extractor that converts the assigned timestamp from +* a DataStream API record into the rowtime attribute. +* +* Note: This extractor only works in streaming environments. +*/ + def timestampFromDataStream(): Rowtime = { --- End diff -- `preserveSourceTimestamps()` > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. 
The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341345#comment-16341345 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164122214 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/NormalizedProperties.scala --- @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.io.Serializable +import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, Long => JLong} + +import org.apache.commons.codec.binary.Base64 +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.{TableSchema, ValidationException} +import org.apache.flink.table.descriptors.DescriptorUtils._ +import org.apache.flink.table.descriptors.NormalizedProperties.normalizeTableSchema +import org.apache.flink.table.plan.stats.ColumnStats +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} +import org.apache.flink.table.typeutils.TypeStringUtils +import org.apache.flink.util.InstantiationUtil +import org.apache.flink.util.Preconditions.checkNotNull + +import scala.collection.mutable + +/** + * Utility class for having a unified string-based representation of Table API related classes + * such as [[TableSchema]], [[TypeInformation]], [[WatermarkStrategy]], etc. + */ +class NormalizedProperties( --- End diff -- Rename to `TableSourceProperties`? `NormalizedProperties` is quite generic > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. 
> In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341354#comment-16341354 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164166552 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. + */ +class Rowtime extends Descriptor { + + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { +timestampExtractor = Some(new ExistingField(fieldName)) +this + } + + /** +* Sets a built-in timestamp extractor that converts the assigned timestamp from +* a DataStream API record into the rowtime attribute. +* +* Note: This extractor only works in streaming environments. +*/ + def timestampFromDataStream(): Rowtime = { +timestampExtractor = Some(new StreamRecordTimestamp) +this + } + + /** +* Sets a custom timestamp extractor to be used for the rowtime attribute. +* +* @param extractor The [[TimestampExtractor]] to extract the rowtime attribute +* from the physical type. +*/ + def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = { +timestampExtractor = Some(extractor) +this + } + + /** +* Sets a built-in watermark strategy for ascending rowtime attributes. +* +* Emits a watermark of the maximum observed timestamp so far minus 1. +* Rows that have a timestamp equal to the max timestamp are not late. +*/ + def watermarkPeriodicAscending(): Rowtime = { --- End diff -- `periodicAscendingWatermarks()`? 
> Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. > In general a table source declaratio
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341358#comment-16341358 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164169928 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util + +/** + * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider + * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that + * describe the desired table source. The factory allows for matching to the given set of + * properties and creating a configured [[TableSource]] accordingly. + * + * Classes that implement this interface need to be added to the + * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in + * the current classpath to be found. + */ +trait TableSourceFactory[T] { --- End diff -- We might want to add a method that exposes all properties of the connector and format that the factory supports. > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341351#comment-16341351 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164168602 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. + */ +class Rowtime extends Descriptor { + + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { +timestampExtractor = Some(new ExistingField(fieldName)) +this + } + + /** +* Sets a built-in timestamp extractor that converts the assigned timestamp from +* a DataStream API record into the rowtime attribute. +* +* Note: This extractor only works in streaming environments. +*/ + def timestampFromDataStream(): Rowtime = { +timestampExtractor = Some(new StreamRecordTimestamp) +this + } + + /** +* Sets a custom timestamp extractor to be used for the rowtime attribute. +* +* @param extractor The [[TimestampExtractor]] to extract the rowtime attribute +* from the physical type. +*/ + def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = { --- End diff -- `timestampsFromExtractor()` > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. 
Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design documen
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341355#comment-16341355 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164168496 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. + */ +class Rowtime extends Descriptor { + + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { +timestampExtractor = Some(new ExistingField(fieldName)) +this + } + + /** +* Sets a built-in timestamp extractor that converts the assigned timestamp from +* a DataStream API record into the rowtime attribute. +* +* Note: This extractor only works in streaming environments. +*/ + def timestampFromDataStream(): Rowtime = { +timestampExtractor = Some(new StreamRecordTimestamp) +this + } + + /** +* Sets a custom timestamp extractor to be used for the rowtime attribute. +* +* @param extractor The [[TimestampExtractor]] to extract the rowtime attribute +* from the physical type. +*/ + def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = { +timestampExtractor = Some(extractor) +this + } + + /** +* Sets a built-in watermark strategy for ascending rowtime attributes. +* +* Emits a watermark of the maximum observed timestamp so far minus 1. +* Rows that have a timestamp equal to the max timestamp are not late. +*/ + def watermarkPeriodicAscending(): Rowtime = { +watermarkStrategy = Some(new AscendingTimestamps) +this + } + + /** +* Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded +* time interval. 
+* +* Emits watermarks which are the maximum observed timestamp minus the specified delay. +*/ + def watermarkPeriodicBounding(delay: Long): Rowtime = { +watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay)) +this + } + + /** +* Sets a built-in watermark strategy which indicates the watermarks should be preserved from the +* underlying DataStream API. +*/ + def watermarkFromDataStream(): Rowtime = { +watermarkStrategy = Some(PreserveWatermarks.INSTANCE) +this + } + + /** +* Sets a custom watermark strategy to be used for the rowtime attribute. +*/ + def watermarkFromStrategy(strategy: WatermarkStrategy): Rowtime = { --- End diff -- `watermarksFromStrategy()` > Create unified interfaces to configure and instatiate TableSources > --
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341350#comment-16341350 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164150350 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala --- @@ -69,17 +72,76 @@ class Schema extends Descriptor { */ def field(fieldName: String, fieldType: String): Schema = { if (tableSchema.contains(fieldName)) { - throw new IllegalArgumentException(s"Duplicate field name $fieldName.") + throw new ValidationException(s"Duplicate field name $fieldName.") +} + +val fieldProperties = mutable.LinkedHashMap[String, String]() +fieldProperties += (DescriptorUtils.TYPE -> fieldType) + +tableSchema += (fieldName -> fieldProperties) + +lastField = Some(fieldName) +this + } + + /** +* Specifies the origin of the previously defined field. The origin field is defined by a +* connector or format. +* +* E.g. field("myString", Types.STRING).from("CSV_MY_STRING") +*/ + def from(originFieldName: String): Schema = { +lastField match { + case None => throw new ValidationException("No field defined previously. Use field() before.") --- End diff -- "previously defined" > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341356#comment-16341356 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164168933 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. + */ +class Rowtime extends Descriptor { + + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { --- End diff -- `timestampsFromField()` > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. 
Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341352#comment-16341352 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164151474 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala --- @@ -69,17 +72,76 @@ class Schema extends Descriptor { */ def field(fieldName: String, fieldType: String): Schema = { if (tableSchema.contains(fieldName)) { - throw new IllegalArgumentException(s"Duplicate field name $fieldName.") + throw new ValidationException(s"Duplicate field name $fieldName.") +} + +val fieldProperties = mutable.LinkedHashMap[String, String]() +fieldProperties += (DescriptorUtils.TYPE -> fieldType) + +tableSchema += (fieldName -> fieldProperties) + +lastField = Some(fieldName) +this + } + + /** +* Specifies the origin of the previously defined field. The origin field is defined by a +* connector or format. +* +* E.g. field("myString", Types.STRING).from("CSV_MY_STRING") +*/ + def from(originFieldName: String): Schema = { +lastField match { + case None => throw new ValidationException("No field defined previously. Use field() before.") + case Some(f) => +tableSchema(f) += (DescriptorUtils.FROM -> originFieldName) +lastField = None +} +this + } + + /** +* Specifies the previously defined field as a processing-time attribute. +* +* E.g. field("myString", Types.STRING).proctime() +*/ + def proctime(): Schema = { +lastField match { + case None => throw new ValidationException("No field defined previously. Use field() before.") + case Some(f) => +tableSchema(f) += (DescriptorUtils.PROCTIME -> DescriptorUtils.TRUE) +lastField = None +} +this + } + + /** +* Specifies the previously defined field as an event-time attribute. +* +* E.g. field("myString", Types.STRING).rowtime(...) --- End diff -- `field("procTime", Types.SQL_TIMESTAMP).proctime()` > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
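For readers skimming this review, a usage sketch of the Schema descriptor built only from the methods visible in the diff above (the Types.* overload follows the Javadoc examples; several method names are still being renamed in this review, so treat them as provisional):
{code}
import org.apache.flink.table.api.Types
import org.apache.flink.table.descriptors.Schema

// Illustrative only: field() declares a column, from() maps it to a differently
// named connector/format field, proctime() marks a processing-time attribute.
val schema = new Schema()
  .field("user", Types.STRING).from("CSV_USER")
  .field("amount", Types.DOUBLE)                      // matched by exact name by default
  .field("procTime", Types.SQL_TIMESTAMP).proctime()
{code}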
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341359#comment-16341359 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164149992 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala --- @@ -69,17 +72,76 @@ class Schema extends Descriptor { */ def field(fieldName: String, fieldType: String): Schema = { if (tableSchema.contains(fieldName)) { - throw new IllegalArgumentException(s"Duplicate field name $fieldName.") + throw new ValidationException(s"Duplicate field name $fieldName.") +} + +val fieldProperties = mutable.LinkedHashMap[String, String]() +fieldProperties += (DescriptorUtils.TYPE -> fieldType) + +tableSchema += (fieldName -> fieldProperties) + +lastField = Some(fieldName) +this + } + + /** +* Specifies the origin of the previously defined field. The origin field is defined by a +* connector or format. --- End diff -- Add that fields are matched by exact name by default. > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341348#comment-16341348 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164126202 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala --- @@ -39,6 +39,9 @@ case class SqlParserException( /** * General Exception for all errors during table handling. + * + * This exception indicates that an internal error occurred or the feature is not fully --- End diff -- "This exception indicates that an internal error occurred or that a feature is not supported yet. Usually, this exception does not indicate a fault of the user." > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341347#comment-16341347 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164151340 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala --- @@ -69,17 +72,76 @@ class Schema extends Descriptor { */ def field(fieldName: String, fieldType: String): Schema = { if (tableSchema.contains(fieldName)) { - throw new IllegalArgumentException(s"Duplicate field name $fieldName.") + throw new ValidationException(s"Duplicate field name $fieldName.") +} + +val fieldProperties = mutable.LinkedHashMap[String, String]() +fieldProperties += (DescriptorUtils.TYPE -> fieldType) + +tableSchema += (fieldName -> fieldProperties) + +lastField = Some(fieldName) +this + } + + /** +* Specifies the origin of the previously defined field. The origin field is defined by a +* connector or format. +* +* E.g. field("myString", Types.STRING).from("CSV_MY_STRING") +*/ + def from(originFieldName: String): Schema = { +lastField match { + case None => throw new ValidationException("No field defined previously. Use field() before.") + case Some(f) => +tableSchema(f) += (DescriptorUtils.FROM -> originFieldName) +lastField = None +} +this + } + + /** +* Specifies the previously defined field as a processing-time attribute. +* +* E.g. field("myString", Types.STRING).proctime() --- End diff -- `field("procTime", Types.SQL_TIMESTAMP).proctime()` > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341349#comment-16341349 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164168331 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. + */ +class Rowtime extends Descriptor { + + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { +timestampExtractor = Some(new ExistingField(fieldName)) +this + } + + /** +* Sets a built-in timestamp extractor that converts the assigned timestamp from +* a DataStream API record into the rowtime attribute. +* +* Note: This extractor only works in streaming environments. +*/ + def timestampFromDataStream(): Rowtime = { +timestampExtractor = Some(new StreamRecordTimestamp) +this + } + + /** +* Sets a custom timestamp extractor to be used for the rowtime attribute. +* +* @param extractor The [[TimestampExtractor]] to extract the rowtime attribute +* from the physical type. +*/ + def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = { +timestampExtractor = Some(extractor) +this + } + + /** +* Sets a built-in watermark strategy for ascending rowtime attributes. +* +* Emits a watermark of the maximum observed timestamp so far minus 1. +* Rows that have a timestamp equal to the max timestamp are not late. +*/ + def watermarkPeriodicAscending(): Rowtime = { +watermarkStrategy = Some(new AscendingTimestamps) +this + } + + /** +* Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded +* time interval. 
+* +* Emits watermarks which are the maximum observed timestamp minus the specified delay. +*/ + def watermarkPeriodicBounding(delay: Long): Rowtime = { +watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay)) +this + } + + /** +* Sets a built-in watermark strategy which indicates the watermarks should be preserved from the +* underlying DataStream API. +*/ + def watermarkFromDataStream(): Rowtime = { --- End diff -- `preserveSourceWatermarks()` `DataStream` is only an internal aspect that's not visible when using table sources. > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type:
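For orientation, a usage sketch of the Rowtime descriptor with the method names as they currently appear in this diff (the review proposes renames such as timestampsFromField() and preserveSourceWatermarks(), so these names are provisional):
{code}
import org.apache.flink.table.descriptors.Rowtime

// Illustrative only: rowtime taken from an existing Long/SQL_TIMESTAMP field,
// watermarks allowing 60 seconds of out-of-orderness.
val rowtime = new Rowtime()
  .timestampFromField("eventTime")
  .watermarkPeriodicBounding(60000L)
{code}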
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341346#comment-16341346 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164131675 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CSV.scala --- @@ -139,26 +139,26 @@ class CSV extends EncodingDescriptor("csv") { } /** -* Internal method for encoding properties conversion. +* Internal method for format properties conversion. */ - override protected def addEncodingProperties(properties: NormalizedProperties): Unit = { -fieldDelim.foreach(properties.putString("field-delimiter", _)) -lineDelim.foreach(properties.putString("line-delimiter", _)) -properties.putTableSchema("fields", encodingSchema.toIndexedSeq) -quoteCharacter.foreach(properties.putCharacter("quote-character", _)) -commentPrefix.foreach(properties.putString("comment-prefix", _)) -isIgnoreFirstLine.foreach(properties.putBoolean("ignore-first-line", _)) -lenient.foreach(properties.putBoolean("ignore-parse-errors", _)) + override protected def addFormatProperties(properties: NormalizedProperties): Unit = { + fieldDelim.foreach(properties.putString(DescriptorUtils.FIELD_DELIMITER, _)) --- End diff -- I would not define the constants globally. Some constants should be global, but constants for specific connectors or formats, should go to the respective descriptor. IMO, it would be better to have these keys in `CSV` or the class that validates the properties of a certain type. > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
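One way to read this suggestion (illustrative only, not part of the PR): keep the format-specific keys next to the descriptor that owns them, e.g. in a companion object, instead of in the global DescriptorUtils:
{code}
// Hypothetical companion object; the key strings are the ones already used in CSV.scala.
object CSV {
  val FIELD_DELIMITER = "field-delimiter"
  val LINE_DELIMITER = "line-delimiter"
  val QUOTE_CHARACTER = "quote-character"
  val COMMENT_PREFIX = "comment-prefix"
  val IGNORE_FIRST_LINE = "ignore-first-line"
  val IGNORE_PARSE_ERRORS = "ignore-parse-errors"
}
{code}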
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164168496 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. + */ +class Rowtime extends Descriptor { + + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { +timestampExtractor = Some(new ExistingField(fieldName)) +this + } + + /** +* Sets a built-in timestamp extractor that converts the assigned timestamp from +* a DataStream API record into the rowtime attribute. +* +* Note: This extractor only works in streaming environments. +*/ + def timestampFromDataStream(): Rowtime = { +timestampExtractor = Some(new StreamRecordTimestamp) +this + } + + /** +* Sets a custom timestamp extractor to be used for the rowtime attribute. +* +* @param extractor The [[TimestampExtractor]] to extract the rowtime attribute +* from the physical type. +*/ + def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = { +timestampExtractor = Some(extractor) +this + } + + /** +* Sets a built-in watermark strategy for ascending rowtime attributes. +* +* Emits a watermark of the maximum observed timestamp so far minus 1. +* Rows that have a timestamp equal to the max timestamp are not late. +*/ + def watermarkPeriodicAscending(): Rowtime = { +watermarkStrategy = Some(new AscendingTimestamps) +this + } + + /** +* Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded +* time interval. +* +* Emits watermarks which are the maximum observed timestamp minus the specified delay. 
+*/ + def watermarkPeriodicBounding(delay: Long): Rowtime = { +watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay)) +this + } + + /** +* Sets a built-in watermark strategy which indicates the watermarks should be preserved from the +* underlying DataStream API. +*/ + def watermarkFromDataStream(): Rowtime = { +watermarkStrategy = Some(PreserveWatermarks.INSTANCE) +this + } + + /** +* Sets a custom watermark strategy to be used for the rowtime attribute. +*/ + def watermarkFromStrategy(strategy: WatermarkStrategy): Rowtime = { --- End diff -- `watermarksFromStrategy()` ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164166552 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. + */ +class Rowtime extends Descriptor { + + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { +timestampExtractor = Some(new ExistingField(fieldName)) +this + } + + /** +* Sets a built-in timestamp extractor that converts the assigned timestamp from +* a DataStream API record into the rowtime attribute. +* +* Note: This extractor only works in streaming environments. +*/ + def timestampFromDataStream(): Rowtime = { +timestampExtractor = Some(new StreamRecordTimestamp) +this + } + + /** +* Sets a custom timestamp extractor to be used for the rowtime attribute. +* +* @param extractor The [[TimestampExtractor]] to extract the rowtime attribute +* from the physical type. +*/ + def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = { +timestampExtractor = Some(extractor) +this + } + + /** +* Sets a built-in watermark strategy for ascending rowtime attributes. +* +* Emits a watermark of the maximum observed timestamp so far minus 1. +* Rows that have a timestamp equal to the max timestamp are not late. +*/ + def watermarkPeriodicAscending(): Rowtime = { --- End diff -- `periodicAscendingWatermarks()`? ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164168602 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. + */ +class Rowtime extends Descriptor { + + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { +timestampExtractor = Some(new ExistingField(fieldName)) +this + } + + /** +* Sets a built-in timestamp extractor that converts the assigned timestamp from +* a DataStream API record into the rowtime attribute. +* +* Note: This extractor only works in streaming environments. +*/ + def timestampFromDataStream(): Rowtime = { +timestampExtractor = Some(new StreamRecordTimestamp) +this + } + + /** +* Sets a custom timestamp extractor to be used for the rowtime attribute. +* +* @param extractor The [[TimestampExtractor]] to extract the rowtime attribute +* from the physical type. +*/ + def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = { --- End diff -- `timestampsFromExtractor()` ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164168097 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. + */ +class Rowtime extends Descriptor { + + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { +timestampExtractor = Some(new ExistingField(fieldName)) +this + } + + /** +* Sets a built-in timestamp extractor that converts the assigned timestamp from +* a DataStream API record into the rowtime attribute. +* +* Note: This extractor only works in streaming environments. +*/ + def timestampFromDataStream(): Rowtime = { +timestampExtractor = Some(new StreamRecordTimestamp) +this + } + + /** +* Sets a custom timestamp extractor to be used for the rowtime attribute. +* +* @param extractor The [[TimestampExtractor]] to extract the rowtime attribute +* from the physical type. +*/ + def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = { +timestampExtractor = Some(extractor) +this + } + + /** +* Sets a built-in watermark strategy for ascending rowtime attributes. +* +* Emits a watermark of the maximum observed timestamp so far minus 1. +* Rows that have a timestamp equal to the max timestamp are not late. +*/ + def watermarkPeriodicAscending(): Rowtime = { +watermarkStrategy = Some(new AscendingTimestamps) +this + } + + /** +* Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded +* time interval. +* +* Emits watermarks which are the maximum observed timestamp minus the specified delay. +*/ + def watermarkPeriodicBounding(delay: Long): Rowtime = { --- End diff -- `periodicBoundedOOOWatermarks()` ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164168781 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. + */ +class Rowtime extends Descriptor { + + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { +timestampExtractor = Some(new ExistingField(fieldName)) +this + } + + /** +* Sets a built-in timestamp extractor that converts the assigned timestamp from +* a DataStream API record into the rowtime attribute. +* +* Note: This extractor only works in streaming environments. +*/ + def timestampFromDataStream(): Rowtime = { --- End diff -- `preserveSourceTimestamps()` ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164122214 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/NormalizedProperties.scala --- @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.io.Serializable +import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, Long => JLong} + +import org.apache.commons.codec.binary.Base64 +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.{TableSchema, ValidationException} +import org.apache.flink.table.descriptors.DescriptorUtils._ +import org.apache.flink.table.descriptors.NormalizedProperties.normalizeTableSchema +import org.apache.flink.table.plan.stats.ColumnStats +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} +import org.apache.flink.table.typeutils.TypeStringUtils +import org.apache.flink.util.InstantiationUtil +import org.apache.flink.util.Preconditions.checkNotNull + +import scala.collection.mutable + +/** + * Utility class for having a unified string-based representation of Table API related classes + * such as [[TableSchema]], [[TypeInformation]], [[WatermarkStrategy]], etc. + */ +class NormalizedProperties( --- End diff -- Rename to `TableSourceProperties`? `NormalizedProperties` is quite generic ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164169928 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util + +/** + * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider + * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that + * describe the desired table source. The factory allows for matching to the given set of + * properties and creating a configured [[TableSource]] accordingly. + * + * Classes that implement this interface need to be added to the + * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in + * the current classpath to be found. + */ +trait TableSourceFactory[T] { --- End diff -- We might want to add a method that exposes all properties of the connector and format that the factory supports. ---
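As the Javadoc above explains, factories are discovered through Java's Service Provider Interface. A sketch of the registration side (the implementing class name is hypothetical; note that the standard SPI location is META-INF/services, while the Javadoc writes META_INF):
{code}
# src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
com.example.connectors.MyCsvTableSourceFactory
{code}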
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r164150350 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala --- @@ -69,17 +72,76 @@ class Schema extends Descriptor { */ def field(fieldName: String, fieldType: String): Schema = { if (tableSchema.contains(fieldName)) { - throw new IllegalArgumentException(s"Duplicate field name $fieldName.") + throw new ValidationException(s"Duplicate field name $fieldName.") +} + +val fieldProperties = mutable.LinkedHashMap[String, String]() +fieldProperties += (DescriptorUtils.TYPE -> fieldType) + +tableSchema += (fieldName -> fieldProperties) + +lastField = Some(fieldName) +this + } + + /** +* Specifies the origin of the previously defined field. The origin field is defined by a +* connector or format. +* +* E.g. field("myString", Types.STRING).from("CSV_MY_STRING") +*/ + def from(originFieldName: String): Schema = { +lastField match { + case None => throw new ValidationException("No field defined previously. Use field() before.") --- End diff -- "previously defined" ---
[jira] [Commented] (FLINK-7934) Upgrade Calcite dependency to 1.15
[ https://issues.apache.org/jira/browse/FLINK-7934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341343#comment-16341343 ] ASF GitHub Bot commented on FLINK-7934: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5355 @suez1224 Great to learn that! > Upgrade Calcite dependency to 1.15 > -- > > Key: FLINK-7934 > URL: https://issues.apache.org/jira/browse/FLINK-7934 > Project: Flink > Issue Type: Task > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > Fix For: 1.5.0 > > > Umbrella issue for all related issues for Apache Calcite 1.15 release. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341313#comment-16341313 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5240 Hi Timo, the PR looks good overall. I've made a few suggestion mostly about renaming methods or extending docs. I'd also propose to add a `supportedProperties()` method to `TableSourceFactory` that can be used to validate whether the factory supports all user-provided properties of a connector or format. What do you think? Fabian > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
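A hedged sketch of the proposed extension (only the method name supportedProperties() comes from the comment above; the signature and the rest of the trait are assumptions for illustration):
{code}
trait TableSourceFactory[T] {

  /**
   * Hypothetical addition: all connector/format property keys this factory understands,
   * so user-provided properties can be validated before a TableSource is instantiated.
   */
  def supportedProperties(): java.util.List[String]

  // ... existing discovery and creation methods of the factory ...
}
{code}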
[GitHub] flink issue #5240: [FLINK-8240] [table] Create unified interfaces to configu...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5240 Hi Timo, the PR looks good overall. I've made a few suggestions, mostly about renaming methods or extending docs. I'd also propose to add a `supportedProperties()` method to `TableSourceFactory` that can be used to validate whether the factory supports all user-provided properties of a connector or format. What do you think? Fabian ---
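As a rough illustration of the supportedProperties() idea from the comment above (names and signatures are hypothetical, not the interface that was eventually merged):
{code:java}
import java.util.List;
import java.util.Map;

import org.apache.flink.table.sources.TableSource;

// Hypothetical factory contract: the factory advertises the property keys it
// understands so that unknown user-provided properties can be rejected up front.
public interface HypotheticalTableSourceFactory<T> {

    // properties that identify this factory, e.g. connector.type=kafka
    Map<String, String> requiredContext();

    // every property key this factory accepts; used to validate user input
    List<String> supportedProperties();

    // create the table source from the validated properties
    TableSource<T> create(Map<String, String> properties);
}
{code}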
[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341309#comment-16341309 ] ASF GitHub Bot commented on FLINK-7923: --- Github user suez1224 commented on the issue: https://github.com/apache/flink/pull/5367 @twalthr rebased, could you please take another look? Thanks a lot. > SQL parser exception when accessing subfields of a Composite element in an > Object Array type column > --- > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...
Github user suez1224 commented on the issue: https://github.com/apache/flink/pull/5367 @twalthr rebased, could you please take another look? Thanks a lot. ---
[jira] [Comment Edited] (FLINK-3089) State API Should Support Data Expiration (State TTL)
[ https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341298#comment-16341298 ] Stefan Richter edited comment on FLINK-3089 at 1/26/18 5:13 PM: [~phoenixjiangnan] I agree that we could start with a relaxed TTL approach. I would not limit the feature to RocksDB, in fact I am also considering to implement incremental snapshot for the heap backend and have some approach how to this could be done. For TTL on the heap backend, I also have some ideas how this could work for the async variant (see `CopyOnWriteStateTable` which is the default since 1.4 and might become the only implementation eventually). For example, one idea is that we might go for an approach that works similar to the incremental rehash: doing a linear scan over the directory that removes outdated entries over time. This scan is performed in very small steps and driven by other operations, e.g. a small fraction of the buckets (maybe just one) is cleaned up as side activity for every operation on the map to amortize the cleanup costs. With the linear nature, at least those accesses to the bucket array are also cache conscious. Besides, of course we can also drop all outdated entries that we encounter during the operations. In general, outdated entries could be detected by an attached timestamp (introducing more memory overhead per entry), or we could try to correlate timeout with the state version that already exists on every entry in this map and currently define the snapshot epochs. was (Author: srichter): [~phoenixjiangnan] I agree that we could start with a relaxed TTL approach. I would not limit the feature to RocksDB, in fact I am also considering to implement incremental snapshot for the heap backend and have some approach how to this could be done. For TTL on the heap backend, I also have some ideas how this could work for the async variant (see `CopyOnWriteStateTable` which is the default since 1.4 and might become the only implementation eventually). For example, one idea is that we might go for an approach that works similar to the incremental rehash: doing a linear scan over the directory that removes outdated entries over time. This scan is performed in very small steps and driven by other operations, e.g. a small fraction of the buckets (maybe just one) is cleaned up as side activity for every operation on the map to amortize the cleanup costs. With the linear nature, at least those accesses to the bucket array are also cache conscious. Besides, of course we can also drop all outdated entries that we encounter during the operations. In general, outdated entries cound be detected by an attached timestamp (introducing more memory overhead per entry), or we could try to correlate timeout with the state version that already exists on every entry in this map. > State API Should Support Data Expiration (State TTL) > > > Key: FLINK-3089 > URL: https://issues.apache.org/jira/browse/FLINK-3089 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Niels Basjes >Assignee: Bowen Li >Priority: Major > > In some usecases (webanalytics) there is a need to have a state per visitor > on a website (i.e. keyBy(sessionid) ). > At some point the visitor simply leaves and no longer creates new events (so > a special 'end of session' event will not occur). 
> The only way to determine that a visitor has left is by choosing a timeout, > like "After 30 minutes no events we consider the visitor 'gone'". > Only after this (chosen) timeout has expired should we discard this state. > In the Trigger part of Windows we can set a timer and close/discard this kind > of information. But that introduces the buffering effect of the window (which > in some scenarios is unwanted). > What I would like is to be able to set a timeout on a specific state which I > can update afterwards. > This makes it possible to create a map function that assigns the right value > and that discards the state automatically. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-3089) State API Should Support Data Expiration (State TTL)
[ https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341298#comment-16341298 ] Stefan Richter edited comment on FLINK-3089 at 1/26/18 5:12 PM: [~phoenixjiangnan] I agree that we could start with a relaxed TTL approach. I would not limit the feature to RocksDB, in fact I am also considering to implement incremental snapshot for the heap backend and have some approach how to this could be done. For TTL on the heap backend, I also have some ideas how this could work for the async variant (see `CopyOnWriteStateTable` which is the default since 1.4 and might become the only implementation eventually). For example, one idea is that we might go for an approach that works similar to the incremental rehash: doing a linear scan over the directory that removes outdated entries over time. This scan is performed in very small steps and driven by other operations, e.g. a small fraction of the buckets (maybe just one) is cleaned up as side activity for every operation on the map to amortize the cleanup costs. With the linear nature, at least those accesses to the bucket array are also cache conscious. Besides, of course we can also drop all outdated entries that we encounter during the operations. In general, outdated entries cound be detected by an attached timestamp (introducing more memory overhead per entry), or we could try to correlate timeout with the state version that already exists on every entry in this map. was (Author: srichter): [~phoenixjiangnan] I agree that we could start with a relaxed TTL approach. I would not limit the feature to RocksDB, in fact I am also considering to implement incremental snapshot for the heap backend and have some approach how to this could be done. For TTL on the heap backend, I also have some ideas how this could work for the async variant (see `CopyOnWriteStateTable` which is the default since 1.4 and might become the only implementation eventually). For example, one idea is that we might go for an approach that works similar to the incremental rehash: doing a linear scan over the directory that removes outdated entries over time. This scan is performed in very small steps and driven by other operations, e.g. a small fraction of the buckets (maybe just one) is cleaned up as side activity for every operation on the map to amortize the cleanup costs. With the linear nature, at least those accesses to the bucket array are also cache conscious. In general, outdated entries cound be detected by an attached timestamp (introducing more memory overhead per entry), or we could try to correlate timeout with the state version that already exists on every entry in this map. > State API Should Support Data Expiration (State TTL) > > > Key: FLINK-3089 > URL: https://issues.apache.org/jira/browse/FLINK-3089 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Niels Basjes >Assignee: Bowen Li >Priority: Major > > In some usecases (webanalytics) there is a need to have a state per visitor > on a website (i.e. keyBy(sessionid) ). > At some point the visitor simply leaves and no longer creates new events (so > a special 'end of session' event will not occur). > The only way to determine that a visitor has left is by choosing a timeout, > like "After 30 minutes no events we consider the visitor 'gone'". > Only after this (chosen) timeout has expired should we discard this state. 
> In the Trigger part of Windows we can set a timer and close/discard this kind > of information. But that introduces the buffering effect of the window (which > in some scenarios is unwanted). > What I would like is to be able to set a timeout on a specific state which I > can update afterwards. > This makes it possible to create a map function that assigns the right value > and that discards the state automatically. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration (State TTL)
[ https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341298#comment-16341298 ] Stefan Richter commented on FLINK-3089: --- [~phoenixjiangnan] I agree that we could start with a relaxed TTL approach. I would not limit the feature to RocksDB, in fact I am also considering implementing incremental snapshots for the heap backend and have some ideas about how this could be done. For TTL on the heap backend, I also have some ideas how this could work for the async variant (see `CopyOnWriteStateTable` which is the default since 1.4 and might become the only implementation eventually). For example, one idea is that we might go for an approach that works similar to the incremental rehash: doing a linear scan over the directory that removes outdated entries over time. This scan is performed in very small steps and driven by other operations, e.g. a small fraction of the buckets (maybe just one) is cleaned up as side activity for every operation on the map to amortize the cleanup costs. With the linear nature, at least those accesses to the bucket array are also cache conscious. In general, outdated entries could be detected by an attached timestamp (introducing more memory overhead per entry), or we could try to correlate timeout with the state version that already exists on every entry in this map. > State API Should Support Data Expiration (State TTL) > > > Key: FLINK-3089 > URL: https://issues.apache.org/jira/browse/FLINK-3089 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Niels Basjes >Assignee: Bowen Li >Priority: Major > > In some usecases (webanalytics) there is a need to have a state per visitor > on a website (i.e. keyBy(sessionid) ). > At some point the visitor simply leaves and no longer creates new events (so > a special 'end of session' event will not occur). > The only way to determine that a visitor has left is by choosing a timeout, > like "After 30 minutes no events we consider the visitor 'gone'". > Only after this (chosen) timeout has expired should we discard this state. > In the Trigger part of Windows we can set a timer and close/discard this kind > of information. But that introduces the buffering effect of the window (which > in some scenarios is unwanted). > What I would like is to be able to set a timeout on a specific state which I > can update afterwards. > This makes it possible to create a map function that assigns the right value > and that discards the state automatically. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
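To make the amortized cleanup idea above more concrete, here is a heavily reduced, self-contained sketch (not Flink code): a toy hash table that tags every entry with a write timestamp and sweeps exactly one bucket per mutation, so expired entries are removed incrementally as a side effect of normal operations.
{code:java}
import java.util.ArrayList;
import java.util.List;

/** Toy map that expires entries after ttlMillis, cleaning one bucket per write. */
public class ExpiringMapSketch<K, V> {

    private static class Entry<K, V> {
        final K key;
        V value;
        long writeTime;
        Entry(K key, V value, long writeTime) { this.key = key; this.value = value; this.writeTime = writeTime; }
    }

    private final List<Entry<K, V>>[] buckets;
    private final long ttlMillis;
    private int sweepCursor; // next bucket to clean, advanced linearly like an incremental rehash

    @SuppressWarnings("unchecked")
    public ExpiringMapSketch(int numBuckets, long ttlMillis) {
        this.buckets = new List[numBuckets];
        for (int i = 0; i < numBuckets; i++) {
            buckets[i] = new ArrayList<>();
        }
        this.ttlMillis = ttlMillis;
    }

    public void put(K key, V value) {
        long now = System.currentTimeMillis();
        List<Entry<K, V>> bucket = buckets[Math.floorMod(key.hashCode(), buckets.length)];
        for (Entry<K, V> e : bucket) {
            if (e.key.equals(key)) {
                e.value = value;
                e.writeTime = now;
                sweepOneBucket(now);
                return;
            }
        }
        bucket.add(new Entry<>(key, value, now));
        sweepOneBucket(now); // amortized cleanup piggy-backed on the write
    }

    public V get(K key) {
        long now = System.currentTimeMillis();
        List<Entry<K, V>> bucket = buckets[Math.floorMod(key.hashCode(), buckets.length)];
        for (Entry<K, V> e : bucket) {
            if (e.key.equals(key)) {
                // expired entries are simply not returned; the sweep removes them later
                return (now - e.writeTime > ttlMillis) ? null : e.value;
            }
        }
        return null;
    }

    /** Removes expired entries from a single bucket and advances the cursor. */
    private void sweepOneBucket(long now) {
        buckets[sweepCursor].removeIf(e -> now - e.writeTime > ttlMillis);
        sweepCursor = (sweepCursor + 1) % buckets.length;
    }
}
{code}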
[jira] [Commented] (FLINK-8432) Add openstack swift filesystem
[ https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341246#comment-16341246 ] ASF GitHub Bot commented on FLINK-8432: --- Github user jelmerk commented on the issue: https://github.com/apache/flink/pull/5296 All valid points, thanks for the review! > Add openstack swift filesystem > -- > > Key: FLINK-8432 > URL: https://issues.apache.org/jira/browse/FLINK-8432 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0 >Reporter: Jelmer Kuperus >Priority: Major > Labels: features > > At ebay classifieds we started running our infrastructure on top of OpenStack. > The openstack project comes with its own amazon-s3-like filesystem, known as > Swift. It's built for scale and optimized for durability, availability, and > concurrency across the entire data set. Swift is ideal for storing > unstructured data that can grow without bound. > We would really like to be able to use it within flink without Hadoop > dependencies, as a sink or for storing savepoints etc > I've prepared a pull request that adds support for it. It wraps the hadoop > support for swift in a way that is very similar to the way the s3 connector > works. > You can find out more about the underlying hadoop implementation at > [https://hadoop.apache.org/docs/stable/hadoop-openstack/index.html] > Pull request : [https://github.com/apache/flink/pull/5296] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8432) Add openstack swift filesystem
[ https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341240#comment-16341240 ] ASF GitHub Bot commented on FLINK-8432: --- Github user jelmerk commented on a diff in the pull request: https://github.com/apache/flink/pull/5296#discussion_r164158043 --- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.openstackhadoop; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.util.HadoopUtils; + +import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +/** + * Simple factory for the Swift file system. + */ +public class SwiftFileSystemFactory implements FileSystemFactory { + + private static final Logger LOG = LoggerFactory.getLogger(SwiftFileSystemFactory.class); + + /** The prefixes that Flink adds to the Hadoop config under 'fs.swift.'. */ + private static final String CONFIG_PREFIX = "swift."; + + /** Flink's configuration object. */ + private Configuration flinkConfig; + + /** Hadoop's configuration for the file systems, lazily initialized. 
*/ + private org.apache.hadoop.conf.Configuration hadoopConfig; + + @Override + public String getScheme() { + return "swift"; + } + + @Override + public void configure(Configuration config) { + flinkConfig = config; + hadoopConfig = null; + } + + @Override + public FileSystem create(URI fsUri) throws IOException { + LOG.debug("Creating swift file system (backed by a Hadoop native swift file system)"); + + try { + // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath) + + org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig; + if (hadoopConfig == null) { + if (flinkConfig != null) { + LOG.debug("Loading Hadoop configuration for swift native file system"); + hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig); + + // hadoop.tmp.dir needs to be defined because it is used as buffer directory + if (hadoopConfig.get("hadoop.tmp.dir") == null) { + String tmpDir = System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + System.getProperty("user.name"); --- End diff -- Changed in 718ac46f45bc0b1a275f301255c52cec7ecdd209 > Add openstack swift filesystem > -- > > Key: FLINK-8432 > URL: https://issues.apache.org/jira/browse/FLINK-8432 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0 >Reporter: Jelmer Kuperus >Priority: Major > Labels: features > > At ebay classifieds we started running our infrastructure on top of OpenStack. > The openstack project comes with its own amazon-s3-like filesystem, known as > Swift. It's built for scale and optimized for durability, availability, and > concurrency across the entire data set. Swift is ideal for storing > unstructured data that can grow without bound. > We would really like to be able to use it within flink without Hadoop > dependencies, as a sink o
[jira] [Commented] (FLINK-8432) Add openstack swift filesystem
[ https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341245#comment-16341245 ] ASF GitHub Bot commented on FLINK-8432: --- Github user jelmerk commented on a diff in the pull request: https://github.com/apache/flink/pull/5296#discussion_r164158508 --- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/test/java/org/apache/flink/fs/openstackhadoop/HadoopSwiftFileSystemITCase.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.openstackhadoop; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystem.WriteMode; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for the Swift file system support. + */ +public class HadoopSwiftFileSystemITCase extends TestLogger { --- End diff -- I did > Add openstack swift filesystem > -- > > Key: FLINK-8432 > URL: https://issues.apache.org/jira/browse/FLINK-8432 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0 >Reporter: Jelmer Kuperus >Priority: Major > Labels: features > > At ebay classifieds we started running our infrastructure on top of OpenStack. > The openstack project comes with its own amazon-s3-like filesystem, known as > Swift. It's built for scale and optimized for durability, availability, and > concurrency across the entire data set. Swift is ideal for storing > unstructured data that can grow without bound. > We would really like to be able to use it within flink without Hadoop > dependencies, as a sink or for storing savepoints etc > I've prepared a pull request that adds support for it. It wraps the hadoop > support for swift in a way that is very similar to the way the s3 connector > works. 
> You can find out more about the underlying hadoop implementation at > [https://hadoop.apache.org/docs/stable/hadoop-openstack/index.html] > Pull request : [https://github.com/apache/flink/pull/5296] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5296: [FLINK-8432] [filesystem-connector] Add support for opens...
Github user jelmerk commented on the issue: https://github.com/apache/flink/pull/5296 All valid points, thanks for the review! ---
[jira] [Commented] (FLINK-8432) Add openstack swift filesystem
[ https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341241#comment-16341241 ] ASF GitHub Bot commented on FLINK-8432: --- Github user jelmerk commented on a diff in the pull request: https://github.com/apache/flink/pull/5296#discussion_r164158160 --- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.openstackhadoop; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.util.HadoopUtils; + +import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +/** + * Simple factory for the Swift file system. + */ +public class SwiftFileSystemFactory implements FileSystemFactory { + + private static final Logger LOG = LoggerFactory.getLogger(SwiftFileSystemFactory.class); + + /** The prefixes that Flink adds to the Hadoop config under 'fs.swift.'. */ + private static final String CONFIG_PREFIX = "swift."; + + /** Flink's configuration object. */ + private Configuration flinkConfig; + + /** Hadoop's configuration for the file systems, lazily initialized. 
*/ + private org.apache.hadoop.conf.Configuration hadoopConfig; + + @Override + public String getScheme() { + return "swift"; + } + + @Override + public void configure(Configuration config) { + flinkConfig = config; + hadoopConfig = null; + } + + @Override + public FileSystem create(URI fsUri) throws IOException { + LOG.debug("Creating swift file system (backed by a Hadoop native swift file system)"); + + try { + // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath) + + org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig; + if (hadoopConfig == null) { + if (flinkConfig != null) { + LOG.debug("Loading Hadoop configuration for swift native file system"); + hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig); + + // hadoop.tmp.dir needs to be defined because it is used as buffer directory + if (hadoopConfig.get("hadoop.tmp.dir") == null) { + String tmpDir = System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + System.getProperty("user.name"); + hadoopConfig.set("hadoop.tmp.dir", tmpDir); + } + + // add additional config entries from the Flink config to the Presto Hadoop config + for (String key : flinkConfig.keySet()) { + if (key.startsWith(CONFIG_PREFIX)) { + String value = flinkConfig.getString(key, null); + String newKey = "fs.swift." + key.substring(CONFIG_PREFIX.length()); + hadoopConfig.set(newKey, flinkConfig.getString(key, null)); --- End diff -- Changed in 718ac46f45bc0b1a275f301255c52cec7ecdd209 > Add openstack swift filesystem > -- > > Ke
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
Github user jelmerk commented on a diff in the pull request: https://github.com/apache/flink/pull/5296#discussion_r164158508 --- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/test/java/org/apache/flink/fs/openstackhadoop/HadoopSwiftFileSystemITCase.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.openstackhadoop; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystem.WriteMode; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for the Swift file system support. + */ +public class HadoopSwiftFileSystemITCase extends TestLogger { --- End diff -- I did ---
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
Github user jelmerk commented on a diff in the pull request: https://github.com/apache/flink/pull/5296#discussion_r164158160 --- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.openstackhadoop; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.util.HadoopUtils; + +import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +/** + * Simple factory for the Swift file system. + */ +public class SwiftFileSystemFactory implements FileSystemFactory { + + private static final Logger LOG = LoggerFactory.getLogger(SwiftFileSystemFactory.class); + + /** The prefixes that Flink adds to the Hadoop config under 'fs.swift.'. */ + private static final String CONFIG_PREFIX = "swift."; + + /** Flink's configuration object. */ + private Configuration flinkConfig; + + /** Hadoop's configuration for the file systems, lazily initialized. */ + private org.apache.hadoop.conf.Configuration hadoopConfig; + + @Override + public String getScheme() { + return "swift"; + } + + @Override + public void configure(Configuration config) { + flinkConfig = config; + hadoopConfig = null; + } + + @Override + public FileSystem create(URI fsUri) throws IOException { + LOG.debug("Creating swift file system (backed by a Hadoop native swift file system)"); + + try { + // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath) + + org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig; + if (hadoopConfig == null) { + if (flinkConfig != null) { + LOG.debug("Loading Hadoop configuration for swift native file system"); + hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig); + + // hadoop.tmp.dir needs to be defined because it is used as buffer directory + if (hadoopConfig.get("hadoop.tmp.dir") == null) { + String tmpDir = System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + System.getProperty("user.name"); + hadoopConfig.set("hadoop.tmp.dir", tmpDir); + } + + // add additional config entries from the Flink config to the Presto Hadoop config + for (String key : flinkConfig.keySet()) { + if (key.startsWith(CONFIG_PREFIX)) { + String value = flinkConfig.getString(key, null); + String newKey = "fs.swift." 
+ key.substring(CONFIG_PREFIX.length()); + hadoopConfig.set(newKey, flinkConfig.getString(key, null)); --- End diff -- Changed in 718ac46f45bc0b1a275f301255c52cec7ecdd209 ---
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
Github user jelmerk commented on a diff in the pull request: https://github.com/apache/flink/pull/5296#discussion_r164158043 --- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.openstackhadoop; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.util.HadoopUtils; + +import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +/** + * Simple factory for the Swift file system. + */ +public class SwiftFileSystemFactory implements FileSystemFactory { + + private static final Logger LOG = LoggerFactory.getLogger(SwiftFileSystemFactory.class); + + /** The prefixes that Flink adds to the Hadoop config under 'fs.swift.'. */ + private static final String CONFIG_PREFIX = "swift."; + + /** Flink's configuration object. */ + private Configuration flinkConfig; + + /** Hadoop's configuration for the file systems, lazily initialized. */ + private org.apache.hadoop.conf.Configuration hadoopConfig; + + @Override + public String getScheme() { + return "swift"; + } + + @Override + public void configure(Configuration config) { + flinkConfig = config; + hadoopConfig = null; + } + + @Override + public FileSystem create(URI fsUri) throws IOException { + LOG.debug("Creating swift file system (backed by a Hadoop native swift file system)"); + + try { + // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath) + + org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig; + if (hadoopConfig == null) { + if (flinkConfig != null) { + LOG.debug("Loading Hadoop configuration for swift native file system"); + hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig); + + // hadoop.tmp.dir needs to be defined because it is used as buffer directory + if (hadoopConfig.get("hadoop.tmp.dir") == null) { + String tmpDir = System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + System.getProperty("user.name"); --- End diff -- Changed in 718ac46f45bc0b1a275f301255c52cec7ecdd209 ---
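For orientation, a small sketch of how a user might configure the factory discussed above: keys placed under the 'swift.' prefix in the Flink configuration are forwarded to the Hadoop configuration as 'fs.swift.*'. The key names follow the hadoop-openstack documentation linked in the issue; the service name 'myprovider' and all values are made up.
{code:java}
import org.apache.flink.configuration.Configuration;

public class SwiftConfigExample {
    public static void main(String[] args) {
        // In practice these entries would live in flink-conf.yaml; they are set
        // programmatically here only to show the prefix mapping.
        Configuration flinkConfig = new Configuration();
        flinkConfig.setString("swift.service.myprovider.auth.url", "https://keystone.example.com/v2.0/tokens");
        flinkConfig.setString("swift.service.myprovider.username", "flink");
        flinkConfig.setString("swift.service.myprovider.password", "secret");
        flinkConfig.setString("swift.service.myprovider.tenant", "analytics");
        // Paths then use the scheme served by the factory, e.g. swift://container.myprovider/checkpoints
    }
}
{code}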
[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
[ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341233#comment-16341233 ] Thomas Weise commented on FLINK-8516: - Can a PMC please add me as contributor, thanks! CC: [~StephanEwen] > FlinkKinesisConsumer does not balance shards over subtasks > -- > > Key: FLINK-8516 > URL: https://issues.apache.org/jira/browse/FLINK-8516 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.4.0, 1.3.2, 1.5.0 >Reporter: Thomas Weise >Priority: Major > > The hash code of the shard is used to distribute discovered shards over > subtasks round robin. This works as long as shard identifiers are sequential. > After shards are rebalanced in Kinesis, that may no longer be the case and > the distribution become skewed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
[ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8516: --- Affects Version/s: 1.5.0 1.4.0 1.3.2 > FlinkKinesisConsumer does not balance shards over subtasks > -- > > Key: FLINK-8516 > URL: https://issues.apache.org/jira/browse/FLINK-8516 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.4.0, 1.3.2, 1.5.0 >Reporter: Thomas Weise >Priority: Major > > The hash code of the shard is used to distribute discovered shards over > subtasks round robin. This works as long as shard identifiers are sequential. > After shards are rebalanced in Kinesis, that may no longer be the case and > the distribution become skewed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
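To spell out the skew described above: assignment is essentially hashCode modulo the number of subtasks, which only balances well while shard identifiers stay sequential. A simplified model (not the consumer's actual code, and the shard ids are invented):
{code:java}
public class ShardAssignmentSketch {
    public static void main(String[] args) {
        // After a reshard, the surviving shard ids are arbitrary, so their hash codes
        // can cluster on a few subtasks instead of spreading round robin.
        String[] shardIds = {"shardId-000000000007", "shardId-000000000013",
                "shardId-000000000021", "shardId-000000000042"};
        int parallelism = 4;
        for (String shardId : shardIds) {
            int subtask = Math.floorMod(shardId.hashCode(), parallelism);
            System.out.println(shardId + " -> subtask " + subtask);
        }
    }
}
{code}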
[jira] [Commented] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main
[ https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341194#comment-16341194 ] mingleizhang commented on FLINK-7095: - I will use Commons CLI as it's a tool that implements this functionality. [~till.rohrmann] If it is okay, please let me know. Thanks ~ > Add proper command line parsing tool to TaskManagerRunner.main > -- > > Key: FLINK-7095 > URL: https://issues.apache.org/jira/browse/FLINK-7095 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Till Rohrmann >Priority: Minor > Labels: flip-6 > > We need to add a proper command line parsing tool to the entry point of the > {{TaskManagerRunner#main}}. At the moment, we are simply using the > {{ParameterTool}} as a temporary solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
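A minimal sketch of what Commons CLI based parsing for TaskManagerRunner.main could look like; the option name ('configDir') is only an example, not a flag defined by Flink:
{code:java}
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

public class TaskManagerCliSketch {
    public static void main(String[] args) throws ParseException {
        // Declare the accepted options up front instead of fishing them out of ParameterTool.
        Options options = new Options();
        options.addOption(Option.builder("c")
                .longOpt("configDir")
                .hasArg()
                .argName("path")
                .desc("Directory containing flink-conf.yaml")
                .required()
                .build());

        CommandLineParser parser = new DefaultParser();
        CommandLine cmd = parser.parse(options, args);

        String configDir = cmd.getOptionValue("configDir");
        System.out.println("Using configuration directory: " + configDir);
    }
}
{code}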
[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341134#comment-16341134 ] Flavio Pompermaier commented on FLINK-8101: --- Hi to all, I've drafted a first version of the Flink ES 6 connector (that is also compatible with ES 5.3+) that I want to discuss with the community. There are a couple of things to review (I know I still have to properly update the Javadoc...): # How to test the connector? It seems that the embedded Node is not supported anymore (at least to test the REST part)... am I wrong? # Is it possible to make it compatible with the base elasticsearch connector? # Are the HTTP requests serialized as JSON or as binary? Is it possible to force binary conversion? > Elasticsearch 6.x support > - > > Key: FLINK-8101 > URL: https://issues.apache.org/jira/browse/FLINK-8101 > Project: Flink > Issue Type: New Feature > Components: ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Flavio Pompermaier >Priority: Major > Fix For: 1.5.0 > > > Recently, elasticsearch 6.0.0 was released: > https://www.elastic.co/blog/elasticsearch-6-0-0-released > The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe
[ https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8450. Resolution: Fixed Fixed via 60b7b03f45aeb5a31202b014e486c40116124b30 > Make JobMaster/DispatcherGateway#requestJob type safe > - > > Key: FLINK-8450 > URL: https://issues.apache.org/jira/browse/FLINK-8450 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, the {{RestfulGateway#requestJob}} returns a > {{CompletableFuture}}. Since {{AccessExecutionGraph}} > is non serializable it could fail if we execute this RPC from a remote > system. In order to make it typesafe we should change its signature to > {{SerializableExecutionGraph}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe
[ https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341124#comment-16341124 ] ASF GitHub Bot commented on FLINK-8450: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5309 > Make JobMaster/DispatcherGateway#requestJob type safe > - > > Key: FLINK-8450 > URL: https://issues.apache.org/jira/browse/FLINK-8450 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, the {{RestfulGateway#requestJob}} returns a > {{CompletableFuture}}. Since {{AccessExecutionGraph}} > is non serializable it could fail if we execute this RPC from a remote > system. In order to make it typesafe we should change its signature to > {{SerializableExecutionGraph}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5311: [FLINK-8454] [flip6] Remove JobExecutionResultCach...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5311 ---
[jira] [Closed] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8449. Resolution: Fixed Fixed via 8f9dbeca8bbb8f74bc17410b2f39903ea1f95af1 > Extend OnCompletionActions to receive AccessExecutionGraph > -- > > Key: FLINK-8449 > URL: https://issues.apache.org/jira/browse/FLINK-8449 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{OnCompletionAction}} currently only receives the {{JobResult}} when the > job terminates. For further archiving of the {{ArchivedExecutionGraph}}, we > should change this to {{AccessExecutionGraph}}. The {{AccessExecutionGraph}} > contains all the information to derive the {{JobResult}} and additionally the > information needed for serving information about completed jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5312: [FLINK-8344][flip6] Add support for HA to RestClus...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5312 ---
[GitHub] flink pull request #5346: [FLINK-8490] [mesos] Allow custom docker parameter...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5346 ---
[jira] [Closed] (FLINK-8344) Add support for HA to RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8344. Resolution: Fixed Fixed via ac8225fd56f16b1766724aefbd44babbe322d2ac > Add support for HA to RestClusterClient > --- > > Key: FLINK-8344 > URL: https://issues.apache.org/jira/browse/FLINK-8344 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{RestClusterClient}} must be able to deal with changing JobMasters in > case of HA. We have to add functionality to reconnect to a newly elected > leader in case of HA. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5309: [FLINK-8450] [flip6] Make JobMaster/DispatcherGate...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5309 ---
[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341123#comment-16341123 ] ASF GitHub Bot commented on FLINK-8449: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5308 > Extend OnCompletionActions to receive AccessExecutionGraph > -- > > Key: FLINK-8449 > URL: https://issues.apache.org/jira/browse/FLINK-8449 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{OnCompletionAction}} currently only receives the {{JobResult}} when the > job terminates. For further archiving of the {{ArchivedExecutionGraph}}, we > should change this to {{AccessExecutionGraph}}. The {{AccessExecutionGraph}} > contains all the information to derive the {{JobResult}} and additionally the > information needed for serving information about completed jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8490) Allow custom docker parameters for docker tasks on Mesos
[ https://issues.apache.org/jira/browse/FLINK-8490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-8490. -- Resolution: Fixed Fix Version/s: 1.5.0 Fixed via 6969fe2fa823be7748cee002a32df02fd1cae09f > Allow custom docker parameters for docker tasks on Mesos > > > Key: FLINK-8490 > URL: https://issues.apache.org/jira/browse/FLINK-8490 > Project: Flink > Issue Type: Improvement > Components: Mesos >Reporter: Jörg Schad >Priority: Major > Fix For: 1.5.0 > > > It would be great to pass custom parameters to Mesos when using the Docker > Containerizer. > This could be similar to this spark example: > `spark.mesos.executor.docker.parameters privileged=true` > > Originally brought up here: > https://stackoverflow.com/questions/48393153/passing-custom-parameters-to-docker-when-running-flink-on-mesos-marathon?noredirect=1#comment83777480_48393153 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5310 ---
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341126#comment-16341126 ] ASF GitHub Bot commented on FLINK-8453: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5310 > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8454) Remove JobExecutionResultCache
[ https://issues.apache.org/jira/browse/FLINK-8454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341125#comment-16341125 ] ASF GitHub Bot commented on FLINK-8454: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5311 > Remove JobExecutionResultCache > -- > > Key: FLINK-8454 > URL: https://issues.apache.org/jira/browse/FLINK-8454 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > With the introduction of the {{SerializableExecutionGraphStore}} to the > {{Dispatcher}}, it is no longer necessary to store the {{JobResult}} in the > {{Dispatcher}}, because all information necessary to derive the {{JobResult}} > is contained in the {{SerializableExecutionGraphStore}}. In order to decrease > complexity, I propose to remove the {{JobExecutionResultCache}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8453. Resolution: Fixed Fixed via 8b817f0f9f0ec55f040b56f2d65c62761eac1ac1 > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
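To illustrate the behaviour described in this issue (bounded in-memory caching of completed jobs with time-based expiry), a heavily reduced sketch; the class and method names are invented and the disk-persistence part is left out entirely.
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy store: at most maxInMemory graphs stay in memory; entries older than ttlMillis are treated as expired. */
public class CompletedJobStoreSketch<G> {

    private static class Timestamped<G> {
        final G graph;
        final long storedAt;
        Timestamped(G graph, long storedAt) { this.graph = graph; this.storedAt = storedAt; }
    }

    private final long ttlMillis;
    private final Map<String, Timestamped<G>> cache;

    public CompletedJobStoreSketch(int maxInMemory, long ttlMillis) {
        this.ttlMillis = ttlMillis;
        // Access-ordered LinkedHashMap drops the least recently used graph once the limit is hit;
        // a real implementation would spill the evicted graph to disk instead of discarding it.
        this.cache = new LinkedHashMap<String, Timestamped<G>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Timestamped<G>> eldest) {
                return size() > maxInMemory;
            }
        };
    }

    public void put(String jobId, G graph) {
        cache.put(jobId, new Timestamped<>(graph, System.currentTimeMillis()));
    }

    public G get(String jobId) {
        Timestamped<G> entry = cache.get(jobId);
        if (entry == null || System.currentTimeMillis() - entry.storedAt > ttlMillis) {
            return null; // unknown, evicted, or expired
        }
        return entry.graph;
    }
}
{code}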
[jira] [Commented] (FLINK-8490) Allow custom docker parameters for docker tasks on Mesos
[ https://issues.apache.org/jira/browse/FLINK-8490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341127#comment-16341127 ] ASF GitHub Bot commented on FLINK-8490: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5346 > Allow custom docker parameters for docker tasks on Mesos > > > Key: FLINK-8490 > URL: https://issues.apache.org/jira/browse/FLINK-8490 > Project: Flink > Issue Type: Improvement > Components: Mesos >Reporter: Jörg Schad >Priority: Major > Fix For: 1.5.0 > > > It would be great to pass custom parameters to Mesos when using the Docker > Containerizer. > This could be similar to this spark example: > `spark.mesos.executor.docker.parameters privileged=true` > > Originally brought up here: > https://stackoverflow.com/questions/48393153/passing-custom-parameters-to-docker-when-running-flink-on-mesos-marathon?noredirect=1#comment83777480_48393153 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5308 ---
[GitHub] flink issue #5312: [FLINK-8344][flip6] Add support for HA to RestClusterClie...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/5312 Thanks man 👍 ---
[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341128#comment-16341128 ] ASF GitHub Bot commented on FLINK-8344: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5312 > Add support for HA to RestClusterClient > --- > > Key: FLINK-8344 > URL: https://issues.apache.org/jira/browse/FLINK-8344 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{RestClusterClient}} must be able to deal with changing JobMasters in > case of HA. We have to add functionality to reconnect to a newly elected > leader in case of HA. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8454) Remove JobExecutionResultCache
[ https://issues.apache.org/jira/browse/FLINK-8454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8454. Resolution: Fixed Fixed via a6d7f2d72d47b268c0d6ffa402a59a6349c91d95 > Remove JobExecutionResultCache > -- > > Key: FLINK-8454 > URL: https://issues.apache.org/jira/browse/FLINK-8454 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > With the introduction of the {{SerializableExecutionGraphStore}} to the > {{Dispatcher}}, it is no longer necessary to store the {{JobResult}} in the > {{Dispatcher}}, because all information necessary to derive the {{JobResult}} > is contained in the {{SerializableExecutionGraphStore}}. In order to decrease > complexity, I propose to remove the {{JobExecutionResultCache}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Pompermaier reassigned FLINK-8101: - Assignee: Flavio Pompermaier (was: Hai Zhou UTC+8) > Elasticsearch 6.x support > - > > Key: FLINK-8101 > URL: https://issues.apache.org/jira/browse/FLINK-8101 > Project: Flink > Issue Type: New Feature > Components: ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Flavio Pompermaier >Priority: Major > Fix For: 1.5.0 > > > Recently, elasticsearch 6.0.0 was released: > https://www.elastic.co/blog/elasticsearch-6-0-0-released > The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341116#comment-16341116 ] ASF GitHub Bot commented on FLINK-8101: --- GitHub user fpompermaier opened a pull request: https://github.com/apache/flink/pull/5372 [FLINK-8101] [es connector] Elasticsearch 6.x (and 5.3+) Flink connector ## Purpose of the change : implementation of Flink ES connector (5.3+) See https://issues.apache.org/jira/browse/FLINK-8101 and https://issues.apache.org/jira/browse/FLINK-7386 ## Brief change log - Changed the "standard" ES connector structure, mainly because there's an incompatibility between TransportClient and RestClient and, from ES 5.3+, the embedded Node environment is not supported anymore. A running test ES cluster is needed to properly test the code ## Verifying this change - Set up an ES cluster and properly change ES_TEST_HOST, ES_TEST_PORT and CLUSTER_NAME in the tests (or vice versa: set up a localhost ES cluster listening on http port 9200 with cluster name "test-cluster") ## Does this pull request potentially affect one of the following parts: - Flink ES connectors ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? Javadocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/fpompermaier/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5372.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5372 commit eaf878de646d90c5b821e0d3b0964fa311f8ac42 Author: Flavio Pompermaier Date: 2018-01-26T14:26:52Z First draft of ES 6 connector > Elasticsearch 6.x support > - > > Key: FLINK-8101 > URL: https://issues.apache.org/jira/browse/FLINK-8101 > Project: Flink > Issue Type: New Feature > Components: ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Major > Fix For: 1.5.0 > > > Recently, elasticsearch 6.0.0 was released: > https://www.elastic.co/blog/elasticsearch-6-0-0-released > The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341114#comment-16341114 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5310 👍 > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in-memory > instances once they grow too big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user-defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
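As a rough mental model of the store described in the issue, a hypothetical Java sketch is shown below. The interface name and method signatures are invented for illustration only; they are not the actual Flink API introduced by PR #5310.
{code:java}
import org.apache.flink.api.common.JobID;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;

/**
 * Hypothetical sketch of a completed-job store as described in FLINK-8453:
 * completed jobs are persisted to disk, a bounded in-memory cache serves
 * requests (e.g. from the web UI), and on-disk entries expire after a
 * configurable retention time.
 */
public interface CompletedJobStore<G> {

    /** Persist a completed job's execution graph and make it queryable. */
    void put(G executionGraph) throws IOException;

    /** Return the stored graph, reloading it from disk if it was evicted from memory. */
    G get(JobID jobId) throws IOException;

    /** All job IDs currently known to the store (in memory or on disk). */
    Collection<JobID> getStoredJobIds();

    /** Retention time after which on-disk entries are removed. */
    Duration getExpirationTime();
}
{code}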
[GitHub] flink pull request #5372: [FLINK-8101] [es connector] Elasticsearch 6.x (and...
GitHub user fpompermaier opened a pull request: https://github.com/apache/flink/pull/5372 [FLINK-8101] [es connector] Elasticsearch 6.x (and 5.3+) Flink connector ## Purpose of the change: implementation of a Flink ES connector (5.3+). See https://issues.apache.org/jira/browse/FLINK-8101 and https://issues.apache.org/jira/browse/FLINK-7386 ## Brief change log - Changed the "standard" ES connector structure, mainly because there's an incompatibility between TransportClient and RestClient and, from ES 5.3+, the embedded Node environment is no longer supported. A running ES test cluster is needed to properly test the code. ## Verifying this change - Set up an ES cluster and adjust ES_TEST_HOST, ES_TEST_PORT and CLUSTER_NAME in the tests accordingly (or, vice versa, set up a localhost ES cluster listening on HTTP port 9200 with cluster name "test-cluster") ## Does this pull request potentially affect one of the following parts: - Flink ES connectors ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? Javadocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/fpompermaier/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5372.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5372 commit eaf878de646d90c5b821e0d3b0964fa311f8ac42 Author: Flavio Pompermaier Date: 2018-01-26T14:26:52Z First draft of ES 6 connector ---
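Both this PR and #5374 ultimately move from the deprecated TransportClient to Elasticsearch's REST-based client. As background, a minimal sketch of how a RestHighLevelClient is typically created and used against ES 6.x is shown below; host, port, index and type names are placeholders, and this is not the connector code from either PR.
{code:java}
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;

public class Es6RestClientExample {

    public static void main(String[] args) throws IOException {
        // Build the high-level REST client against a local node (placeholder host/port).
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));

        try {
            // Index a single JSON document. A sink implementation would typically
            // hand such requests to a BulkProcessor instead of indexing one by one.
            IndexRequest request = new IndexRequest("my-index", "my-type", "1")
                    .source("{\"field\":\"value\"}", XContentType.JSON);
            client.index(request);
        } finally {
            client.close();
        }
    }
}
{code}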
[GitHub] flink issue #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphStore to D...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/5310 👍 ---
[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341113#comment-16341113 ] ASF GitHub Bot commented on FLINK-8344: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5312 Thanks man 👍 > Add support for HA to RestClusterClient > --- > > Key: FLINK-8344 > URL: https://issues.apache.org/jira/browse/FLINK-8344 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{RestClusterClient}} must be able to deal with changing JobMasters in > case of HA. We have to add functionality to reconnect to a newly elected > leader. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5296: [FLINK-8432] [filesystem-connector] Add support for opens...
Github user etiennecarriere commented on the issue: https://github.com/apache/flink/pull/5296 We tested this PR with Flink 1.4 and the Swift storage offered by the French hosting provider OVH. It worked fine for checkpoints. ---
[jira] [Commented] (FLINK-8432) Add openstack swift filesystem
[ https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1634#comment-1634 ] ASF GitHub Bot commented on FLINK-8432: --- Github user etiennecarriere commented on the issue: https://github.com/apache/flink/pull/5296 We tested this PR with Flink 1.4 and the Swift storage offered by the French hosting provider OVH. It worked fine for checkpoints. > Add openstack swift filesystem > -- > > Key: FLINK-8432 > URL: https://issues.apache.org/jira/browse/FLINK-8432 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0 >Reporter: Jelmer Kuperus >Priority: Major > Labels: features > > At ebay classifieds we started running our infrastructure on top of OpenStack. > The openstack project comes with its own amazon-s3-like filesystem, known as > Swift. It's built for scale and optimized for durability, availability, and > concurrency across the entire data set. Swift is ideal for storing > unstructured data that can grow without bound. > We would really like to be able to use it within flink without Hadoop > dependencies, as a sink or for storing savepoints etc > I've prepared a pull request that adds support for it. It wraps the hadoop > support for swift in a way that is very similar to the way the s3 connector > works. > You can find out more about the underlying hadoop implementation at > [https://hadoop.apache.org/docs/stable/hadoop-openstack/index.html] > Pull request : [https://github.com/apache/flink/pull/5296] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8432) Add openstack swift filesystem
[ https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341099#comment-16341099 ] ASF GitHub Bot commented on FLINK-8432: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5296#discussion_r164116058 --- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.openstackhadoop; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.util.HadoopUtils; + +import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +/** + * Simple factory for the Swift file system. + */ +public class SwiftFileSystemFactory implements FileSystemFactory { + + private static final Logger LOG = LoggerFactory.getLogger(SwiftFileSystemFactory.class); + + /** The prefixes that Flink adds to the Hadoop config under 'fs.swift.'. */ + private static final String CONFIG_PREFIX = "swift."; + + /** Flink's configuration object. */ + private Configuration flinkConfig; + + /** Hadoop's configuration for the file systems, lazily initialized. 
*/ + private org.apache.hadoop.conf.Configuration hadoopConfig; + + @Override + public String getScheme() { + return "swift"; + } + + @Override + public void configure(Configuration config) { + flinkConfig = config; + hadoopConfig = null; + } + + @Override + public FileSystem create(URI fsUri) throws IOException { + LOG.debug("Creating swift file system (backed by a Hadoop native swift file system)"); + + try { + // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath) + + org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig; + if (hadoopConfig == null) { + if (flinkConfig != null) { + LOG.debug("Loading Hadoop configuration for swift native file system"); + hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig); + + // hadoop.tmp.dir needs to be defined because it is used as buffer directory + if (hadoopConfig.get("hadoop.tmp.dir") == null) { + String tmpDir = System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + System.getProperty("user.name"); + hadoopConfig.set("hadoop.tmp.dir", tmpDir); + } + + // add additional config entries from the Flink config to the Presto Hadoop config + for (String key : flinkConfig.keySet()) { + if (key.startsWith(CONFIG_PREFIX)) { + String value = flinkConfig.getString(key, null); + String newKey = "fs.swift." + key.substring(CONFIG_PREFIX.length()); + hadoopConfig.set(newKey, flinkConfig.getString(key, null)); --- End diff -- `flinkConfig.getString(key, null)` can be replaced by `value`. > Add openstack swift filesystem > -- > >
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5296#discussion_r164116058 --- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.openstackhadoop; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.util.HadoopUtils; + +import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +/** + * Simple factory for the Swift file system. + */ +public class SwiftFileSystemFactory implements FileSystemFactory { + + private static final Logger LOG = LoggerFactory.getLogger(SwiftFileSystemFactory.class); + + /** The prefixes that Flink adds to the Hadoop config under 'fs.swift.'. */ + private static final String CONFIG_PREFIX = "swift."; + + /** Flink's configuration object. */ + private Configuration flinkConfig; + + /** Hadoop's configuration for the file systems, lazily initialized. */ + private org.apache.hadoop.conf.Configuration hadoopConfig; + + @Override + public String getScheme() { + return "swift"; + } + + @Override + public void configure(Configuration config) { + flinkConfig = config; + hadoopConfig = null; + } + + @Override + public FileSystem create(URI fsUri) throws IOException { + LOG.debug("Creating swift file system (backed by a Hadoop native swift file system)"); + + try { + // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath) + + org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig; + if (hadoopConfig == null) { + if (flinkConfig != null) { + LOG.debug("Loading Hadoop configuration for swift native file system"); + hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig); + + // hadoop.tmp.dir needs to be defined because it is used as buffer directory + if (hadoopConfig.get("hadoop.tmp.dir") == null) { + String tmpDir = System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + System.getProperty("user.name"); + hadoopConfig.set("hadoop.tmp.dir", tmpDir); + } + + // add additional config entries from the Flink config to the Presto Hadoop config + for (String key : flinkConfig.keySet()) { + if (key.startsWith(CONFIG_PREFIX)) { + String value = flinkConfig.getString(key, null); + String newKey = "fs.swift." 
+ key.substring(CONFIG_PREFIX.length()); + hadoopConfig.set(newKey, flinkConfig.getString(key, null)); --- End diff -- `flinkConfig.getString(key, null)` can be replaced by `value`. ---
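In other words, the review suggestion is simply to reuse the local variable the loop already extracts instead of querying the configuration a second time. A sketch of the shape of the change (not the exact PR code; `flinkConfig`, `hadoopConfig` and `CONFIG_PREFIX` are the fields shown in the diff above):
{code:java}
// Copy all Flink config keys carrying the connector prefix into the Hadoop config,
// reusing the already extracted value instead of calling getString(key, null) twice.
for (String key : flinkConfig.keySet()) {
    if (key.startsWith(CONFIG_PREFIX)) {
        String value = flinkConfig.getString(key, null);
        String newKey = "fs.swift." + key.substring(CONFIG_PREFIX.length());
        hadoopConfig.set(newKey, value);
    }
}
{code}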
[jira] [Commented] (FLINK-8432) Add openstack swift filesystem
[ https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341098#comment-16341098 ] ASF GitHub Bot commented on FLINK-8432: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5296#discussion_r164119372 --- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/test/java/org/apache/flink/fs/openstackhadoop/HadoopSwiftFileSystemITCase.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.openstackhadoop; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystem.WriteMode; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for the Swift file system support. + */ +public class HadoopSwiftFileSystemITCase extends TestLogger { --- End diff -- I assume you run these tests locally on OpenStack, right? > Add openstack swift filesystem > -- > > Key: FLINK-8432 > URL: https://issues.apache.org/jira/browse/FLINK-8432 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0 >Reporter: Jelmer Kuperus >Priority: Major > Labels: features > > At ebay classifieds we started running our infrastructure on top of OpenStack. > The openstack project comes with its own amazon-s3-like filesystem, known as > Swift. It's built for scale and optimized for durability, availability, and > concurrency across the entire data set. Swift is ideal for storing > unstructured data that can grow without bound. > We would really like to be able to use it within flink without Hadoop > dependencies, as a sink or for storing savepoints etc > I've prepared a pull request that adds support for it. It wraps the hadoop > support for swift in a way that is very similar to the way the s3 connector > works. 
> You can find out more about the underlying hadoop implementation at > [https://hadoop.apache.org/docs/stable/hadoop-openstack/index.html] > Pull request : [https://github.com/apache/flink/pull/5296] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5296#discussion_r164119372 --- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/test/java/org/apache/flink/fs/openstackhadoop/HadoopSwiftFileSystemITCase.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.openstackhadoop; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystem.WriteMode; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for the Swift file system support. + */ +public class HadoopSwiftFileSystemITCase extends TestLogger { --- End diff -- I assume you run these tests locally on OpenStack, right? ---
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5296#discussion_r164116387 --- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.openstackhadoop; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.util.HadoopUtils; + +import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +/** + * Simple factory for the Swift file system. + */ +public class SwiftFileSystemFactory implements FileSystemFactory { + + private static final Logger LOG = LoggerFactory.getLogger(SwiftFileSystemFactory.class); + + /** The prefixes that Flink adds to the Hadoop config under 'fs.swift.'. */ + private static final String CONFIG_PREFIX = "swift."; + + /** Flink's configuration object. */ + private Configuration flinkConfig; + + /** Hadoop's configuration for the file systems, lazily initialized. */ + private org.apache.hadoop.conf.Configuration hadoopConfig; + + @Override + public String getScheme() { + return "swift"; + } + + @Override + public void configure(Configuration config) { + flinkConfig = config; + hadoopConfig = null; + } + + @Override + public FileSystem create(URI fsUri) throws IOException { + LOG.debug("Creating swift file system (backed by a Hadoop native swift file system)"); + + try { + // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath) + + org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig; + if (hadoopConfig == null) { + if (flinkConfig != null) { + LOG.debug("Loading Hadoop configuration for swift native file system"); + hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig); + + // hadoop.tmp.dir needs to be defined because it is used as buffer directory + if (hadoopConfig.get("hadoop.tmp.dir") == null) { + String tmpDir = System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + System.getProperty("user.name"); --- End diff -- Let's use `CoreOptions#TMP_DIRS` instead of using directly `System.getProperty("java.io.tmpdir")`. That way we will use the Flink configured tmp directory. ---
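For concreteness, the suggested change could look roughly like the snippet below. This is a sketch only, assuming `CoreOptions` from flink-core is available in the module and that taking the first configured temp directory is acceptable; the actual PR may resolve it differently.
{code:java}
import org.apache.flink.configuration.CoreOptions;

// hadoop.tmp.dir is used as a buffer directory, so default it to Flink's
// configured temp directories instead of the raw java.io.tmpdir property.
if (hadoopConfig.get("hadoop.tmp.dir") == null) {
    // CoreOptions.TMP_DIRS may contain several comma-separated paths; take the first one here.
    String flinkTmpDirs = flinkConfig.getString(CoreOptions.TMP_DIRS);
    String tmpDir = flinkTmpDirs.split(",")[0]
            + "/hadoop-" + System.getProperty("user.name");
    hadoopConfig.set("hadoop.tmp.dir", tmpDir);
}
{code}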
[jira] [Commented] (FLINK-8432) Add openstack swift filesystem
[ https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341100#comment-16341100 ] ASF GitHub Bot commented on FLINK-8432: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5296#discussion_r164116387 --- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.openstackhadoop; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.util.HadoopUtils; + +import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +/** + * Simple factory for the Swift file system. + */ +public class SwiftFileSystemFactory implements FileSystemFactory { + + private static final Logger LOG = LoggerFactory.getLogger(SwiftFileSystemFactory.class); + + /** The prefixes that Flink adds to the Hadoop config under 'fs.swift.'. */ + private static final String CONFIG_PREFIX = "swift."; + + /** Flink's configuration object. */ + private Configuration flinkConfig; + + /** Hadoop's configuration for the file systems, lazily initialized. */ + private org.apache.hadoop.conf.Configuration hadoopConfig; + + @Override + public String getScheme() { + return "swift"; + } + + @Override + public void configure(Configuration config) { + flinkConfig = config; + hadoopConfig = null; + } + + @Override + public FileSystem create(URI fsUri) throws IOException { + LOG.debug("Creating swift file system (backed by a Hadoop native swift file system)"); + + try { + // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath) + + org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig; + if (hadoopConfig == null) { + if (flinkConfig != null) { + LOG.debug("Loading Hadoop configuration for swift native file system"); + hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig); + + // hadoop.tmp.dir needs to be defined because it is used as buffer directory + if (hadoopConfig.get("hadoop.tmp.dir") == null) { + String tmpDir = System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + System.getProperty("user.name"); --- End diff -- Let's use `CoreOptions#TMP_DIRS` instead of using directly `System.getProperty("java.io.tmpdir")`. That way we will use the Flink configured tmp directory. 
> Add openstack swift filesystem > -- > > Key: FLINK-8432 > URL: https://issues.apache.org/jira/browse/FLINK-8432 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0 >Reporter: Jelmer Kuperus >Priority: Major > Labels: features > > At ebay classifieds we started running our infrastructure on top of OpenStack. > The openstack project comes with its own amazon-s3-like filesystem, known as > Swift. It's built for scale and optimized for durability, availability, and > concurrency across the entire data set. Swift is ideal for storing > unstructured data that can grow with
[jira] [Commented] (FLINK-8407) Setting the parallelism after a partitioning operation should be forbidden
[ https://issues.apache.org/jira/browse/FLINK-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341053#comment-16341053 ] ASF GitHub Bot commented on FLINK-8407: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5369 Sure. I'll update the PR to make it more appropriate and thanks for your review, @aljoscha. > Setting the parallelism after a partitioning operation should be forbidden > -- > > Key: FLINK-8407 > URL: https://issues.apache.org/jira/browse/FLINK-8407 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Major > > Partitioning operations ({{shuffle}}, {{rescale}}, etc.) for a {{DataStream}} > create new {{DataStreams}}, which allow the users to set parallelisms for > them. However, the {{PartitionTransformations}} in these returned > {{DataStreams}} will only add virtual nodes, whose parallelism cannot be > specified, to the execution graph. We should forbid users from setting the > parallelism after a partitioning operation since such settings won't actually take effect. > The corresponding documentation should also be updated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
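To illustrate the issue described above, the hypothetical snippet below (a made-up pipeline, not taken from the PR) shows where parallelism does apply and why it cannot apply after a partitioning call: `rescale()` only yields a plain DataStream backed by a virtual partitioning node, so there is no real operator at that point whose parallelism could be changed.
{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitioningParallelismExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parallelism applies to real operators such as this map.
        SingleOutputStreamOperator<Long> doubled = env
                .generateSequence(0, 1000)
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return 2 * value;
                    }
                })
                .setParallelism(4);

        // rescale() only inserts a virtual partitioning node: the returned
        // DataStream carries a PartitionTransformation rather than an operator,
        // so a parallelism set "after" the partitioning cannot take effect.
        // FLINK-8407 proposes rejecting such attempts outright.
        DataStream<Long> rescaled = doubled.rescale();

        rescaled.print();
        env.execute("partitioning-parallelism-example");
    }
}
{code}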
[GitHub] flink issue #5369: [FLINK-8407][DataStream]Setting the parallelism after a p...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/5369 Sure. I'll update the PR to make it more appropriate and thanks for your review, @aljoscha. ---
[jira] [Commented] (FLINK-8492) [FLINK-8492][table] Fix calc cost bug
[ https://issues.apache.org/jira/browse/FLINK-8492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341033#comment-16341033 ] ASF GitHub Bot commented on FLINK-8492: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5347 Thanks for the fix @hequn8128! PR is good to merge. > [FLINK-8492][table] Fix calc cost bug > - > > Key: FLINK-8492 > URL: https://issues.apache.org/jira/browse/FLINK-8492 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Considering the following test, an unsupported-operation exception will be thrown because > multiple calcs exist between the correlate and the TableFunctionScan. > {code:java} > // code placeholder > @Test > def testCrossJoinWithMultiFilter(): Unit = { > val t = testData(env).toTable(tEnv).as('a, 'b, 'c) > val func0 = new TableFunc0 > val result = t > .join(func0('c) as('d, 'e)) > .select('c, 'd, 'e) > .where('e > 10) > .where('e > 20) > .select('c, 'd) > .toAppendStream[Row] > result.addSink(new StreamITCase.StringSink[Row]) > env.execute() > val expected = mutable.MutableList("Jack#22,Jack,22", "Anna#44,Anna,44") > assertEquals(expected.sorted, StreamITCase.testResults.sorted) > } > {code} > I can see two options to fix this problem: > # Adapt the Calcite optimizer rule to merge the consecutive calcs. > # Merge the multiple calcs in the correlate conversion rule. > I prefer the second one: not only is it easy to implement, but I also think that > Flink's functionality should not depend on whether an optimizer rule is present. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5347 Thanks for the fix @hequn8128! PR is good to merge. ---