[GitHub] flink pull request #6352: [FLINK-8163][yarn][tests] Harden tests against slo...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6352 [FLINK-8163][yarn][tests] Harden tests against slow job shutdowns ## What is the purpose of the change This PR hardens the `YarnTestBase` against jobs that just don't want to shut down that quickly (i.e. within 500ms). The maximum waiting time has been increased to 10 seconds, during which we periodically check the state of all applications. Additionally, the failure condition from `@Before` was moved to the `@After` method. This change will allow us to better differentiate between simple timing issues and unsuccessful job shutdowns. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8163 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6352.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6352 commit 0dd65378f0c9f477bb8f5712bbc0b1f31440f5f0 Author: zentol Date: 2018-07-17T11:29:16Z [FLINK-8163][yarn][tests] Harden tests against slow job shutdowns ---
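A minimal sketch of the polling check described above — wait up to a deadline while periodically asking whether any application is still running, instead of sleeping a fixed 500ms. The helper name and the callback are illustrative assumptions; only the 10-second and 500ms figures come from the PR description (the actual `YarnTestBase` is Java, this only sketches the waiting logic):

{code}
import scala.concurrent.duration._

// Sketch only: poll until no application is running or the deadline expires.
def waitForApplicationsToFinish(
    anyApplicationRunning: () => Boolean,
    timeout: FiniteDuration = 10.seconds,
    interval: FiniteDuration = 500.millis): Boolean = {
  val deadline = timeout.fromNow
  while (deadline.hasTimeLeft()) {
    if (!anyApplicationRunning()) {
      return true // clean shutdown within the timeout
    }
    Thread.sleep(interval.toMillis)
  }
  // moved failure condition: the @After hook can fail the test when this is false
  !anyApplicationRunning()
}
{code}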
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202989785 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._ +import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a streaming environment. + */ +class StreamTableDescriptor( --- End diff -- I don't see an alternative if we don't want to have an ugly API. ---
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546466#comment-16546466 ] ASF GitHub Bot commented on FLINK-9852: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202989785 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._ +import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a streaming environment. + */ +class StreamTableDescriptor( --- End diff -- I don't see an alternative if we don't want to have an ugly API. > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, only a table source can be created using the unified table > descriptors with {{tableEnv.from(...)}}. A similar approach should be > supported for defining sinks or even both types at the same time. > I suggest the following syntax: > {code} > tableEnv.connect(Kafka(...)).registerSource("name") > tableEnv.connect(Kafka(...)).registerSink("name") > tableEnv.connect(Kafka(...)).registerSourceAndSink("name") > {code} > A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
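As a usage illustration of the proposed API — hedged: the Kafka/Json/Schema descriptor contents below are placeholders, and the method names follow the registerTableSource/registerTableSink variants from the pull request rather than the shorthand in the ticket:

{code}
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}

// Sketch only: topic, format, and schema are made-up examples.
def registerClicks(tableEnv: StreamTableEnvironment): Unit = {
  val descriptor = tableEnv
    .connect(new Kafka().version("0.11").topic("clicks"))
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema().field("user", Types.STRING))

  descriptor.registerTableSource("clicks")           // source only
  // descriptor.registerTableSink("clicks")          // sink only
  // descriptor.registerTableSourceAndSink("clicks") // or both at once
}
{code}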
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546453#comment-16546453 ] ASF GitHub Bot commented on FLINK-9852: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202986793 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException} +import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a batch environment. + */ +class BatchTableDescriptor( +private val tableEnv: BatchTableEnvironment, +private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor + with RegistrableDescriptor { + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + + /** +* Internal method for properties conversion. +*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { --- End diff -- Depends on the language you are using. This method is public in Java ;) But I will move it down. > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, only a table source can be created using the unified table > descriptors with {{tableEnv.from(...)}}. A similar approach should be > supported for defining sinks or even both types at the same time. > I suggest the following syntax: > {code} > tableEnv.connect(Kafka(...)).registerSource("name") > tableEnv.connect(Kafka(...)).registerSink("name") > tableEnv.connect(Kafka(...)).registerSourceAndSink("name") > {code} > A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202986793 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException} +import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a batch environment. + */ +class BatchTableDescriptor( +private val tableEnv: BatchTableEnvironment, +private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor + with RegistrableDescriptor { + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + + /** +* Internal method for properties conversion. +*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { --- End diff -- Depends on the language you are using. This method is public in Java ;) But I will move it down. ---
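The "public in Java" remark refers to how Scala's package-qualified visibility is erased at the bytecode level; a contrived illustration (class name and signature simplified, not the real descriptor):

{code}
package org.apache.flink.table.descriptors

class ExampleDescriptor {
  // For Scala callers this is only accessible inside the org.apache.flink
  // package tree, but the JVM has no such concept, so the method is emitted
  // as public in bytecode and Java code in any package can call it.
  private[flink] def addProperties(properties: Map[String, String]): Unit = {
    // properties conversion would go here
  }
}
{code}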
[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis
[ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546448#comment-16546448 ] ASF GitHub Bot commented on FLINK-9692: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6300 Nice feature, thanks a lot. Merged this into the 1.6 and 1.7 branches > Adapt maxRecords parameter in the getRecords call to optimize bytes read from > Kinesis > -- > > Key: FLINK-9692 > URL: https://issues.apache.org/jira/browse/FLINK-9692 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: performance, pull-request-available > Fix For: 1.6.0 > > > The Kinesis connector currently has a [constant > value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213] > set for maxRecords that it can fetch from a single Kinesis getRecords call. > However, in most realtime scenarios, the average size of the Kinesis record > (in bytes) changes depending on the situation i.e. you could be in a > transient scenario where you are reading large sized records and would hence > like to fetch fewer records in each getRecords call (so as to not exceed the > 2 Mb/sec [per shard > limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] > on the getRecords call). > The idea here is to adapt the Kinesis connector to identify an average batch > size prior to making the getRecords call, so that the maxRecords parameter > can be appropriately tuned before making the call. > This feature can be behind a > [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java] > flag that defaults to false. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
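A back-of-the-envelope sketch of the adaptation described in the ticket — derive maxRecords for the next getRecords call from the observed average record size so a single fetch stays under the 2 MB/sec per-shard read limit. The function name, the fetch-interval scaling, and the 10,000-record cap are illustrative assumptions, not the connector's actual code:

{code}
// Sketch only: pick maxRecords from the average record size seen so far.
def adaptiveMaxRecords(
    avgRecordSizeBytes: Long,
    fetchIntervalMillis: Long,
    recordsPerCallCap: Int = 10000): Int = {
  val bytesPerSecondLimit = 2L * 1024 * 1024           // per-shard read limit
  val bytesPerCall = bytesPerSecondLimit * fetchIntervalMillis / 1000
  val records =
    if (avgRecordSizeBytes <= 0) recordsPerCallCap     // no data yet, keep the default
    else (bytesPerCall / avgRecordSizeBytes).toInt
  math.max(1, math.min(records, recordsPerCallCap))
}

// e.g. 100 KB average records at a 200 ms fetch interval -> roughly 4 records per call
{code}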
[jira] [Resolved] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis
[ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-9692. - Resolution: Fixed Fix Version/s: 1.6.0 Fixed in - 1.6.0 via 8005a2ebac1afbec6fcf43a4442f51f442f33590 - 1.7.0 via 01378d06215d6e00b78c4ed40b4b8a2c5c9db129 > Adapt maxRecords parameter in the getRecords call to optimize bytes read from > Kinesis > -- > > Key: FLINK-9692 > URL: https://issues.apache.org/jira/browse/FLINK-9692 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: performance, pull-request-available > Fix For: 1.6.0 > > > The Kinesis connector currently has a [constant > value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213] > set for maxRecords that it can fetch from a single Kinesis getRecords call. > However, in most realtime scenarios, the average size of the Kinesis record > (in bytes) changes depending on the situation i.e. you could be in a > transient scenario where you are reading large sized records and would hence > like to fetch fewer records in each getRecords call (so as to not exceed the > 2 Mb/sec [per shard > limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] > on the getRecords call). > The idea here is to adapt the Kinesis connector to identify an average batch > size prior to making the getRecords call, so that the maxRecords parameter > can be appropriately tuned before making the call. > This feature can be behind a > [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java] > flag that defaults to false. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis
[ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-9692. --- > Adapt maxRecords parameter in the getRecords call to optimize bytes read from > Kinesis > -- > > Key: FLINK-9692 > URL: https://issues.apache.org/jira/browse/FLINK-9692 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: performance, pull-request-available > Fix For: 1.6.0 > > > The Kinesis connector currently has a [constant > value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213] > set for maxRecords that it can fetch from a single Kinesis getRecords call. > However, in most realtime scenarios, the average size of the Kinesis record > (in bytes) changes depending on the situation i.e. you could be in a > transient scenario where you are reading large sized records and would hence > like to fetch fewer records in each getRecords call (so as to not exceed the > 2 Mb/sec [per shard > limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] > on the getRecords call). > The idea here is to adapt the Kinesis connector to identify an average batch > size prior to making the getRecords call, so that the maxRecords parameter > can be appropriately tuned before making the call. > This feature can be behind a > [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java] > flag that defaults to false. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6300 Nice feature, thanks a lot. Merged this into the 1.6 and 1.7 branches ---
[jira] [Commented] (FLINK-9815) YARNSessionCapacitySchedulerITCase flaky
[ https://issues.apache.org/jira/browse/FLINK-9815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546440#comment-16546440 ] Chesnay Schepler commented on FLINK-9815: - The {{YarnTestBase}} has an {{@After}} method in which it sleeps for 500ms to wait for jobs to terminate. I'll replace this with a more lenient and proper check that fails if any application is still running after several seconds. > YARNSessionCapacitySchedulerITCase flaky > > > Key: FLINK-9815 > URL: https://issues.apache.org/jira/browse/FLINK-9815 > Project: Flink > Issue Type: Bug >Affects Versions: 1.6.0 >Reporter: Dawid Wysakowicz >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.6.0 > > > The test fails because of dangling yarn applications. > Logs: https://api.travis-ci.org/v3/job/402657694/log.txt > It was also reported previously in [FLINK-8161] : > https://issues.apache.org/jira/browse/FLINK-8161?focusedCommentId=16480216&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16480216 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546435#comment-16546435 ] ASF GitHub Bot commented on FLINK-9852: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202983790 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException} +import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a batch environment. + */ +class BatchTableDescriptor( +private val tableEnv: BatchTableEnvironment, +private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor + with RegistrableDescriptor { + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + + /** +* Internal method for properties conversion. +*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { +connectorDescriptor.addProperties(properties) +formatDescriptor.foreach(_.addProperties(properties)) +schemaDescriptor.foreach(_.addProperties(properties)) + } + + /** +* Searches for the specified table source, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSource(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSource = TableFactoryService + .find(classOf[BatchTableSourceFactory[_]], javaMap) + .createBatchTableSource(javaMap) +tableEnv.registerTableSource(name, tableSource) + } + + /** +* Searches for the specified table sink, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSink(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSink = TableFactoryService + .find(classOf[BatchTableSinkFactory[_]], javaMap) + .createBatchTableSink(javaMap) +tableEnv.registerTableSink(name, tableSink) + } + + /** +* Searches for the specified table source and sink, configures them accordingly, and registers +* them as a table under the given name. 
+* +* @param name table name to be registered in the table environment +*/ + override def registerTableSourceAndSink(name: String): Unit = { +registerTableSource(name) +registerTableSink(name) + } + + /** +* Specifies the format that defines how to read data from a connector. +*/ + override def withFormat(format: FormatDescriptor): BatchTableDescriptor = { +formatDescriptor = Some(format) +this + } + + /** +* Specifies the resulting table schema. +*/ + override def withSchema(schema: Schema): BatchTableDescriptor = { +schemaDescriptor = Some(schema) +this + } + + override def toString: String = { +getValidProperties.toString + } + + // -- + + private def getValidProperties: DescriptorProperties = { --- End diff -- I agree but I cannot move it up the class hierarchy because it would be public in Java. I
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202983790 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException} +import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a batch environment. + */ +class BatchTableDescriptor( +private val tableEnv: BatchTableEnvironment, +private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor + with RegistrableDescriptor { + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + + /** +* Internal method for properties conversion. +*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { +connectorDescriptor.addProperties(properties) +formatDescriptor.foreach(_.addProperties(properties)) +schemaDescriptor.foreach(_.addProperties(properties)) + } + + /** +* Searches for the specified table source, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSource(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSource = TableFactoryService + .find(classOf[BatchTableSourceFactory[_]], javaMap) + .createBatchTableSource(javaMap) +tableEnv.registerTableSource(name, tableSource) + } + + /** +* Searches for the specified table sink, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSink(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSink = TableFactoryService + .find(classOf[BatchTableSinkFactory[_]], javaMap) + .createBatchTableSink(javaMap) +tableEnv.registerTableSink(name, tableSink) + } + + /** +* Searches for the specified table source and sink, configures them accordingly, and registers +* them as a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSourceAndSink(name: String): Unit = { +registerTableSource(name) +registerTableSink(name) + } + + /** +* Specifies the format that defines how to read data from a connector. 
+*/ + override def withFormat(format: FormatDescriptor): BatchTableDescriptor = { +formatDescriptor = Some(format) +this + } + + /** +* Specifies the resulting table schema. +*/ + override def withSchema(schema: Schema): BatchTableDescriptor = { +schemaDescriptor = Some(schema) +this + } + + override def toString: String = { +getValidProperties.toString + } + + // -- + + private def getValidProperties: DescriptorProperties = { --- End diff -- I agree but I cannot move it up the class hierarchy because it would be public in Java. I will create a util class. ---
[jira] [Commented] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis
[ https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546433#comment-16546433 ] Chesnay Schepler commented on FLINK-8163: - I run into the memory issue consistently after ~100 iterations. > NonHAQueryableStateFsBackendITCase test getting stuck on Travis > --- > > Key: FLINK-8163 > URL: https://issues.apache.org/jira/browse/FLINK-8163 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Critical > Labels: test-stability > > The {{NonHAQueryableStateFsBackendITCase}} tests seems to get stuck on Travis > producing no output for 300s. > https://travis-ci.org/tillrohrmann/flink/jobs/307988209 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis
[ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546430#comment-16546430 ] ASF GitHub Bot commented on FLINK-9692: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6300 > Adapt maxRecords parameter in the getRecords call to optimize bytes read from > Kinesis > -- > > Key: FLINK-9692 > URL: https://issues.apache.org/jira/browse/FLINK-9692 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: performance, pull-request-available > > The Kinesis connector currently has a [constant > value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213] > set for maxRecords that it can fetch from a single Kinesis getRecords call. > However, in most realtime scenarios, the average size of the Kinesis record > (in bytes) changes depending on the situation i.e. you could be in a > transient scenario where you are reading large sized records and would hence > like to fetch fewer records in each getRecords call (so as to not exceed the > 2 Mb/sec [per shard > limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] > on the getRecords call). > The idea here is to adapt the Kinesis connector to identify an average batch > size prior to making the getRecords call, so that the maxRecords parameter > can be appropriately tuned before making the call. > This feature can be behind a > [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java] > flag that defaults to false. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9777) YARN: JM and TM Memory must be specified with Units
[ https://issues.apache.org/jira/browse/FLINK-9777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546425#comment-16546425 ] ASF GitHub Bot commented on FLINK-9777: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6297 @dawidwys added test case, please review~ > YARN: JM and TM Memory must be specified with Units > > > Key: FLINK-9777 > URL: https://issues.apache.org/jira/browse/FLINK-9777 > Project: Flink > Issue Type: Bug > Components: Documentation, YARN >Affects Versions: 1.6.0 > Environment: commit 9f736d1927c62d220a82931c4f5ffa4955910f27 >Reporter: Gary Yao >Assignee: vinoyang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0 > > > FLINK-6469 breaks backwards compatibility because the JobManager and > TaskManager memory must be specified with units (otherwise bytes are > assumed). The command to start a YARN session as documented > ([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md) > > |https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md] > would not work because 1024 bytes and 4096 bytes are not enough for the heap > size. The command finishes with the following exception: > {noformat} > java.lang.reflect.UndeclaredThrowableException > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802) > Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: > Couldn't deploy Yarn session cluster > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > ... 2 more > Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum > memory requirements with the provided cluster specification. Please increase > the memory of the cluster. > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413) > ... 7 more > Caused by: java.lang.IllegalArgumentException > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) > at > org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450) > ... 9 more > {noformat} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
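To make the compatibility break concrete: a bare number that previously meant megabytes is now interpreted as bytes, so the documented JobManager/TaskManager values of 1024 and 4096 fall below the minimum heap check. The converter below is a simplified stand-in for Flink's memory parsing, not the actual implementation:

{code}
// Simplified illustration only, not Flink's real parser.
def toMebiBytes(value: String): Long = {
  val v = value.trim.toLowerCase
  if (v.endsWith("mb") || v.endsWith("m")) v.takeWhile(_.isDigit).toLong
  else if (v.endsWith("gb") || v.endsWith("g")) v.takeWhile(_.isDigit).toLong * 1024
  else v.toLong / (1024 * 1024) // bare number: treated as bytes under the new behaviour
}

toMebiBytes("1024m") // 1024 MiB of heap
toMebiBytes("1024")  // 1024 bytes ~ 0 MiB, which fails the cluster specification check
{code}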
[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6300 ---
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546424#comment-16546424 ] ASF GitHub Bot commented on FLINK-9852: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202982848 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._ +import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a streaming environment. + */ +class StreamTableDescriptor( +private val tableEnv: StreamTableEnvironment, +private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor + with RegistrableDescriptor + with StreamableDescriptor { + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + private var updateMode: Option[String] = None + + /** +* Internal method for properties conversion. +*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { +connectorDescriptor.addProperties(properties) +formatDescriptor.foreach(_.addProperties(properties)) +schemaDescriptor.foreach(_.addProperties(properties)) +updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode)) + } + + /** +* Searches for the specified table source, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSource(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSource = TableFactoryService + .find(classOf[StreamTableSourceFactory[_]], javaMap) + .createStreamTableSource(javaMap) +tableEnv.registerTableSource(name, tableSource) + } + + /** +* Searches for the specified table sink, configures it accordingly, and registers it as +* a table under the given name. 
+* +* @param name table name to be registered in the table environment +*/ + override def registerTableSink(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSink = TableFactoryService + .find(classOf[StreamTableSinkFactory[_]], javaMap) + .createStreamTableSink(javaMap) +tableEnv.registerTableSink(name, tableSink) + } + + /** +* Searches for the specified table source and sink, configures them accordingly, and registers +* them as a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSourceAndSink(name: String): Unit = { +registerTableSource(name) +registerTableSink(name) + } + + /** +* Specifies the format that defines how to read data from a connector. +*/ + override def withFormat(format: FormatDescriptor): StreamTableDescriptor = { +formatDescriptor = Some(format) +this + } + + /** +* Specifies the resulting table schema. +*/ + override def withSchema(schema: Schema): StreamTableDescriptor = { +schemaDescriptor = Some(schema) +this + } + + /** +* Declares how to perform the conversion between a dynamic table and an external connector. +* +
[GitHub] flink pull request #6347: [hotfix] consistency: vertexes -> vertices
Github user tison1 closed the pull request at: https://github.com/apache/flink/pull/6347 ---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202982848 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._ +import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a streaming environment. + */ +class StreamTableDescriptor( +private val tableEnv: StreamTableEnvironment, +private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor + with RegistrableDescriptor + with StreamableDescriptor { + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + private var updateMode: Option[String] = None + + /** +* Internal method for properties conversion. +*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { +connectorDescriptor.addProperties(properties) +formatDescriptor.foreach(_.addProperties(properties)) +schemaDescriptor.foreach(_.addProperties(properties)) +updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode)) + } + + /** +* Searches for the specified table source, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSource(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSource = TableFactoryService + .find(classOf[StreamTableSourceFactory[_]], javaMap) + .createStreamTableSource(javaMap) +tableEnv.registerTableSource(name, tableSource) + } + + /** +* Searches for the specified table sink, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSink(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSink = TableFactoryService + .find(classOf[StreamTableSinkFactory[_]], javaMap) + .createStreamTableSink(javaMap) +tableEnv.registerTableSink(name, tableSink) + } + + /** +* Searches for the specified table source and sink, configures them accordingly, and registers +* them as a table under the given name. 
+* +* @param name table name to be registered in the table environment +*/ + override def registerTableSourceAndSink(name: String): Unit = { +registerTableSource(name) +registerTableSink(name) + } + + /** +* Specifies the format that defines how to read data from a connector. +*/ + override def withFormat(format: FormatDescriptor): StreamTableDescriptor = { +formatDescriptor = Some(format) +this + } + + /** +* Specifies the resulting table schema. +*/ + override def withSchema(schema: Schema): StreamTableDescriptor = { +schemaDescriptor = Some(schema) +this + } + + /** +* Declares how to perform the conversion between a dynamic table and an external connector. +* +* In append mode, a dynamic table and an external connector only exchange INSERT messages. +* +* @see See also [[inRetractMode()]] and [[inUpsertMode()]]. +*/ + override def inAppendMode(): StreamTableDescriptor = { +
[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6297 @dawidwys added test case, please review~ ---
[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices
Github user tison1 commented on the issue: https://github.com/apache/flink/pull/6347 > vertices is the correct plural, but this is another one of those cases where fixing it might cause more harm than good since it could cause merge conflicts, yet provides no functional benefit. ... close as @zentol suggested ---
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546423#comment-16546423 ] ASF GitHub Bot commented on FLINK-9852: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202982499 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala --- @@ -19,14 +19,17 @@ package org.apache.flink.table.descriptors /** - * Common class for all descriptors describing a table sink. + * A trait for descriptors that allow to define a format and schema. */ -abstract class TableSinkDescriptor extends TableDescriptor { +trait SchematicDescriptor extends Descriptor { --- End diff -- `SchematicDescriptor` is used for `ExternalCatalogTable`, `StreamTableDescriptor`, and `BatchTableDescriptor`. If we add a new level next to `connector`, `format` (which may happens in the future), we would immediately get a compile error there. > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, only a table source can be created using the unified table > descriptors with {{tableEnv.from(...)}}. A similar approach should be > supported for defining sinks or even both types at the same time. > I suggest the following syntax: > {code} > tableEnv.connect(Kafka(...)).registerSource("name") > tableEnv.connect(Kafka(...)).registerSink("name") > tableEnv.connect(Kafka(...)).registerSourceAndSink("name") > {code} > A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202982499 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala --- @@ -19,14 +19,17 @@ package org.apache.flink.table.descriptors /** - * Common class for all descriptors describing a table sink. + * A trait for descriptors that allow to define a format and schema. */ -abstract class TableSinkDescriptor extends TableDescriptor { +trait SchematicDescriptor extends Descriptor { --- End diff -- `SchematicDescriptor` is used for `ExternalCatalogTable`, `StreamTableDescriptor`, and `BatchTableDescriptor`. If we add a new level next to `connector`, `format` (which may happens in the future), we would immediately get a compile error there. ---
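A contrived sketch of the argument — because the descriptor classes all implement the shared trait, adding a new abstract "level" (say, a hypothetical withPartitioning) immediately fails compilation in every implementor. All names below are made up:

{code}
// Toy model of the shared trait; the real one returns the concrete descriptor type.
trait SchematicDescriptorSketch[D] {
  def withFormat(format: String): D
  def withSchema(schema: String): D
  // adding e.g. `def withPartitioning(p: String): D` here would break
  // every implementor (stream, batch, catalog table) until it is implemented
}

class StreamSketch extends SchematicDescriptorSketch[StreamSketch] {
  override def withFormat(format: String): StreamSketch = this
  override def withSchema(schema: String): StreamSketch = this
}
{code}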
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546419#comment-16546419 ] ASF GitHub Bot commented on FLINK-9852: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202981771 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.DescriptorProperties +import org.apache.flink.table.factories._ +import org.apache.flink.table.plan.schema._ +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.util.Logging + + +/** + * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]]. + * + * It uses [[TableFactoryService]] for discovering. 
+ */ +object ExternalTableUtil extends Logging { + + /** +* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance +* +* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert +* @return converted [[TableSourceTable]] instance from the input catalog table +*/ + def fromExternalCatalogTable[T1, T2]( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) +: TableSourceSinkTable[T1, T2] = { + +val properties = new DescriptorProperties() +externalCatalogTable.addProperties(properties) +val javaMap = properties.asMap +val statistics = new FlinkStatistic(externalCatalogTable.getTableStats) + +val source: Option[TableSourceTable[T1]] = tableEnv match { + // check for a batch table source in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSource(externalCatalogTable, javaMap, statistics) + + // check for a stream table source in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSource(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table source.") +} + +val sink: Option[TableSinkTable[T2]] = tableEnv match { + // check for a batch table sink in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSink(externalCatalogTable, javaMap, statistics) + + // check for a stream table sink in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSink(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table sink.") +} + +new TableSourceSinkTable[T1, T2](source, sink) + } + + private def createBatchTableSource[T]( --- End diff -- Then we would have 4 factories that have to be checked with if/else branches. Having those if/else at 3 places (SQL Client, external catalog, and descriptors) is acceptable in my opinion. > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202981771 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.DescriptorProperties +import org.apache.flink.table.factories._ +import org.apache.flink.table.plan.schema._ +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.util.Logging + + +/** + * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]]. + * + * It uses [[TableFactoryService]] for discovering. + */ +object ExternalTableUtil extends Logging { + + /** +* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance +* +* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert +* @return converted [[TableSourceTable]] instance from the input catalog table +*/ + def fromExternalCatalogTable[T1, T2]( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) +: TableSourceSinkTable[T1, T2] = { + +val properties = new DescriptorProperties() +externalCatalogTable.addProperties(properties) +val javaMap = properties.asMap +val statistics = new FlinkStatistic(externalCatalogTable.getTableStats) + +val source: Option[TableSourceTable[T1]] = tableEnv match { + // check for a batch table source in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSource(externalCatalogTable, javaMap, statistics) + + // check for a stream table source in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSource(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table source.") +} + +val sink: Option[TableSinkTable[T2]] = tableEnv match { + // check for a batch table sink in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSink(externalCatalogTable, javaMap, statistics) + + // check for a stream table sink in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSink(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table sink.") +} + +new TableSourceSinkTable[T1, T2](source, sink) + } + + private def createBatchTableSource[T]( 
--- End diff -- Then we would have 4 factories that have to be checked with if/else branches. Having those if/else at 3 places (SQL Client, external catalog, and descriptors) is acceptable in my opinion. ---
[jira] [Assigned] (FLINK-9815) YARNSessionCapacitySchedulerITCase flaky
[ https://issues.apache.org/jira/browse/FLINK-9815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-9815: --- Assignee: Chesnay Schepler > YARNSessionCapacitySchedulerITCase flaky > > > Key: FLINK-9815 > URL: https://issues.apache.org/jira/browse/FLINK-9815 > Project: Flink > Issue Type: Bug >Affects Versions: 1.6.0 >Reporter: Dawid Wysakowicz >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.6.0 > > > The test fails because of dangling yarn applications. > Logs: https://api.travis-ci.org/v3/job/402657694/log.txt > It was also reported previously in [FLINK-8161] : > https://issues.apache.org/jira/browse/FLINK-8161?focusedCommentId=16480216&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16480216 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546411#comment-16546411 ] ASF GitHub Bot commented on FLINK-9852: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202980228 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.DescriptorProperties +import org.apache.flink.table.factories._ +import org.apache.flink.table.plan.schema._ +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.util.Logging + + +/** + * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]]. + * + * It uses [[TableFactoryService]] for discovering. 
+ */ +object ExternalTableUtil extends Logging { + + /** +* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance +* +* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert +* @return converted [[TableSourceTable]] instance from the input catalog table +*/ + def fromExternalCatalogTable[T1, T2]( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) +: TableSourceSinkTable[T1, T2] = { + +val properties = new DescriptorProperties() +externalCatalogTable.addProperties(properties) +val javaMap = properties.asMap +val statistics = new FlinkStatistic(externalCatalogTable.getTableStats) + +val source: Option[TableSourceTable[T1]] = tableEnv match { + // check for a batch table source in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSource(externalCatalogTable, javaMap, statistics) + + // check for a stream table source in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSource(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table source.") +} + +val sink: Option[TableSinkTable[T2]] = tableEnv match { + // check for a batch table sink in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSink(externalCatalogTable, javaMap, statistics) + + // check for a stream table sink in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSink(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table sink.") +} + +new TableSourceSinkTable[T1, T2](source, sink) + } + + private def createBatchTableSource[T]( + externalCatalogTable: ExternalCatalogTable, + javaMap: util.Map[String, String], + statistics: FlinkStatistic) +: Option[TableSourceTable[T]] = if (externalCatalogTable.isTableSource) { --- End diff -- I also like this pattern more but it is not very Scala-like: https://tpolecat.github.io/2014/05/09/return.html > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Proj
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202980228 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.DescriptorProperties +import org.apache.flink.table.factories._ +import org.apache.flink.table.plan.schema._ +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.util.Logging + + +/** + * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]]. + * + * It uses [[TableFactoryService]] for discovering. + */ +object ExternalTableUtil extends Logging { + + /** +* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance +* +* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert +* @return converted [[TableSourceTable]] instance from the input catalog table +*/ + def fromExternalCatalogTable[T1, T2]( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) +: TableSourceSinkTable[T1, T2] = { + +val properties = new DescriptorProperties() +externalCatalogTable.addProperties(properties) +val javaMap = properties.asMap +val statistics = new FlinkStatistic(externalCatalogTable.getTableStats) + +val source: Option[TableSourceTable[T1]] = tableEnv match { + // check for a batch table source in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSource(externalCatalogTable, javaMap, statistics) + + // check for a stream table source in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSource(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table source.") +} + +val sink: Option[TableSinkTable[T2]] = tableEnv match { + // check for a batch table sink in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSink(externalCatalogTable, javaMap, statistics) + + // check for a stream table sink in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSink(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table sink.") +} + +new TableSourceSinkTable[T1, T2](source, sink) + } + + private def createBatchTableSource[T]( 
+ externalCatalogTable: ExternalCatalogTable, + javaMap: util.Map[String, String], + statistics: FlinkStatistic) +: Option[TableSourceTable[T]] = if (externalCatalogTable.isTableSource) { --- End diff -- I also like this pattern more but it is not very Scala-like: https://tpolecat.github.io/2014/05/09/return.html ---
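The style question can be illustrated with a simplified, hypothetical example; none of the names below come from the PR. The linked post argues that `return` in Scala is a non-local control-flow construct, which is why the expression-oriented form is preferred:

{code:scala}
// Guard-clause style with an early return (common in Java, discouraged in idiomatic Scala):
def sourceTableWithReturn(isTableSource: Boolean, name: String): Option[String] = {
  if (!isTableSource) {
    return None
  }
  Some(s"source:$name")
}

// Expression-oriented style: the whole body is a single if/else expression,
// which is the shape used in the diff above.
def sourceTable(isTableSource: Boolean, name: String): Option[String] =
  if (isTableSource) Some(s"source:$name") else None

// Equivalent formulation using Option combinators:
def sourceTableViaOption(isTableSource: Boolean, name: String): Option[String] =
  Some(name).filter(_ => isTableSource).map(n => s"source:$n")
{code}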
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546407#comment-16546407 ] ASF GitHub Bot commented on FLINK-9852: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202978881 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -18,33 +18,299 @@ package org.apache.flink.table.catalog +import org.apache.flink.table.descriptors.DescriptorProperties.toScala +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT} import org.apache.flink.table.descriptors._ import org.apache.flink.table.plan.stats.TableStats +import scala.collection.JavaConverters._ + /** - * Defines a table in an [[ExternalCatalog]]. + * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources + * and/or sinks for both batch and stream environments. + * + * The catalog table takes descriptors which allow for declaring the communication to external + * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories + * that match the desired configuration. + * + * Use the provided builder methods to configure the external catalog table accordingly. + * + * The following example shows how to read from a connector using a JSON format and + * declaring it as a table source: * - * @param connectorDesc describes the system to connect to - * @param formatDesc describes the data format of a connector - * @param schemaDesc describes the schema of the result table - * @param statisticsDesc describes the estimated statistics of the result table - * @param metadataDesc describes additional metadata of a table + * {{{ + * ExternalCatalogTable( + * new ExternalSystemXYZ() + * .version("0.11")) + * .withFormat( + * new Json() + * .jsonSchema("{...}") + * .failOnMissingField(false)) + * .withSchema( + * new Schema() + * .field("user-name", "VARCHAR").from("u_name") + * .field("count", "DECIMAL") + * .asTableSource() --- End diff -- I also thought about that but actually descriptors don't "build" something. The only final representation would be the properties but we don't expose them to the user. > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, only a table source can be created using the unified table > descriptors with {{tableEnv.from(...)}}. A similar approach should be > supported for defining sinks or even both types at the same time. > I suggest the following syntax: > {code} > tableEnv.connect(Kafka(...)).registerSource("name") > tableEnv.connect(Kafka(...)).registerSink("name") > tableEnv.connect(Kafka(...)).registerSourceAndSink("name") > {code} > A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202978881 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -18,33 +18,299 @@ package org.apache.flink.table.catalog +import org.apache.flink.table.descriptors.DescriptorProperties.toScala +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT} import org.apache.flink.table.descriptors._ import org.apache.flink.table.plan.stats.TableStats +import scala.collection.JavaConverters._ + /** - * Defines a table in an [[ExternalCatalog]]. + * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources + * and/or sinks for both batch and stream environments. + * + * The catalog table takes descriptors which allow for declaring the communication to external + * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories + * that match the desired configuration. + * + * Use the provided builder methods to configure the external catalog table accordingly. + * + * The following example shows how to read from a connector using a JSON format and + * declaring it as a table source: * - * @param connectorDesc describes the system to connect to - * @param formatDesc describes the data format of a connector - * @param schemaDesc describes the schema of the result table - * @param statisticsDesc describes the estimated statistics of the result table - * @param metadataDesc describes additional metadata of a table + * {{{ + * ExternalCatalogTable( + * new ExternalSystemXYZ() + * .version("0.11")) + * .withFormat( + * new Json() + * .jsonSchema("{...}") + * .failOnMissingField(false)) + * .withSchema( + * new Schema() + * .field("user-name", "VARCHAR").from("u_name") + * .field("count", "DECIMAL") + * .asTableSource() --- End diff -- I also thought about that but actually descriptors don't "build" something. The only final representation would be the properties but we don't expose them to the user. ---
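The point that descriptors do not "build" anything can be made concrete with a small sketch. This is illustrative only; the class names mirror those in the diff, but the property keys and the simplified structure are assumptions rather than the actual Flink implementation:

{code:scala}
import scala.collection.mutable

// A descriptor only contributes string properties; the property map is the
// real "final representation", and it is later handed to a table factory
// rather than exposed to the user.
class DescriptorProperties {
  private val props = mutable.LinkedHashMap[String, String]()
  def putString(key: String, value: String): Unit = props += (key -> value)
  def asMap: Map[String, String] = props.toMap
}

trait Descriptor {
  def addProperties(properties: DescriptorProperties): Unit
}

class Json(jsonSchema: String) extends Descriptor {
  override def addProperties(properties: DescriptorProperties): Unit = {
    properties.putString("format.type", "json")
    properties.putString("format.json-schema", jsonSchema)
  }
}
{code}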
[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6347 yes ---
[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices
Github user tison1 commented on the issue: https://github.com/apache/flink/pull/6347 > Additionally this PR makes a lot of whitespace changes that should be reverted in any case. Did you mean that the whitespace in the comment prefix `* ` is significant? ---
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546399#comment-16546399 ] ASF GitHub Bot commented on FLINK-9852: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202977464 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -18,33 +18,299 @@ package org.apache.flink.table.catalog +import org.apache.flink.table.descriptors.DescriptorProperties.toScala +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT} import org.apache.flink.table.descriptors._ import org.apache.flink.table.plan.stats.TableStats +import scala.collection.JavaConverters._ + /** - * Defines a table in an [[ExternalCatalog]]. + * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources + * and/or sinks for both batch and stream environments. + * + * The catalog table takes descriptors which allow for declaring the communication to external + * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories + * that match the desired configuration. + * + * Use the provided builder methods to configure the external catalog table accordingly. + * + * The following example shows how to read from a connector using a JSON format and + * declaring it as a table source: * - * @param connectorDesc describes the system to connect to - * @param formatDesc describes the data format of a connector - * @param schemaDesc describes the schema of the result table - * @param statisticsDesc describes the estimated statistics of the result table - * @param metadataDesc describes additional metadata of a table + * {{{ + * ExternalCatalogTable( + * new ExternalSystemXYZ() + * .version("0.11")) + * .withFormat( + * new Json() + * .jsonSchema("{...}") + * .failOnMissingField(false)) + * .withSchema( + * new Schema() + * .field("user-name", "VARCHAR").from("u_name") + * .field("count", "DECIMAL") + * .asTableSource() + * }}} + * + * Note: For backwards-compatibility, the table is declared as a table source for batch and + * streaming environment by default. + * + * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how + * to target suitable factories. 
+ * + * @param connectorDescriptor describes the system to connect to */ -class ExternalCatalogTable( -connectorDesc: ConnectorDescriptor, -formatDesc: Option[FormatDescriptor], -schemaDesc: Option[Schema], -statisticsDesc: Option[Statistics], -metadataDesc: Option[Metadata]) - extends TableSourceDescriptor { - - this.connectorDescriptor = Some(connectorDesc) - this.formatDescriptor = formatDesc - this.schemaDescriptor = schemaDesc - this.statisticsDescriptor = statisticsDesc - this.metaDescriptor = metadataDesc - - // expose statistics for external table source util - override def getTableStats: Option[TableStats] = super.getTableStats +class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor --- End diff -- https://github.com/databricks/scala-style-guide#indent > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, only a table source can be created using the unified table > descriptors with {{tableEnv.from(...)}}. A similar approach should be > supported for defining sinks or even both types at the same time. > I suggest the following syntax: > {code} > tableEnv.connect(Kafka(...)).registerSource("name") > tableEnv.connect(Kafka(...)).registerSink("name") > tableEnv.connect(Kafka(...)).registerSourceAndSink("name") > {code} > A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202977464 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -18,33 +18,299 @@ package org.apache.flink.table.catalog +import org.apache.flink.table.descriptors.DescriptorProperties.toScala +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT} import org.apache.flink.table.descriptors._ import org.apache.flink.table.plan.stats.TableStats +import scala.collection.JavaConverters._ + /** - * Defines a table in an [[ExternalCatalog]]. + * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources + * and/or sinks for both batch and stream environments. + * + * The catalog table takes descriptors which allow for declaring the communication to external + * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories + * that match the desired configuration. + * + * Use the provided builder methods to configure the external catalog table accordingly. + * + * The following example shows how to read from a connector using a JSON format and + * declaring it as a table source: * - * @param connectorDesc describes the system to connect to - * @param formatDesc describes the data format of a connector - * @param schemaDesc describes the schema of the result table - * @param statisticsDesc describes the estimated statistics of the result table - * @param metadataDesc describes additional metadata of a table + * {{{ + * ExternalCatalogTable( + * new ExternalSystemXYZ() + * .version("0.11")) + * .withFormat( + * new Json() + * .jsonSchema("{...}") + * .failOnMissingField(false)) + * .withSchema( + * new Schema() + * .field("user-name", "VARCHAR").from("u_name") + * .field("count", "DECIMAL") + * .asTableSource() + * }}} + * + * Note: For backwards-compatibility, the table is declared as a table source for batch and + * streaming environment by default. + * + * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how + * to target suitable factories. + * + * @param connectorDescriptor describes the system to connect to */ -class ExternalCatalogTable( -connectorDesc: ConnectorDescriptor, -formatDesc: Option[FormatDescriptor], -schemaDesc: Option[Schema], -statisticsDesc: Option[Statistics], -metadataDesc: Option[Metadata]) - extends TableSourceDescriptor { - - this.connectorDescriptor = Some(connectorDesc) - this.formatDescriptor = formatDesc - this.schemaDescriptor = schemaDesc - this.statisticsDescriptor = statisticsDesc - this.metaDescriptor = metadataDesc - - // expose statistics for external table source util - override def getTableStats: Option[TableStats] = super.getTableStats +class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor --- End diff -- https://github.com/databricks/scala-style-guide#indent ---
[jira] [Commented] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis
[ https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546392#comment-16546392 ] Chesnay Schepler commented on FLINK-8163: - This test has some funky retrying logic that ignores most exceptions:
{code:java}
CompletableFuture expected = client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
expected.whenCompleteAsync((result, throwable) -> {
    if (throwable != null) {
        if (
            throwable.getCause() instanceof CancellationException ||
            throwable.getCause() instanceof AssertionError ||
            (failForUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)
        ) {
            resultFuture.completeExceptionally(throwable.getCause());
        } else if (deadline.hasTimeLeft()) {
            getKvStateIgnoringCertainExceptions(
                deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
                stateDescriptor, failForUnknownKeyOrNamespace, executor);
        }
    } else {
        resultFuture.complete(result);
    }
}, executor);
{code}
When running the test locally in a loop, this exception was logged on the server:
{code}
org.apache.flink.shaded.netty4.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 2147483648, max: 2147483648)
{code}
The client ignores this error and infinitely retries the operation, causing the timeout. Incidentally, the same exception is printed on every subsequent attempt. > NonHAQueryableStateFsBackendITCase test getting stuck on Travis > --- > > Key: FLINK-8163 > URL: https://issues.apache.org/jira/browse/FLINK-8163 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Critical > Labels: test-stability > > The {{NonHAQueryableStateFsBackendITCase}} tests seems to get stuck on Travis > producing no output for 300s. > https://travis-ci.org/tillrohrmann/flink/jobs/307988209 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
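One way to harden the helper described above is to treat such server-side errors as fatal instead of retrying until the deadline expires. The sketch below is an assumption about a possible fix, written in Scala for consistency with the other snippets in this digest; it is not the code that was committed, and the `OutOfMemoryError` case only covers Netty's error because `OutOfDirectMemoryError` extends `java.lang.OutOfMemoryError`:

{code:scala}
import java.util.concurrent.CancellationException

// Classify a failure cause: fatal causes should complete the result future
// exceptionally right away, everything else may be retried while the
// deadline still has time left.
def isFatal(cause: Throwable): Boolean = cause match {
  case _: CancellationException => true
  case _: AssertionError        => true
  case _: OutOfMemoryError      => true // includes Netty's OutOfDirectMemoryError
  case _                        => false
}

def shouldRetry(cause: Throwable, deadlineHasTimeLeft: Boolean): Boolean =
  !isFatal(cause) && deadlineHasTimeLeft
{code}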
[jira] [Closed] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis
[ https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8163. --- Resolution: Cannot Reproduce > NonHAQueryableStateFsBackendITCase test getting stuck on Travis > --- > > Key: FLINK-8163 > URL: https://issues.apache.org/jira/browse/FLINK-8163 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Critical > Labels: test-stability > > The {{NonHAQueryableStateFsBackendITCase}} tests seems to get stuck on Travis > producing no output for 300s. > https://travis-ci.org/tillrohrmann/flink/jobs/307988209 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis
[ https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reopened FLINK-8163: - > NonHAQueryableStateFsBackendITCase test getting stuck on Travis > --- > > Key: FLINK-8163 > URL: https://issues.apache.org/jira/browse/FLINK-8163 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Critical > Labels: test-stability > > The {{NonHAQueryableStateFsBackendITCase}} tests seems to get stuck on Travis > producing no output for 300s. > https://travis-ci.org/tillrohrmann/flink/jobs/307988209 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9792) Cannot add html tags in options description
[ https://issues.apache.org/jira/browse/FLINK-9792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546369#comment-16546369 ] ASF GitHub Bot commented on FLINK-9792: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6312#discussion_r202970121 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration.description; + +/** + * Formatter that transforms {@link Description} into Html representation. + */ +public class HtmlFormatter extends Formatter { + + @Override + protected void formatLink(StringBuilder state, String link, String description) { + state.append(String.format("%s", link, description)); + } + + @Override + protected void formatLineBreak(StringBuilder state) { + state.append(""); + } + + @Override + protected void formatText(StringBuilder state, String format, String[] elements) { + String escapedFormat = escapeCharacters(format); + state.append(String.format(escapedFormat, elements)); + } + + @Override + protected void formatList(StringBuilder state, String[] entries) { + state.append(""); + for (String entry : entries) { + state.append(String.format("%s", entry)); + } + state.append(""); + } + + @Override + protected Formatter newInstance() { + return new HtmlFormatter(); + } + + private static final String TEMPORARY_PLACEHOLDER = "superRandomTemporaryPlaceholder"; + + private static String escapeCharacters(String value) { + return value + .replaceAll("%s", TEMPORARY_PLACEHOLDER) --- End diff -- can you give me an example for where this is problematic? Does this occur if the _final formatted_ description contains `%`? > Cannot add html tags in options description > --- > > Key: FLINK-9792 > URL: https://issues.apache.org/jira/browse/FLINK-9792 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.0, 1.6.0 >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > > Right now it is impossible to add any html tags in options description, > because all "<" and ">" are escaped. Therefore some links there do not work. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9792) Cannot add html tags in options description
[ https://issues.apache.org/jira/browse/FLINK-9792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546368#comment-16546368 ] ASF GitHub Bot commented on FLINK-9792: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6312#discussion_r202969620 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration.description; + +/** + * Formatter that transforms {@link Description} into Html representation. + */ +public class HtmlFormatter extends Formatter { + + @Override + protected void formatLink(StringBuilder state, String link, String description) { + state.append(String.format("%s", link, description)); + } + + @Override + protected void formatLineBreak(StringBuilder state) { + state.append(""); + } + + @Override + protected void formatText(StringBuilder state, String format, String[] elements) { + String escapedFormat = escapeCharacters(format); + state.append(String.format(escapedFormat, elements)); + } + + @Override + protected void formatList(StringBuilder state, String[] entries) { + state.append(""); + for (String entry : entries) { + state.append(String.format("%s", entry)); + } + state.append(""); + } + + @Override + protected Formatter newInstance() { + return new HtmlFormatter(); + } + + private static final String TEMPORARY_PLACEHOLDER = "superRandomTemporaryPlaceholder"; --- End diff -- this should be distinct from the placeholder in `Utils`. > Cannot add html tags in options description > --- > > Key: FLINK-9792 > URL: https://issues.apache.org/jira/browse/FLINK-9792 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.0, 1.6.0 >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > > Right now it is impossible to add any html tags in options description, > because all "<" and ">" are escaped. Therefore some links there do not work. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6312#discussion_r202970121 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration.description; + +/** + * Formatter that transforms {@link Description} into Html representation. + */ +public class HtmlFormatter extends Formatter { + + @Override + protected void formatLink(StringBuilder state, String link, String description) { + state.append(String.format("%s", link, description)); + } + + @Override + protected void formatLineBreak(StringBuilder state) { + state.append(""); + } + + @Override + protected void formatText(StringBuilder state, String format, String[] elements) { + String escapedFormat = escapeCharacters(format); + state.append(String.format(escapedFormat, elements)); + } + + @Override + protected void formatList(StringBuilder state, String[] entries) { + state.append(""); + for (String entry : entries) { + state.append(String.format("%s", entry)); + } + state.append(""); + } + + @Override + protected Formatter newInstance() { + return new HtmlFormatter(); + } + + private static final String TEMPORARY_PLACEHOLDER = "superRandomTemporaryPlaceholder"; + + private static String escapeCharacters(String value) { + return value + .replaceAll("%s", TEMPORARY_PLACEHOLDER) --- End diff -- can you give me an example for where this is problematic? Does this occur if the _final formatted_ description contains `%`? ---
[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6312#discussion_r202969620 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration.description; + +/** + * Formatter that transforms {@link Description} into Html representation. + */ +public class HtmlFormatter extends Formatter { + + @Override + protected void formatLink(StringBuilder state, String link, String description) { + state.append(String.format("%s", link, description)); + } + + @Override + protected void formatLineBreak(StringBuilder state) { + state.append(""); + } + + @Override + protected void formatText(StringBuilder state, String format, String[] elements) { + String escapedFormat = escapeCharacters(format); + state.append(String.format(escapedFormat, elements)); + } + + @Override + protected void formatList(StringBuilder state, String[] entries) { + state.append(""); + for (String entry : entries) { + state.append(String.format("%s", entry)); + } + state.append(""); + } + + @Override + protected Formatter newInstance() { + return new HtmlFormatter(); + } + + private static final String TEMPORARY_PLACEHOLDER = "superRandomTemporaryPlaceholder"; --- End diff -- this should be distinct from the placeholder in `Utils`. ---
[jira] [Updated] (FLINK-9873) Log actual state when aborting checkpoint due to task not running
[ https://issues.apache.org/jira/browse/FLINK-9873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-9873: Affects Version/s: (was: 1.5.1) 1.5.0 > Log actual state when aborting checkpoint due to task not running > - > > Key: FLINK-9873 > URL: https://issues.apache.org/jira/browse/FLINK-9873 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2, 1.6.0 > > > Currently, if a checkpoint is triggered while a task is not in a RUNNING state > the following message is logged: > {code:java} > Checkpoint triggering task {} of job {} is not being executed at the > moment.{code} > We can improve this message to include the actual task state to help diagnose > problems. > This message is also a bit ambiguous, as "being executed" could mean many > things, from not "RUNNING", to not being "DEPLOYED", or to not existing at > all. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9792) Cannot add html tags in options description
[ https://issues.apache.org/jira/browse/FLINK-9792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-9792: Affects Version/s: (was: 1.5.1) 1.5.0 > Cannot add html tags in options description > --- > > Key: FLINK-9792 > URL: https://issues.apache.org/jira/browse/FLINK-9792 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.0, 1.6.0 >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > > Right now it is impossible to add any html tags in options description, > because all "<" and ">" are escaped. Therefore some links there do not work. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9013) Document yarn.containers.vcores only being effective when adapting YARN config
[ https://issues.apache.org/jira/browse/FLINK-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546325#comment-16546325 ] ASF GitHub Bot commented on FLINK-9013: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6294#discussion_r202968456 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/TextElement.java --- @@ -53,6 +55,16 @@ public static TextElement text(String text) { return new TextElement(text, Collections.emptyList()); } + /** +* Tries to format the text as code. +* +* @return text element with applied formatting +*/ + public TextElement formatAsCode() { --- End diff -- alternatively we could add an explicit `Code` `InlineElement`. > Document yarn.containers.vcores only being effective when adapting YARN config > -- > > Key: FLINK-9013 > URL: https://issues.apache.org/jira/browse/FLINK-9013 > Project: Flink > Issue Type: Improvement > Components: Documentation, YARN >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2 > > > Even after specifying {{yarn.containers.vcores}} and having Flink request > such a container from YARN, it may not take these into account at all and > return a container with 1 vcore. > The YARN configuration needs to be adapted to take the vcores into account, > e.g. by setting the {{FairScheduler}} in {{yarn-site.xml}}: > {code} > > yarn.resourcemanager.scheduler.class > > org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler > > {code} > This fact should be documented at least at the configuration parameter > documentation of {{yarn.containers.vcores}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6294#discussion_r202968456 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/TextElement.java --- @@ -53,6 +55,16 @@ public static TextElement text(String text) { return new TextElement(text, Collections.emptyList()); } + /** +* Tries to format the text as code. +* +* @return text element with applied formatting +*/ + public TextElement formatAsCode() { --- End diff -- alternatively we could add an explicit `Code` `InlineElement`. ---
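The alternative mentioned here can be sketched as a dedicated element type rather than a flag on `TextElement`. The snippet below is purely illustrative, written in Scala like the other sketches in this digest and using hypothetical names; it is not the design that was merged into flink-core:

{code:scala}
// Modelling "code" as its own inline element lets each formatter decide how
// to render it, instead of TextElement carrying a formatAsCode() switch.
sealed trait InlineElement
final case class Text(value: String) extends InlineElement
final case class Code(value: String) extends InlineElement

def renderHtml(element: InlineElement): String = element match {
  case Text(value) => value
  case Code(value) => s"<code>$value</code>"
}
{code}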
[jira] [Commented] (FLINK-9792) Cannot add html tags in options description
[ https://issues.apache.org/jira/browse/FLINK-9792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546321#comment-16546321 ] ASF GitHub Bot commented on FLINK-9792: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6312#discussion_r202967561 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration.description; + +/** + * Formatter that transforms {@link Description} into Html representation. + */ +public class HtmlFormatter extends Formatter { + + @Override + protected void formatLink(StringBuilder state, String link, String description) { + state.append(String.format("%s", link, description)); + } + + @Override + protected void formatLineBreak(StringBuilder state) { + state.append(""); + } + + @Override + protected void formatText(StringBuilder state, String format, String[] elements) { + String escapedFormat = escapeCharacters(format); + state.append(String.format(escapedFormat, elements)); + } + + @Override + protected void formatList(StringBuilder state, String[] entries) { + state.append(""); + for (String entry : entries) { + state.append(String.format("%s", entry)); + } + state.append(""); + } + + @Override + protected Formatter newInstance() { + return new HtmlFormatter(); + } + + private static final String TEMPORARY_PLACEHOLDER = "superRandomTemporaryPlaceholder"; + + private static String escapeCharacters(String value) { + return value + .replaceAll("%s", TEMPORARY_PLACEHOLDER) --- End diff -- @zentol Could you have a last look on this block code, before I merge it? Didn't want to sneak it in. I found out there is an issue when we use '%' sign in the description. > Cannot add html tags in options description > --- > > Key: FLINK-9792 > URL: https://issues.apache.org/jira/browse/FLINK-9792 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.1, 1.6.0 >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > > Right now it is impossible to add any html tags in options description, > because all "<" and ">" are escaped. Therefore some links there do not work. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6312#discussion_r202967561 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration.description; + +/** + * Formatter that transforms {@link Description} into Html representation. + */ +public class HtmlFormatter extends Formatter { + + @Override + protected void formatLink(StringBuilder state, String link, String description) { + state.append(String.format("%s", link, description)); + } + + @Override + protected void formatLineBreak(StringBuilder state) { + state.append(""); + } + + @Override + protected void formatText(StringBuilder state, String format, String[] elements) { + String escapedFormat = escapeCharacters(format); + state.append(String.format(escapedFormat, elements)); + } + + @Override + protected void formatList(StringBuilder state, String[] entries) { + state.append(""); + for (String entry : entries) { + state.append(String.format("%s", entry)); + } + state.append(""); + } + + @Override + protected Formatter newInstance() { + return new HtmlFormatter(); + } + + private static final String TEMPORARY_PLACEHOLDER = "superRandomTemporaryPlaceholder"; + + private static String escapeCharacters(String value) { + return value + .replaceAll("%s", TEMPORARY_PLACEHOLDER) --- End diff -- @zentol Could you have a last look on this block code, before I merge it? Didn't want to sneak it in. I found out there is an issue when we use '%' sign in the description. ---
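As a minimal reproduction of the kind of problem mentioned above (my own illustration, not code from the PR): a literal '%' in a description is interpreted by `String.format` as the start of a format specifier, so formatting can fail or mis-render unless literal percent signs are escaped while the genuine `%s` placeholders are protected:

{code:scala}
// A description containing a literal '%' breaks the naive formatting path.
val description = "Limits the cache to 10% of the managed memory"

// String.format(description) would typically throw a java.util.IllegalFormatException
// here, because "% o" is parsed as a format specifier.

// One possible workaround, mirroring the placeholder idea discussed above
// (the placeholder value itself is arbitrary):
val placeholder = "superRandomTemporaryPlaceholder"
val escaped = description
  .replaceAll("%s", placeholder) // protect genuine argument slots
  .replaceAll("%", "%%")         // escape literal percent signs
  .replaceAll(placeholder, "%s") // restore argument slots

val rendered = String.format(escaped) // "Limits the cache to 10% of the managed memory"
{code}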
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202966706 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException} +import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a batch environment. + */ +class BatchTableDescriptor( +private val tableEnv: BatchTableEnvironment, +private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor + with RegistrableDescriptor { + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + + /** +* Internal method for properties conversion. +*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { --- End diff -- In many places (for example all `addProperties` methods) you are ordering methods very weirdly. The rule of thumb should be public methods before private. Longer story: https://stackoverflow.com/a/1760877/8149051 Many times in this code review I had to jump up & down. ---
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546310#comment-16546310 ] ASF GitHub Bot commented on FLINK-9852: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202964290 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.DescriptorProperties +import org.apache.flink.table.factories._ +import org.apache.flink.table.plan.schema._ +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.util.Logging + + +/** + * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]]. + * + * It uses [[TableFactoryService]] for discovering. 
+ */ +object ExternalTableUtil extends Logging { + + /** +* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance +* +* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert +* @return converted [[TableSourceTable]] instance from the input catalog table +*/ + def fromExternalCatalogTable[T1, T2]( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) +: TableSourceSinkTable[T1, T2] = { + +val properties = new DescriptorProperties() +externalCatalogTable.addProperties(properties) +val javaMap = properties.asMap +val statistics = new FlinkStatistic(externalCatalogTable.getTableStats) + +val source: Option[TableSourceTable[T1]] = tableEnv match { + // check for a batch table source in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSource(externalCatalogTable, javaMap, statistics) + + // check for a stream table source in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSource(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table source.") +} + +val sink: Option[TableSinkTable[T2]] = tableEnv match { + // check for a batch table sink in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSink(externalCatalogTable, javaMap, statistics) + + // check for a stream table sink in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSink(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table sink.") +} + +new TableSourceSinkTable[T1, T2](source, sink) + } + + private def createBatchTableSource[T]( --- End diff -- I still do not like this lack of abstraction between batch and streaming in form of `createBatchTableSource`/`createStreamTableSource`. Instead of writing if/elses everywhere in our code there should be some common layer that handles such logic. Here half of the problem boils down to factories with methods createBatchTableSource and createStreamTableSource. > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/
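The "common layer" idea raised in the comment above can be sketched roughly as follows. This is an assumption about one possible shape of such an abstraction, not the API that was eventually introduced, and all names are hypothetical:

{code:scala}
import java.util.{Map => JMap}

// A single factory contract that hides whether a batch or a stream source is
// produced, so call sites such as the external catalog no longer need
// if/else branches on the environment type.
trait UnifiedTableSourceFactory[T] {
  def supportsStreaming: Boolean
  def createTableSource(properties: JMap[String, String]): T
}

def findAndCreate[T](
    isStreamEnvironment: Boolean,
    properties: JMap[String, String],
    factories: Seq[UnifiedTableSourceFactory[T]]): Option[T] =
  factories
    .find(_.supportsStreaming == isStreamEnvironment)
    .map(_.createTableSource(properties))
{code}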
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546311#comment-16546311 ] ASF GitHub Bot commented on FLINK-9852: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202961529 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -18,33 +18,299 @@ package org.apache.flink.table.catalog +import org.apache.flink.table.descriptors.DescriptorProperties.toScala +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT} import org.apache.flink.table.descriptors._ import org.apache.flink.table.plan.stats.TableStats +import scala.collection.JavaConverters._ + /** - * Defines a table in an [[ExternalCatalog]]. + * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources + * and/or sinks for both batch and stream environments. + * + * The catalog table takes descriptors which allow for declaring the communication to external + * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories + * that match the desired configuration. + * + * Use the provided builder methods to configure the external catalog table accordingly. + * + * The following example shows how to read from a connector using a JSON format and + * declaring it as a table source: * - * @param connectorDesc describes the system to connect to - * @param formatDesc describes the data format of a connector - * @param schemaDesc describes the schema of the result table - * @param statisticsDesc describes the estimated statistics of the result table - * @param metadataDesc describes additional metadata of a table + * {{{ + * ExternalCatalogTable( + * new ExternalSystemXYZ() + * .version("0.11")) + * .withFormat( + * new Json() + * .jsonSchema("{...}") + * .failOnMissingField(false)) + * .withSchema( + * new Schema() + * .field("user-name", "VARCHAR").from("u_name") + * .field("count", "DECIMAL") + * .asTableSource() --- End diff -- make `ExternalCatalogTable` and `BatchTableDescriptor` true builders with final fields? > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, only a table source can be created using the unified table > descriptors with {{tableEnv.from(...)}}. A similar approach should be > supported for defining sinks or even both types at the same time. > I suggest the following syntax: > {code} > tableEnv.connect(Kafka(...)).registerSource("name") > tableEnv.connect(Kafka(...)).registerSink("name") > tableEnv.connect(Kafka(...)).registerSourceAndSink("name") > {code} > A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
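The "true builder with final fields" suggestion could look roughly like the following. This is a hypothetical sketch rather than the actual `ExternalCatalogTable` API; it only illustrates the copy-on-write builder style being proposed:

{code:scala}
// Each configuration step returns a new immutable value instead of mutating
// internal vars, which is what "final fields" buys in a builder.
final case class CatalogTableSpec(
    connector: String,
    format: Option[String] = None,
    schema: Option[String] = None,
    isSource: Boolean = false,
    isSink: Boolean = false) {

  def withFormat(f: String): CatalogTableSpec = copy(format = Some(f))
  def withSchema(s: String): CatalogTableSpec = copy(schema = Some(s))
  def asTableSource(): CatalogTableSpec = copy(isSource = true)
  def asTableSink(): CatalogTableSpec = copy(isSink = true)
}

// Usage, mirroring the fluent style in the Scaladoc example above:
val spec = CatalogTableSpec("kafka").withFormat("json").asTableSource()
{code}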
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546318#comment-16546318 ] ASF GitHub Bot commented on FLINK-9852: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202966786 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import java.util + +import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, SCHEMA_PROCTIME} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND} +import org.apache.flink.table.factories.StreamTableSinkFactory +import org.apache.flink.types.Row + +/** + * Factory base for creating configured instances of [[CsvTableSink]] in a stream environment. + */ +class CsvAppendTableSinkFactory --- End diff -- This should be done in separate commit > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, only a table source can be created using the unified table > descriptors with {{tableEnv.from(...)}}. A similar approach should be > supported for defining sinks or even both types at the same time. > I suggest the following syntax: > {code} > tableEnv.connect(Kafka(...)).registerSource("name") > tableEnv.connect(Kafka(...)).registerSink("name") > tableEnv.connect(Kafka(...)).registerSourceAndSink("name") > {code} > A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546316#comment-16546316 ] ASF GitHub Bot commented on FLINK-9852: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202963074 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.DescriptorProperties +import org.apache.flink.table.factories._ +import org.apache.flink.table.plan.schema._ +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.util.Logging + + +/** + * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]]. + * + * It uses [[TableFactoryService]] for discovering. + */ +object ExternalTableUtil extends Logging { + + /** +* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance +* +* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert +* @return converted [[TableSourceTable]] instance from the input catalog table +*/ + def fromExternalCatalogTable[T1, T2]( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) +: TableSourceSinkTable[T1, T2] = { + +val properties = new DescriptorProperties() +externalCatalogTable.addProperties(properties) +val javaMap = properties.asMap +val statistics = new FlinkStatistic(externalCatalogTable.getTableStats) + +val source: Option[TableSourceTable[T1]] = tableEnv match { + // check for a batch table source in this batch environment --- End diff -- drop those comments, code is already self explanatory > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, only a table source can be created using the unified table > descriptors with {{tableEnv.from(...)}}. A similar approach should be > supported for defining sinks or even both types at the same time. > I suggest the following syntax: > {code} > tableEnv.connect(Kafka(...)).registerSource("name") > tableEnv.connect(Kafka(...)).registerSink("name") > tableEnv.connect(Kafka(...)).registerSourceAndSink("name") > {code} > A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546315#comment-16546315 ] ASF GitHub Bot commented on FLINK-9852: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202966706 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException} +import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a batch environment. + */ +class BatchTableDescriptor( +private val tableEnv: BatchTableEnvironment, +private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor + with RegistrableDescriptor { + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + + /** +* Internal method for properties conversion. +*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { --- End diff -- in many places (for example all `addProperties` methods) you are ordering methods very weirdly. Rule of thumb should be pubic methods before private. Longer story: https://stackoverflow.com/a/1760877/8149051 Many times in this code review I had to jump up & down. > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, only a table source can be created using the unified table > descriptors with {{tableEnv.from(...)}}. A similar approach should be > supported for defining sinks or even both types at the same time. > I suggest the following syntax: > {code} > tableEnv.connect(Kafka(...)).registerSource("name") > tableEnv.connect(Kafka(...)).registerSink("name") > tableEnv.connect(Kafka(...)).registerSourceAndSink("name") > {code} > A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
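As an illustration of the ordering rule referred to above, a class skeleton with the public API first and the private plumbing last might look like the following. The names are hypothetical; the point is only the reading order.

```scala
// Skeleton only: readers meet the public entry points first and can review
// top-to-bottom without jumping between public and private members.
class ExampleDescriptor {

  // public API first
  def registerTableSource(name: String): Unit = register(name, asSource = true)

  def registerTableSink(name: String): Unit = register(name, asSource = false)

  // private helpers last
  private def register(name: String, asSource: Boolean): Unit = {
    // hypothetical shared registration logic
  }
}
```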
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202963074 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.DescriptorProperties +import org.apache.flink.table.factories._ +import org.apache.flink.table.plan.schema._ +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.util.Logging + + +/** + * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]]. + * + * It uses [[TableFactoryService]] for discovering. + */ +object ExternalTableUtil extends Logging { + + /** +* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance +* +* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert +* @return converted [[TableSourceTable]] instance from the input catalog table +*/ + def fromExternalCatalogTable[T1, T2]( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) +: TableSourceSinkTable[T1, T2] = { + +val properties = new DescriptorProperties() +externalCatalogTable.addProperties(properties) +val javaMap = properties.asMap +val statistics = new FlinkStatistic(externalCatalogTable.getTableStats) + +val source: Option[TableSourceTable[T1]] = tableEnv match { + // check for a batch table source in this batch environment --- End diff -- drop those comments, code is already self explanatory ---
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546314#comment-16546314 ] ASF GitHub Bot commented on FLINK-9852: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202964952 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._ +import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a streaming environment. + */ +class StreamTableDescriptor( +private val tableEnv: StreamTableEnvironment, +private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor + with RegistrableDescriptor + with StreamableDescriptor { + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + private var updateMode: Option[String] = None + + /** +* Internal method for properties conversion. +*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { +connectorDescriptor.addProperties(properties) +formatDescriptor.foreach(_.addProperties(properties)) +schemaDescriptor.foreach(_.addProperties(properties)) +updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode)) + } + + /** +* Searches for the specified table source, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSource(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSource = TableFactoryService + .find(classOf[StreamTableSourceFactory[_]], javaMap) + .createStreamTableSource(javaMap) +tableEnv.registerTableSource(name, tableSource) + } + + /** +* Searches for the specified table sink, configures it accordingly, and registers it as +* a table under the given name. 
+* +* @param name table name to be registered in the table environment +*/ + override def registerTableSink(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSink = TableFactoryService + .find(classOf[StreamTableSinkFactory[_]], javaMap) + .createStreamTableSink(javaMap) +tableEnv.registerTableSink(name, tableSink) + } + + /** +* Searches for the specified table source and sink, configures them accordingly, and registers +* them as a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSourceAndSink(name: String): Unit = { +registerTableSource(name) +registerTableSink(name) + } + + /** +* Specifies the format that defines how to read data from a connector. +*/ + override def withFormat(format: FormatDescriptor): StreamTableDescriptor = { +formatDescriptor = Some(format) +this + } + + /** +* Specifies the resulting table schema. +*/ + override def withSchema(schema: Schema): StreamTableDescriptor = { +schemaDescriptor = Some(schema) +this + } + + /** +* Declares how to perform the conversion between a dynamic table and an external connector. +*
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546307#comment-16546307 ] ASF GitHub Bot commented on FLINK-9852: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202923853 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -18,33 +18,299 @@ package org.apache.flink.table.catalog +import org.apache.flink.table.descriptors.DescriptorProperties.toScala +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT} import org.apache.flink.table.descriptors._ import org.apache.flink.table.plan.stats.TableStats +import scala.collection.JavaConverters._ + /** - * Defines a table in an [[ExternalCatalog]]. + * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources + * and/or sinks for both batch and stream environments. + * + * The catalog table takes descriptors which allow for declaring the communication to external + * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories + * that match the desired configuration. + * + * Use the provided builder methods to configure the external catalog table accordingly. + * + * The following example shows how to read from a connector using a JSON format and + * declaring it as a table source: * - * @param connectorDesc describes the system to connect to - * @param formatDesc describes the data format of a connector - * @param schemaDesc describes the schema of the result table - * @param statisticsDesc describes the estimated statistics of the result table - * @param metadataDesc describes additional metadata of a table + * {{{ + * ExternalCatalogTable( + * new ExternalSystemXYZ() + * .version("0.11")) + * .withFormat( + * new Json() + * .jsonSchema("{...}") + * .failOnMissingField(false)) + * .withSchema( + * new Schema() + * .field("user-name", "VARCHAR").from("u_name") + * .field("count", "DECIMAL") + * .asTableSource() + * }}} + * + * Note: For backwards-compatibility, the table is declared as a table source for batch and + * streaming environment by default. + * + * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how + * to target suitable factories. + * + * @param connectorDescriptor describes the system to connect to */ -class ExternalCatalogTable( -connectorDesc: ConnectorDescriptor, -formatDesc: Option[FormatDescriptor], -schemaDesc: Option[Schema], -statisticsDesc: Option[Statistics], -metadataDesc: Option[Metadata]) - extends TableSourceDescriptor { - - this.connectorDescriptor = Some(connectorDesc) - this.formatDescriptor = formatDesc - this.schemaDescriptor = schemaDesc - this.statisticsDescriptor = statisticsDesc - this.metaDescriptor = metadataDesc - - // expose statistics for external table source util - override def getTableStats: Option[TableStats] = super.getTableStats +class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor --- End diff -- single tab? 
this is inconsistent with other places > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, only a table source can be created using the unified table > descriptors with {{tableEnv.from(...)}}. A similar approach should be > supported for defining sinks or even both types at the same time. > I suggest the following syntax: > {code} > tableEnv.connect(Kafka(...)).registerSource("name") > tableEnv.connect(Kafka(...)).registerSink("name") > tableEnv.connect(Kafka(...)).registerSourceAndSink("name") > {code} > A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202923853 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -18,33 +18,299 @@ package org.apache.flink.table.catalog +import org.apache.flink.table.descriptors.DescriptorProperties.toScala +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT} import org.apache.flink.table.descriptors._ import org.apache.flink.table.plan.stats.TableStats +import scala.collection.JavaConverters._ + /** - * Defines a table in an [[ExternalCatalog]]. + * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources + * and/or sinks for both batch and stream environments. + * + * The catalog table takes descriptors which allow for declaring the communication to external + * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories + * that match the desired configuration. + * + * Use the provided builder methods to configure the external catalog table accordingly. + * + * The following example shows how to read from a connector using a JSON format and + * declaring it as a table source: * - * @param connectorDesc describes the system to connect to - * @param formatDesc describes the data format of a connector - * @param schemaDesc describes the schema of the result table - * @param statisticsDesc describes the estimated statistics of the result table - * @param metadataDesc describes additional metadata of a table + * {{{ + * ExternalCatalogTable( + * new ExternalSystemXYZ() + * .version("0.11")) + * .withFormat( + * new Json() + * .jsonSchema("{...}") + * .failOnMissingField(false)) + * .withSchema( + * new Schema() + * .field("user-name", "VARCHAR").from("u_name") + * .field("count", "DECIMAL") + * .asTableSource() + * }}} + * + * Note: For backwards-compatibility, the table is declared as a table source for batch and + * streaming environment by default. + * + * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how + * to target suitable factories. + * + * @param connectorDescriptor describes the system to connect to */ -class ExternalCatalogTable( -connectorDesc: ConnectorDescriptor, -formatDesc: Option[FormatDescriptor], -schemaDesc: Option[Schema], -statisticsDesc: Option[Statistics], -metadataDesc: Option[Metadata]) - extends TableSourceDescriptor { - - this.connectorDescriptor = Some(connectorDesc) - this.formatDescriptor = formatDesc - this.schemaDescriptor = schemaDesc - this.statisticsDescriptor = statisticsDesc - this.metaDescriptor = metadataDesc - - // expose statistics for external table source util - override def getTableStats: Option[TableStats] = super.getTableStats +class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor --- End diff -- single tab? this is inconsistent with other places ---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202915534 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java --- @@ -40,10 +41,18 @@ @Test public void testMerging() throws Exception { - final Environment env1 = EnvironmentFileUtil.parseUnmodified(DEFAULTS_ENVIRONMENT_FILE); + final Map replaceVars1 = new HashMap<>(); + replaceVars1.put("$VAR_UPDATE_MODE", "update-mode: append"); + final Environment env1 = EnvironmentFileUtil.parseModified( + DEFAULTS_ENVIRONMENT_FILE, + replaceVars1); + + final Map replaceVars2 = new HashMap<>(); --- End diff -- ``` final Map replaceVars2 = new HashMap<>(replaceVars1); ``` and you can drop the line below ---
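The suggested copy constructor in context, written here in Scala against the java.util collections (the test itself is Java, but the constructor is the same). The archive stripped the generic parameters from the diff, so Map[String, String] is an assumption.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

val replaceVars1: JMap[String, String] = new JHashMap[String, String]()
replaceVars1.put("$VAR_UPDATE_MODE", "update-mode: append")

// The copy constructor takes over every entry of replaceVars1, so the second
// map does not have to repeat the put("$VAR_UPDATE_MODE", ...) call.
val replaceVars2: JMap[String, String] = new JHashMap[String, String](replaceVars1)
```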
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546312#comment-16546312 ] ASF GitHub Bot commented on FLINK-9852: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202965060 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException} +import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a batch environment. + */ +class BatchTableDescriptor( +private val tableEnv: BatchTableEnvironment, +private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor + with RegistrableDescriptor { + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + + /** +* Internal method for properties conversion. +*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { +connectorDescriptor.addProperties(properties) +formatDescriptor.foreach(_.addProperties(properties)) +schemaDescriptor.foreach(_.addProperties(properties)) + } + + /** +* Searches for the specified table source, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSource(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSource = TableFactoryService + .find(classOf[BatchTableSourceFactory[_]], javaMap) + .createBatchTableSource(javaMap) +tableEnv.registerTableSource(name, tableSource) + } + + /** +* Searches for the specified table sink, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSink(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSink = TableFactoryService + .find(classOf[BatchTableSinkFactory[_]], javaMap) + .createBatchTableSink(javaMap) +tableEnv.registerTableSink(name, tableSink) + } + + /** +* Searches for the specified table source and sink, configures them accordingly, and registers +* them as a table under the given name. 
+* +* @param name table name to be registered in the table environment +*/ + override def registerTableSourceAndSink(name: String): Unit = { +registerTableSource(name) +registerTableSink(name) + } + + /** +* Specifies the format that defines how to read data from a connector. +*/ + override def withFormat(format: FormatDescriptor): BatchTableDescriptor = { +formatDescriptor = Some(format) +this + } + + /** +* Specifies the resulting table schema. +*/ + override def withSchema(schema: Schema): BatchTableDescriptor = { +schemaDescriptor = Some(schema) +this + } + + override def toString: String = { +getValidProperties.toString + } + + // -- + + private def getValidProperties: DescriptorProperties = { --- End diff -- duplicated code with `StreamTableDescriptor. getValidProperties` > Expose descriptor-b
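One way to remove the duplication flagged here would be a small shared base class that both descriptors extend; a hedged sketch with assumed names (ConnectTableDescriptor is not part of the PR), reusing the types from the diff. Validation is omitted for brevity.

```scala
package org.apache.flink.table.descriptors

// Sketch only: property assembly lives in exactly one place; the batch and
// stream descriptors would only contribute what is specific to them (e.g. the
// update mode) and the factory class they look up.
abstract class ConnectTableDescriptor(connectorDescriptor: ConnectorDescriptor)
  extends TableDescriptor {

  protected var formatDescriptor: Option[FormatDescriptor] = None
  protected var schemaDescriptor: Option[Schema] = None

  protected def getValidProperties: DescriptorProperties = {
    val properties = new DescriptorProperties()
    connectorDescriptor.addProperties(properties)
    formatDescriptor.foreach(_.addProperties(properties))
    schemaDescriptor.foreach(_.addProperties(properties))
    properties
  }
}
```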
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202964715 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala --- @@ -19,14 +19,17 @@ package org.apache.flink.table.descriptors /** - * Common class for all descriptors describing a table sink. + * A trait for descriptors that allow to define a format and schema. */ -abstract class TableSinkDescriptor extends TableDescriptor { +trait SchematicDescriptor extends Descriptor { --- End diff -- are you using it anywhere as interface? what does extracting it to separate interface give us? Maybe drop it? ---
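For context on what the extracted trait could buy: any helper that only needs "something with a format and a schema" can be written once against it. This is a hypothetical sketch; the trait's exact method signatures are not shown in the diff and are assumed here.

```scala
import org.apache.flink.table.descriptors.{FormatDescriptor, Schema, SchematicDescriptor}

// Hypothetical helper that works for BatchTableDescriptor and
// StreamTableDescriptor alike, because both mix in SchematicDescriptor.
def applyDefaults(
    descriptor: SchematicDescriptor,
    format: FormatDescriptor,
    schema: Schema): Unit = {
  descriptor.withFormat(format)
  descriptor.withSchema(schema)
}
```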
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202964290 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.DescriptorProperties +import org.apache.flink.table.factories._ +import org.apache.flink.table.plan.schema._ +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.util.Logging + + +/** + * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]]. + * + * It uses [[TableFactoryService]] for discovering. + */ +object ExternalTableUtil extends Logging { + + /** +* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance +* +* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert +* @return converted [[TableSourceTable]] instance from the input catalog table +*/ + def fromExternalCatalogTable[T1, T2]( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) +: TableSourceSinkTable[T1, T2] = { + +val properties = new DescriptorProperties() +externalCatalogTable.addProperties(properties) +val javaMap = properties.asMap +val statistics = new FlinkStatistic(externalCatalogTable.getTableStats) + +val source: Option[TableSourceTable[T1]] = tableEnv match { + // check for a batch table source in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSource(externalCatalogTable, javaMap, statistics) + + // check for a stream table source in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSource(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table source.") +} + +val sink: Option[TableSinkTable[T2]] = tableEnv match { + // check for a batch table sink in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSink(externalCatalogTable, javaMap, statistics) + + // check for a stream table sink in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSink(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table sink.") +} + +new TableSourceSinkTable[T1, T2](source, sink) + } + + private def 
createBatchTableSource[T]( --- End diff -- I still do not like this lack of abstraction between batch and streaming in the form of `createBatchTableSource`/`createStreamTableSource`. Instead of writing if/elses everywhere in our code, there should be some common layer that handles such logic. Here, half of the problem boils down to factories with the methods createBatchTableSource and createStreamTableSource. ---
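A hedged sketch of the kind of "common layer" asked for here: the batch/stream decision is made in exactly one place behind a single lookup method. TableFactoryUtil and its signature are assumptions, not code from the PR; TableFactoryService and the factory interfaces are taken from the diff.

```scala
import java.util.{Map => JMap}

import org.apache.flink.table.factories.{BatchTableSourceFactory, StreamTableSourceFactory, TableFactoryService}
import org.apache.flink.table.sources.TableSource

object TableFactoryUtil {

  // The only place that knows about the batch/stream split; callers just say
  // which environment they are in and hand over the properties.
  def findAndCreateTableSource[T](
      isBatch: Boolean,
      properties: JMap[String, String]): TableSource[T] = {
    if (isBatch) {
      TableFactoryService
        .find(classOf[BatchTableSourceFactory[T]], properties)
        .createBatchTableSource(properties)
    } else {
      TableFactoryService
        .find(classOf[StreamTableSourceFactory[T]], properties)
        .createStreamTableSource(properties)
    }
  }
}
```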
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546313#comment-16546313 ] ASF GitHub Bot commented on FLINK-9852: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202965391 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._ +import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a streaming environment. + */ +class StreamTableDescriptor( --- End diff -- This class duplicates code with `BatchTableDescriptor` > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, only a table source can be created using the unified table > descriptors with {{tableEnv.from(...)}}. A similar approach should be > supported for defining sinks or even both types at the same time. > I suggest the following syntax: > {code} > tableEnv.connect(Kafka(...)).registerSource("name") > tableEnv.connect(Kafka(...)).registerSink("name") > tableEnv.connect(Kafka(...)).registerSourceAndSink("name") > {code} > A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546309#comment-16546309 ] ASF GitHub Bot commented on FLINK-9852: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202963234 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.DescriptorProperties +import org.apache.flink.table.factories._ +import org.apache.flink.table.plan.schema._ +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.util.Logging + + +/** + * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]]. + * + * It uses [[TableFactoryService]] for discovering. 
+ */ +object ExternalTableUtil extends Logging { + + /** +* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance +* +* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert +* @return converted [[TableSourceTable]] instance from the input catalog table +*/ + def fromExternalCatalogTable[T1, T2]( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) +: TableSourceSinkTable[T1, T2] = { + +val properties = new DescriptorProperties() +externalCatalogTable.addProperties(properties) +val javaMap = properties.asMap +val statistics = new FlinkStatistic(externalCatalogTable.getTableStats) + +val source: Option[TableSourceTable[T1]] = tableEnv match { + // check for a batch table source in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSource(externalCatalogTable, javaMap, statistics) + + // check for a stream table source in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSource(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table source.") +} + +val sink: Option[TableSinkTable[T2]] = tableEnv match { + // check for a batch table sink in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSink(externalCatalogTable, javaMap, statistics) + + // check for a stream table sink in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSink(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table sink.") +} + +new TableSourceSinkTable[T1, T2](source, sink) + } + + private def createBatchTableSource[T]( + externalCatalogTable: ExternalCatalogTable, + javaMap: util.Map[String, String], + statistics: FlinkStatistic) +: Option[TableSourceTable[T]] = if (externalCatalogTable.isTableSource) { --- End diff -- reverse if/else branches - simpler case should be first also if you change it to ``` if (!externalCatalogTable.isTableSource) { return None } val source = TableFactoryService .find(classOf[BatchTableSourceFactory[T]], javaMap) .createBatchTableSource(javaMap) val table = new BatchTableSourceTable( source,
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546317#comment-16546317 ] ASF GitHub Bot commented on FLINK-9852: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202964715 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala --- @@ -19,14 +19,17 @@ package org.apache.flink.table.descriptors /** - * Common class for all descriptors describing a table sink. + * A trait for descriptors that allow to define a format and schema. */ -abstract class TableSinkDescriptor extends TableDescriptor { +trait SchematicDescriptor extends Descriptor { --- End diff -- are you using it anywhere as interface? what does extracting it to separate interface give us? Maybe drop it? > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, only a table source can be created using the unified table > descriptors with {{tableEnv.from(...)}}. A similar approach should be > supported for defining sinks or even both types at the same time. > I suggest the following syntax: > {code} > tableEnv.connect(Kafka(...)).registerSource("name") > tableEnv.connect(Kafka(...)).registerSink("name") > tableEnv.connect(Kafka(...)).registerSourceAndSink("name") > {code} > A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202966786 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import java.util + +import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, SCHEMA_PROCTIME} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND} +import org.apache.flink.table.factories.StreamTableSinkFactory +import org.apache.flink.types.Row + +/** + * Factory base for creating configured instances of [[CsvTableSink]] in a stream environment. + */ +class CsvAppendTableSinkFactory --- End diff -- This should be done in separate commit ---
[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments
[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546308#comment-16546308 ] ASF GitHub Bot commented on FLINK-9852: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202915534 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java --- @@ -40,10 +41,18 @@ @Test public void testMerging() throws Exception { - final Environment env1 = EnvironmentFileUtil.parseUnmodified(DEFAULTS_ENVIRONMENT_FILE); + final Map replaceVars1 = new HashMap<>(); + replaceVars1.put("$VAR_UPDATE_MODE", "update-mode: append"); + final Environment env1 = EnvironmentFileUtil.parseModified( + DEFAULTS_ENVIRONMENT_FILE, + replaceVars1); + + final Map replaceVars2 = new HashMap<>(); --- End diff -- ``` final Map replaceVars2 = new HashMap<>(replaceVars1); ``` and you can drop the line below > Expose descriptor-based sink creation in table environments > --- > > Key: FLINK-9852 > URL: https://issues.apache.org/jira/browse/FLINK-9852 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, only a table source can be created using the unified table > descriptors with {{tableEnv.from(...)}}. A similar approach should be > supported for defining sinks or even both types at the same time. > I suggest the following syntax: > {code} > tableEnv.connect(Kafka(...)).registerSource("name") > tableEnv.connect(Kafka(...)).registerSink("name") > tableEnv.connect(Kafka(...)).registerSourceAndSink("name") > {code} > A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202965391 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._ +import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a streaming environment. + */ +class StreamTableDescriptor( --- End diff -- This class duplicates code with `BatchTableDescriptor` ---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202964952 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._ +import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a streaming environment. + */ +class StreamTableDescriptor( +private val tableEnv: StreamTableEnvironment, +private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor + with RegistrableDescriptor + with StreamableDescriptor { + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + private var updateMode: Option[String] = None + + /** +* Internal method for properties conversion. +*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { +connectorDescriptor.addProperties(properties) +formatDescriptor.foreach(_.addProperties(properties)) +schemaDescriptor.foreach(_.addProperties(properties)) +updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode)) + } + + /** +* Searches for the specified table source, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSource(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSource = TableFactoryService + .find(classOf[StreamTableSourceFactory[_]], javaMap) + .createStreamTableSource(javaMap) +tableEnv.registerTableSource(name, tableSource) + } + + /** +* Searches for the specified table sink, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSink(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSink = TableFactoryService + .find(classOf[StreamTableSinkFactory[_]], javaMap) + .createStreamTableSink(javaMap) +tableEnv.registerTableSink(name, tableSink) + } + + /** +* Searches for the specified table source and sink, configures them accordingly, and registers +* them as a table under the given name. 
+* +* @param name table name to be registered in the table environment +*/ + override def registerTableSourceAndSink(name: String): Unit = { +registerTableSource(name) +registerTableSink(name) + } + + /** +* Specifies the format that defines how to read data from a connector. +*/ + override def withFormat(format: FormatDescriptor): StreamTableDescriptor = { +formatDescriptor = Some(format) +this + } + + /** +* Specifies the resulting table schema. +*/ + override def withSchema(schema: Schema): StreamTableDescriptor = { +schemaDescriptor = Some(schema) +this + } + + /** +* Declares how to perform the conversion between a dynamic table and an external connector. +* +* In append mode, a dynamic table and an external connector only exchange INSERT messages. +* +* @see See also [[inRetractMode()]] and [[inUpsertMode()]]. +*/ + override def inAppendMode(): StreamTableDescriptor = { +
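Putting the pieces of this descriptor together, end-to-end usage would look roughly as follows. The connect(...) entry point follows the syntax proposed in the JIRA issue, the Kafka and Json settings are placeholders, and the environment setup is an assumption rather than code from the PR.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

tableEnv
  .connect(new Kafka().version("0.11").topic("clicks")) // placeholder connector settings
  .withFormat(new Json().failOnMissingField(false))
  .withSchema(new Schema()
    .field("user-name", "VARCHAR")
    .field("count", "DECIMAL"))
  .inAppendMode()                       // exchange only INSERT messages
  .registerTableSourceAndSink("clicks") // register both source and sink under one name
```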
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202965060 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException} +import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService} + +/** + * Descriptor for specifying a table source and/or sink in a batch environment. + */ +class BatchTableDescriptor( +private val tableEnv: BatchTableEnvironment, +private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor + with RegistrableDescriptor { + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + + /** +* Internal method for properties conversion. +*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { +connectorDescriptor.addProperties(properties) +formatDescriptor.foreach(_.addProperties(properties)) +schemaDescriptor.foreach(_.addProperties(properties)) + } + + /** +* Searches for the specified table source, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSource(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSource = TableFactoryService + .find(classOf[BatchTableSourceFactory[_]], javaMap) + .createBatchTableSource(javaMap) +tableEnv.registerTableSource(name, tableSource) + } + + /** +* Searches for the specified table sink, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSink(name: String): Unit = { +val javaMap = getValidProperties.asMap +val tableSink = TableFactoryService + .find(classOf[BatchTableSinkFactory[_]], javaMap) + .createBatchTableSink(javaMap) +tableEnv.registerTableSink(name, tableSink) + } + + /** +* Searches for the specified table source and sink, configures them accordingly, and registers +* them as a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + override def registerTableSourceAndSink(name: String): Unit = { +registerTableSource(name) +registerTableSink(name) + } + + /** +* Specifies the format that defines how to read data from a connector. 
+*/ + override def withFormat(format: FormatDescriptor): BatchTableDescriptor = { +formatDescriptor = Some(format) +this + } + + /** +* Specifies the resulting table schema. +*/ + override def withSchema(schema: Schema): BatchTableDescriptor = { +schemaDescriptor = Some(schema) +this + } + + override def toString: String = { +getValidProperties.toString + } + + // -- + + private def getValidProperties: DescriptorProperties = { --- End diff -- duplicated code with `StreamTableDescriptor. getValidProperties` ---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202963234 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.DescriptorProperties +import org.apache.flink.table.factories._ +import org.apache.flink.table.plan.schema._ +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.util.Logging + + +/** + * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]]. + * + * It uses [[TableFactoryService]] for discovering. + */ +object ExternalTableUtil extends Logging { + + /** +* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance +* +* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert +* @return converted [[TableSourceTable]] instance from the input catalog table +*/ + def fromExternalCatalogTable[T1, T2]( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) +: TableSourceSinkTable[T1, T2] = { + +val properties = new DescriptorProperties() +externalCatalogTable.addProperties(properties) +val javaMap = properties.asMap +val statistics = new FlinkStatistic(externalCatalogTable.getTableStats) + +val source: Option[TableSourceTable[T1]] = tableEnv match { + // check for a batch table source in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSource(externalCatalogTable, javaMap, statistics) + + // check for a stream table source in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSource(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table source.") +} + +val sink: Option[TableSinkTable[T2]] = tableEnv match { + // check for a batch table sink in this batch environment + case _: BatchTableEnvironment if externalCatalogTable.isBatchTable => +createBatchTableSink(externalCatalogTable, javaMap, statistics) + + // check for a stream table sink in this stream environment + case _: StreamTableEnvironment if externalCatalogTable.isStreamTable => +createStreamTableSink(externalCatalogTable, javaMap, statistics) + + case _ => +throw new ValidationException( + "External catalog table does not support the current environment for a table sink.") +} + +new TableSourceSinkTable[T1, T2](source, sink) + } + + private def 
createBatchTableSource[T]( + externalCatalogTable: ExternalCatalogTable, + javaMap: util.Map[String, String], + statistics: FlinkStatistic) +: Option[TableSourceTable[T]] = if (externalCatalogTable.isTableSource) { --- End diff -- Reverse the if/else branches - the simpler case should come first. Also, if you change it to ``` if (!externalCatalogTable.isTableSource) { return None } val source = TableFactoryService .find(classOf[BatchTableSourceFactory[T]], javaMap) .createBatchTableSource(javaMap) val table = new BatchTableSourceTable( source, statistics) Some(table) ``` it would simplify the code even further (the reader wouldn't have to track one extra level of nesting). Ditto in other places. ---
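For illustration, a sketch of how `createBatchTableSource` could look after applying the early-return suggestion above. The guard clause and factory calls follow the reviewer's snippet; the imports and the surrounding object are assumptions added only to make the example self-contained, not the final code.

```scala
import java.util

import org.apache.flink.table.catalog.ExternalCatalogTable
import org.apache.flink.table.factories.{BatchTableSourceFactory, TableFactoryService}
import org.apache.flink.table.plan.schema.{BatchTableSourceTable, TableSourceTable}
import org.apache.flink.table.plan.stats.FlinkStatistic

// Sketch only: the early return handles the "not a table source" case up front,
// so the happy path continues without an extra level of nesting.
object ExternalTableUtilSketch {

  def createBatchTableSource[T](
      externalCatalogTable: ExternalCatalogTable,
      javaMap: util.Map[String, String],
      statistics: FlinkStatistic)
    : Option[TableSourceTable[T]] = {

    if (!externalCatalogTable.isTableSource) {
      return None
    }

    val source = TableFactoryService
      .find(classOf[BatchTableSourceFactory[T]], javaMap)
      .createBatchTableSource(javaMap)

    Some(new BatchTableSourceTable(source, statistics))
  }
}
```

The same guard-clause shape would apply to the sink and stream variants referred to by the "ditto in other places" remark.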
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6343#discussion_r202961529 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -18,33 +18,299 @@ package org.apache.flink.table.catalog +import org.apache.flink.table.descriptors.DescriptorProperties.toScala +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT} import org.apache.flink.table.descriptors._ import org.apache.flink.table.plan.stats.TableStats +import scala.collection.JavaConverters._ + /** - * Defines a table in an [[ExternalCatalog]]. + * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources + * and/or sinks for both batch and stream environments. + * + * The catalog table takes descriptors which allow for declaring the communication to external + * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories + * that match the desired configuration. + * + * Use the provided builder methods to configure the external catalog table accordingly. + * + * The following example shows how to read from a connector using a JSON format and + * declaring it as a table source: * - * @param connectorDesc describes the system to connect to - * @param formatDesc describes the data format of a connector - * @param schemaDesc describes the schema of the result table - * @param statisticsDesc describes the estimated statistics of the result table - * @param metadataDesc describes additional metadata of a table + * {{{ + * ExternalCatalogTable( + * new ExternalSystemXYZ() + * .version("0.11")) + * .withFormat( + * new Json() + * .jsonSchema("{...}") + * .failOnMissingField(false)) + * .withSchema( + * new Schema() + * .field("user-name", "VARCHAR").from("u_name") + * .field("count", "DECIMAL") + * .asTableSource() --- End diff -- make `ExternalCatalogTable` and `BatchTableDescriptor` true builders with final fields? ---
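For readers unfamiliar with the suggestion, a hypothetical sketch of what a "true builder" with only final (immutable) fields could look like; the class and property names below are invented for illustration and are not part of the proposed Flink API.

```scala
// Hypothetical illustration only: every "wither" returns a new instance, so all
// fields stay immutable and a half-configured builder is never mutated in place.
final case class CatalogTableSketch(
    connectorProperties: Map[String, String] = Map.empty,
    formatProperties: Map[String, String] = Map.empty,
    isSource: Boolean = false,
    isSink: Boolean = false) {

  def withFormat(props: Map[String, String]): CatalogTableSketch =
    copy(formatProperties = props)

  def asTableSource(): CatalogTableSketch = copy(isSource = true)

  def asTableSink(): CatalogTableSketch = copy(isSink = true)
}

object CatalogTableSketchExample {
  def main(args: Array[String]): Unit = {
    val table = CatalogTableSketch(connectorProperties = Map("connector.type" -> "xyz"))
      .withFormat(Map("format.type" -> "json"))
      .asTableSource()
    println(table)
  }
}
```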
[jira] [Updated] (FLINK-9862) Update end-to-end test to use RocksDB backed timers
[ https://issues.apache.org/jira/browse/FLINK-9862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9862: -- Labels: pull-request-available (was: ) > Update end-to-end test to use RocksDB backed timers > --- > > Key: FLINK-9862 > URL: https://issues.apache.org/jira/browse/FLINK-9862 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing, Streaming >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0 > > > We should add or modify an end-to-end test to use RocksDB backed timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6294: [FLINK-9013][docs] Document yarn.containers.vcores only b...
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/6294 I've rebased it on top of the rich formatting feature for documentation, therefore only the last commit applies to the issue. I will also check the `DominantResourceCalculator` once again. ---
[jira] [Commented] (FLINK-9013) Document yarn.containers.vcores only being effective when adapting YARN config
[ https://issues.apache.org/jira/browse/FLINK-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546302#comment-16546302 ] ASF GitHub Bot commented on FLINK-9013: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/6294 I've rebased it on top of rich formatting feature for documentation, therefore only the last commit applies to the issue. I will also check the `DominantResourceCalculator` once again. > Document yarn.containers.vcores only being effective when adapting YARN config > -- > > Key: FLINK-9013 > URL: https://issues.apache.org/jira/browse/FLINK-9013 > Project: Flink > Issue Type: Improvement > Components: Documentation, YARN >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2 > > > Even after specifying {{yarn.containers.vcores}} and having Flink request > such a container from YARN, it may not take these into account at all and > return a container with 1 vcore. > The YARN configuration needs to be adapted to take the vcores into account, > e.g. by setting the {{FairScheduler}} in {{yarn-site.xml}}: > {code} > > yarn.resourcemanager.scheduler.class > > org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler > > {code} > This fact should be documented at least at the configuration parameter > documentation of {{yarn.containers.vcores}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9862) Update end-to-end test to use RocksDB backed timers
[ https://issues.apache.org/jira/browse/FLINK-9862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546301#comment-16546301 ] ASF GitHub Bot commented on FLINK-9862: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/6351 [FLINK-9862] [test] Extend general puropose DataStream test to have a tumbling window ## What is the purpose of the change This allows our end-to-end tests to have coverage for snapshotting / restoring timers, when configured to use different state backends. ## Brief change log - Add a tumbling window to the `DataStreamAllAroundTestProgram` - Change default "max out of orderness" setting of the source generator to 0 ## Verifying this change This is a change that affects existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-9862 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6351.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6351 commit a3bc102303481fa784b9c94d7838b1c2b5f65123 Author: Tzu-Li (Gordon) Tai Date: 2018-07-17T10:00:40Z [FLINK-9862] [test] Extend general puropose DataStream test to have a tumbling window This allows the end-to-end tests to have coverage for testing checkpointing timers. > Update end-to-end test to use RocksDB backed timers > --- > > Key: FLINK-9862 > URL: https://issues.apache.org/jira/browse/FLINK-9862 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing, Streaming >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0 > > > We should add or modify an end-to-end test to use RocksDB backed timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis
[ https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reopened FLINK-8163: - Assignee: Chesnay Schepler (was: Kostas Kloudas) just got a timeout locally > NonHAQueryableStateFsBackendITCase test getting stuck on Travis > --- > > Key: FLINK-8163 > URL: https://issues.apache.org/jira/browse/FLINK-8163 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Critical > Labels: test-stability > > The {{NonHAQueryableStateFsBackendITCase}} tests seems to get stuck on Travis > producing no output for 300s. > https://travis-ci.org/tillrohrmann/flink/jobs/307988209 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general puropose DataSt...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/6351 [FLINK-9862] [test] Extend general puropose DataStream test to have a tumbling window ## What is the purpose of the change This allows our end-to-end tests to have coverage for snapshotting / restoring timers, when configured to use different state backends. ## Brief change log - Add a tumbling window to the `DataStreamAllAroundTestProgram` - Change default "max out of orderness" setting of the source generator to 0 ## Verifying this change This is a change that affects existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-9862 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6351.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6351 commit a3bc102303481fa784b9c94d7838b1c2b5f65123 Author: Tzu-Li (Gordon) Tai Date: 2018-07-17T10:00:40Z [FLINK-9862] [test] Extend general puropose DataStream test to have a tumbling window This allows the end-to-end tests to have coverage for testing checkpointing timers. ---
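As context for the change log above: the actual `DataStreamAllAroundTestProgram` is a Java program and its exact diff is not shown here, but a keyed tumbling event-time window of the kind being added looks roughly like the following Scala sketch. The window operator registers event-time timers, which is what gives the end-to-end test coverage for snapshotting and restoring timer state.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Rough sketch with toy data, not the actual test program.
object TumblingWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env
      .fromElements(("a", 1L), ("a", 2L), ("b", 3L))
      .assignAscendingTimestamps(_._2)          // timestamps are in order for this toy input
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))             // tumbling event-time window
      .reduce((x, y) => (x._1, x._2 + y._2))    // window contents and timers end up in checkpoints
      .print()

    env.execute("tumbling-window-sketch")
  }
}
```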
[jira] [Closed] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis
[ https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8163. --- Resolution: Cannot Reproduce No occurrence was reported since the linked PR was merged. > NonHAQueryableStateFsBackendITCase test getting stuck on Travis > --- > > Key: FLINK-8163 > URL: https://issues.apache.org/jira/browse/FLINK-8163 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Kostas Kloudas >Priority: Critical > Labels: test-stability > > The {{NonHAQueryableStateFsBackendITCase}} tests seems to get stuck on Travis > producing no output for 300s. > https://travis-ci.org/tillrohrmann/flink/jobs/307988209 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis
[ https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-8163: Fix Version/s: (was: 1.5.2) (was: 1.6.0) > NonHAQueryableStateFsBackendITCase test getting stuck on Travis > --- > > Key: FLINK-8163 > URL: https://issues.apache.org/jira/browse/FLINK-8163 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Kostas Kloudas >Priority: Critical > Labels: test-stability > > The {{NonHAQueryableStateFsBackendITCase}} tests seems to get stuck on Travis > producing no output for 300s. > https://travis-ci.org/tillrohrmann/flink/jobs/307988209 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6350: [FLINK-9873][runtime] Log task state when aborting...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6350#discussion_r202962522 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -457,13 +457,20 @@ public CheckpointTriggerResult triggerCheckpoint( Execution[] executions = new Execution[tasksToTrigger.length]; for (int i = 0; i < tasksToTrigger.length; i++) { Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); - if (ee != null && ee.getState() == ExecutionState.RUNNING) { - executions[i] = ee; - } else { + if (ee == null) { --- End diff -- According to the `ExecutionVertex` docs this branch shouldn't be necessary at all, but I kept it in to be safe. ---
[jira] [Updated] (FLINK-9872) SavepointITCase#testSavepointForJobWithIteration does not properly cancel jobs
[ https://issues.apache.org/jira/browse/FLINK-9872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-9872: Summary: SavepointITCase#testSavepointForJobWithIteration does not properly cancel jobs (was: SavepointITCase#testSavepointForJobWithIteration does not properly cancell jobs) > SavepointITCase#testSavepointForJobWithIteration does not properly cancel jobs > -- > > Key: FLINK-9872 > URL: https://issues.apache.org/jira/browse/FLINK-9872 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.1, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.5.2, 1.6.0 > > > The {{SavepointITCase}} attempts to cancel a job by calling {{cancel}} on a > source instance. However this instance isn't actually executed on the > cluster, since it is serialized during the submission process. > > Additionally we aren't waiting for the cancellation to finish, causing the > test to log several exceptions when the cluster is shutdown. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9873) Log actual state when aborting checkpoint due to task not running
[ https://issues.apache.org/jira/browse/FLINK-9873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546290#comment-16546290 ] ASF GitHub Bot commented on FLINK-9873: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6350#discussion_r202962522 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -457,13 +457,20 @@ public CheckpointTriggerResult triggerCheckpoint( Execution[] executions = new Execution[tasksToTrigger.length]; for (int i = 0; i < tasksToTrigger.length; i++) { Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); - if (ee != null && ee.getState() == ExecutionState.RUNNING) { - executions[i] = ee; - } else { + if (ee == null) { --- End diff -- according to the `ExecutionVertex` docs this branch shouldn't be necessary at all, but i kept it in to be safe. > Log actual state when aborting checkpoint due to task not running > - > > Key: FLINK-9873 > URL: https://issues.apache.org/jira/browse/FLINK-9873 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.1, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2, 1.6.0 > > > Currently, if a checkpoint is triggered while a task s not in a RUNNING state > the following message is logged: > {code:java} > Checkpoint triggering task {} of job {} is not being executed at the > moment.{code} > We can improve this message to include the actual task state to help diagnose > problems. > This message is also a bit ambiguous, as "being executed" could mean many > things, from not "RUNNING", to not being "DEPLOYED", or to not existing at > all. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9873) Log actual state when aborting checkpoint due to task not running
[ https://issues.apache.org/jira/browse/FLINK-9873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546287#comment-16546287 ] ASF GitHub Bot commented on FLINK-9873: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6350 [FLINK-9873][runtime] Log task state when aborting checkpoint ## What is the purpose of the change This PR adjusts the logging message for when a checkpoint is declined due to tasks not being ready. We now explicitly log the current task state. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9873 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6350.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6350 commit 4dfe622fc45a2233dbba58640d6aa67be4739f86 Author: zentol Date: 2018-07-17T07:34:45Z [FLINK-9873][runtime] Log task state when aborting checkpoint > Log actual state when aborting checkpoint due to task not running > - > > Key: FLINK-9873 > URL: https://issues.apache.org/jira/browse/FLINK-9873 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.1, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2, 1.6.0 > > > Currently, if a checkpoint is triggered while a task s not in a RUNNING state > the following message is logged: > {code:java} > Checkpoint triggering task {} of job {} is not being executed at the > moment.{code} > We can improve this message to include the actual task state to help diagnose > problems. > This message is also a bit ambiguous, as "being executed" could mean many > things, from not "RUNNING", to not being "DEPLOYED", or to not existing at > all. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9873) Log actual state when aborting checkpoint due to task not running
[ https://issues.apache.org/jira/browse/FLINK-9873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9873: -- Labels: pull-request-available (was: ) > Log actual state when aborting checkpoint due to task not running > - > > Key: FLINK-9873 > URL: https://issues.apache.org/jira/browse/FLINK-9873 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.1, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2, 1.6.0 > > > Currently, if a checkpoint is triggered while a task s not in a RUNNING state > the following message is logged: > {code:java} > Checkpoint triggering task {} of job {} is not being executed at the > moment.{code} > We can improve this message to include the actual task state to help diagnose > problems. > This message is also a bit ambiguous, as "being executed" could mean many > things, from not "RUNNING", to not being "DEPLOYED", or to not existing at > all. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6350: [FLINK-9873][runtime] Log task state when aborting...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6350 [FLINK-9873][runtime] Log task state when aborting checkpoint ## What is the purpose of the change This PR adjusts the logging message for when a checkpoint is declined due to tasks not being ready. We now explicitly log the current task state. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9873 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6350.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6350 commit 4dfe622fc45a2233dbba58640d6aa67be4739f86 Author: zentol Date: 2018-07-17T07:34:45Z [FLINK-9873][runtime] Log task state when aborting checkpoint ---
[jira] [Created] (FLINK-9873) Log actual state when aborting checkpoint due to task not running
Chesnay Schepler created FLINK-9873: --- Summary: Log actual state when aborting checkpoint due to task not running Key: FLINK-9873 URL: https://issues.apache.org/jira/browse/FLINK-9873 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Affects Versions: 1.5.1, 1.6.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.5.2, 1.6.0 Currently, if a checkpoint is triggered while a task is not in a RUNNING state the following message is logged: {code:java} Checkpoint triggering task {} of job {} is not being executed at the moment.{code} We can improve this message to include the actual task state to help diagnose problems. This message is also a bit ambiguous, as "being executed" could mean many things, from not "RUNNING", to not being "DEPLOYED", or to not existing at all. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6997) SavepointITCase fails in master branch sometimes
[ https://issues.apache.org/jira/browse/FLINK-6997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546283#comment-16546283 ] ASF GitHub Bot commented on FLINK-6997: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6349 [FLINK-6997][tests] Properly cancel test job ## What is the purpose of the change With this PR the jobs started in `SavepointITCase#testSavepointForJobWithIteration` are properly canceled. Previously they remained in a running state until the cluster was shut down, causing several exceptions to be logged. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9872 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6349.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6349 commit 434d52a4efaa31da59b04ede010b6a7757ebbcbc Author: zentol Date: 2018-07-17T09:34:29Z [FLINK-6997][tests] Properly cancel test job > SavepointITCase fails in master branch sometimes > > > Key: FLINK-6997 > URL: https://issues.apache.org/jira/browse/FLINK-6997 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0, 1.5.0 >Reporter: Ted Yu >Priority: Critical > Labels: test-stability > > I got the following test failure (with commit > a0b781461bcf8c2f1d00b93464995f03eda589f1) > {code} > testSavepointForJobWithIteration(org.apache.flink.test.checkpointing.SavepointITCase) > Time elapsed: 8.129 sec <<< ERROR! > java.io.IOException: java.lang.Exception: Failed to complete savepoint > at > org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:342) > at > org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:316) > at > org.apache.flink.test.checkpointing.SavepointITCase.testSavepointForJobWithIteration(SavepointITCase.java:827) > Caused by: java.lang.Exception: Failed to complete savepoint > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:821) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:805) > at > org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272) > at akka.dispatch.OnComplete.internal(Future.scala:247) > at akka.dispatch.OnComplete.internal(Future.scala:245) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.Exception: Failed to trigger savepoint: Not all required > tasks are currently running. > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:382) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:800) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManagerLike.scala:95) > at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) > at > org.apache.flink.runtime.LeaderS
[GitHub] flink pull request #6349: [FLINK-6997][tests] Properly cancel test job
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6349 [FLINK-6997][tests] Properly cancel test job ## What is the purpose of the change With this PR the jobs started in `SavepointITCase#testSavepointForJobWithIteration` are properly canceled. Previously they remained in a running state until the cluster was shut down, causing several exceptions to be logged. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9872 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6349.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6349 commit 434d52a4efaa31da59b04ede010b6a7757ebbcbc Author: zentol Date: 2018-07-17T09:34:29Z [FLINK-6997][tests] Properly cancel test job ---
[jira] [Created] (FLINK-9872) SavepointITCase#testSavepointForJobWithIteration does not properly cancell jobs
Chesnay Schepler created FLINK-9872: --- Summary: SavepointITCase#testSavepointForJobWithIteration does not properly cancell jobs Key: FLINK-9872 URL: https://issues.apache.org/jira/browse/FLINK-9872 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.5.1, 1.6.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.5.2, 1.6.0 The {{SavepointITCase}} attempts to cancel a job by calling {{cancel}} on a source instance. However this instance isn't actually executed on the cluster, since it is serialized during the submission process. Additionally we aren't waiting for the cancellation to finish, causing the test to log several exceptions when the cluster is shutdown. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
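To illustrate the pitfall described in the issue (a hypothetical sketch, not the actual `SavepointITCase` code): the source object is serialized into the job graph when the job is submitted, so calling `cancel()` on the local instance afterwards only flips a flag on the local copy and never reaches the instances running on the cluster.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical sketch of the pitfall, not the actual test code.
class InfiniteSource extends SourceFunction[Long] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var i = 0L
    while (running) {
      ctx.collect(i)
      i += 1
    }
  }

  override def cancel(): Unit = running = false
}

object CancelPitfallSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = new InfiniteSource

    env.addSource(source).print()

    // The job graph now holds a *serialized copy* of `source`. Once the job is
    // submitted (env.execute()), calling source.cancel() here only flips the flag
    // on the local object; the copies running on the cluster keep producing data.
    source.cancel()
  }
}
```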
[jira] [Updated] (FLINK-6997) SavepointITCase fails in master branch sometimes
[ https://issues.apache.org/jira/browse/FLINK-6997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-6997: Fix Version/s: (was: 1.5.2) (was: 1.6.0) > SavepointITCase fails in master branch sometimes > > > Key: FLINK-6997 > URL: https://issues.apache.org/jira/browse/FLINK-6997 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0, 1.5.0 >Reporter: Ted Yu >Priority: Critical > Labels: test-stability > > I got the following test failure (with commit > a0b781461bcf8c2f1d00b93464995f03eda589f1) > {code} > testSavepointForJobWithIteration(org.apache.flink.test.checkpointing.SavepointITCase) > Time elapsed: 8.129 sec <<< ERROR! > java.io.IOException: java.lang.Exception: Failed to complete savepoint > at > org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:342) > at > org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:316) > at > org.apache.flink.test.checkpointing.SavepointITCase.testSavepointForJobWithIteration(SavepointITCase.java:827) > Caused by: java.lang.Exception: Failed to complete savepoint > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:821) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:805) > at > org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272) > at akka.dispatch.OnComplete.internal(Future.scala:247) > at akka.dispatch.OnComplete.internal(Future.scala:245) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.Exception: Failed to trigger savepoint: Not all required > tasks are currently running. 
> at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:382) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:800) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManagerLike.scala:95) > at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-6997) SavepointITCase fails in master branch sometimes
[ https://issues.apache.org/jira/browse/FLINK-6997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-6997. --- Resolution: Cannot Reproduce > SavepointITCase fails in master branch sometimes > > > Key: FLINK-6997 > URL: https://issues.apache.org/jira/browse/FLINK-6997 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0, 1.5.0 >Reporter: Ted Yu >Priority: Critical > Labels: test-stability > > I got the following test failure (with commit > a0b781461bcf8c2f1d00b93464995f03eda589f1) > {code} > testSavepointForJobWithIteration(org.apache.flink.test.checkpointing.SavepointITCase) > Time elapsed: 8.129 sec <<< ERROR! > java.io.IOException: java.lang.Exception: Failed to complete savepoint > at > org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:342) > at > org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:316) > at > org.apache.flink.test.checkpointing.SavepointITCase.testSavepointForJobWithIteration(SavepointITCase.java:827) > Caused by: java.lang.Exception: Failed to complete savepoint > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:821) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:805) > at > org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272) > at akka.dispatch.OnComplete.internal(Future.scala:247) > at akka.dispatch.OnComplete.internal(Future.scala:245) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.Exception: Failed to trigger savepoint: Not all required > tasks are currently running. 
> at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:382) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:800) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManagerLike.scala:95) > at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546270#comment-16546270 ] Chesnay Schepler commented on FLINK-8101: - [~sharju] no, this Jira will not be fixed for 1.6. > Elasticsearch 6.x support > - > > Key: FLINK-8101 > URL: https://issues.apache.org/jira/browse/FLINK-8101 > Project: Flink > Issue Type: New Feature > Components: ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Priority: Major > Fix For: 1.7.0 > > > Recently, elasticsearch 6.0.0 was released: > https://www.elastic.co/blog/elasticsearch-6-0-0-released > The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-8101: Fix Version/s: (was: 1.6.0) 1.7.0 > Elasticsearch 6.x support > - > > Key: FLINK-8101 > URL: https://issues.apache.org/jira/browse/FLINK-8101 > Project: Flink > Issue Type: New Feature > Components: ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Priority: Major > Fix For: 1.7.0 > > > Recently, elasticsearch 6.0.0 was released: > https://www.elastic.co/blog/elasticsearch-6-0-0-released > The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6347 `vertices` is the correct plural, but this is another one of those cases where fixing it might cause more harm than good since it could cause merge conflicts, yet provides no functional benefit. Additionally this PR makes a lot of whitespace changes that should be reverted in any case. ---
[jira] [Commented] (FLINK-6997) SavepointITCase fails in master branch sometimes
[ https://issues.apache.org/jira/browse/FLINK-6997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546262#comment-16546262 ] Chesnay Schepler commented on FLINK-6997: - Either we're dealing with 2 separate issues here or it's not a timing issue. The original exception is thrown if any task is not in a RUNNING state. The exception that Till got shows that the job failed while the checkpoint was underway. Both cases could be explained by a failure of the job, in the first case shortly before the savepoint is triggered, in the latter during the savepoint operation. I don't have an explanation for possible failures at this moment, but it could be virtually anything. In any case, this test has some issues. It attempts to cancel jobs by calling {{cancel}} on one of the source instances, but these are obviously not the instances actually running on the cluster. Even if this worked, we aren't waiting for the cancellation to happen before shutting down the cluster, resulting in a barrage of exceptions in the logs. Since the failure already occurs before the second job is even started, these issues can't explain the test failures though. > SavepointITCase fails in master branch sometimes > > > Key: FLINK-6997 > URL: https://issues.apache.org/jira/browse/FLINK-6997 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0, 1.5.0 >Reporter: Ted Yu >Priority: Critical > Labels: test-stability > Fix For: 1.5.2, 1.6.0 > > > I got the following test failure (with commit > a0b781461bcf8c2f1d00b93464995f03eda589f1) > {code} > testSavepointForJobWithIteration(org.apache.flink.test.checkpointing.SavepointITCase) > Time elapsed: 8.129 sec <<< ERROR! > java.io.IOException: java.lang.Exception: Failed to complete savepoint > at > org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:342) > at > org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:316) > at > org.apache.flink.test.checkpointing.SavepointITCase.testSavepointForJobWithIteration(SavepointITCase.java:827) > Caused by: java.lang.Exception: Failed to complete savepoint > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:821) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:805) > at > org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272) > at akka.dispatch.OnComplete.internal(Future.scala:247) > at akka.dispatch.OnComplete.internal(Future.scala:245) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.Exception: Failed to trigger savepoint: Not all required > tasks are currently running. > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:382) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:800) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManagerLike.scala:95) > at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) >
[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546246#comment-16546246 ] Sami Harju commented on FLINK-8101: --- Hi, any updates to this? Are we getting ES 6 support in 1.6? > Elasticsearch 6.x support > - > > Key: FLINK-8101 > URL: https://issues.apache.org/jira/browse/FLINK-8101 > Project: Flink > Issue Type: New Feature > Components: ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Priority: Major > Fix For: 1.6.0 > > > Recently, elasticsearch 6.0.0 was released: > https://www.elastic.co/blog/elasticsearch-6-0-0-released > The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9871) Use Description class for ConfigOptions with rich formatting
Dawid Wysakowicz created FLINK-9871: --- Summary: Use Description class for ConfigOptions with rich formatting Key: FLINK-9871 URL: https://issues.apache.org/jira/browse/FLINK-9871 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.6.0 Reporter: Dawid Wysakowicz Assignee: Dawid Wysakowicz Fix For: 1.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-9669) Introduce task manager assignment store
[ https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reopened FLINK-9669: - Reopen to remove fixVersion > Introduce task manager assignment store > --- > > Key: FLINK-9669 > URL: https://issues.apache.org/jira/browse/FLINK-9669 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, Scheduler >Affects Versions: 1.5.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)