[GitHub] flink pull request #6352: [FLINK-8163][yarn][tests] Harden tests against slo...

2018-07-17 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6352

[FLINK-8163][yarn][tests] Harden tests against slow job shutdowns

## What is the purpose of the change

This PR hardens the `YarnTestBase` against jobs that just don't want to 
shut down that quickly (i.e., within 500 ms).
The maximum waiting time has been increased to 10 seconds, during which we 
periodically check the state of all applications.

Additionally, the failure condition from `@Before` was moved to the 
`@After` method.

This change will allow us to better differentiate between simple timing 
issues and unsuccessful job shutdowns.
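
As a rough illustration of the new waiting logic (a minimal sketch, not the 
actual `YarnTestBase` code; `isAnyApplicationRunning` stands in for whatever 
YARN client query the test base uses):

{code}
// Poll the application state instead of sleeping a fixed 500 ms.
def waitForApplicationsToFinish(
    isAnyApplicationRunning: () => Boolean,
    timeoutMillis: Long = 10000L,
    pollIntervalMillis: Long = 250L): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMillis
  while (isAnyApplicationRunning() && System.currentTimeMillis() < deadline) {
    Thread.sleep(pollIntervalMillis)
  }
  // Failing here (in @After) rather than in @Before distinguishes a slow
  // shutdown of the current test from leftovers of a previous test.
  assert(!isAnyApplicationRunning(), "YARN applications still running after timeout")
}
{code}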

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8163

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6352.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6352


commit 0dd65378f0c9f477bb8f5712bbc0b1f31440f5f0
Author: zentol 
Date:   2018-07-17T11:29:16Z

[FLINK-8163][yarn][tests] Harden tests against slow job shutdowns




---


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202989785
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, 
ValidationException}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+import org.apache.flink.table.factories.{StreamTableSinkFactory, 
StreamTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a streaming 
environment.
+  */
+class StreamTableDescriptor(
--- End diff --

I don't see an alternative if we don't want to have an ugly API.


---


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546466#comment-16546466
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202989785
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, 
ValidationException}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+import org.apache.flink.table.factories.{StreamTableSinkFactory, 
StreamTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a streaming 
environment.
+  */
+class StreamTableDescriptor(
--- End diff --

I don't see an alternative if we don't want to have an ugly API.


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.
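
For illustration, a hedged sketch of how the full chain might read once combined 
with the withFormat/withSchema/inAppendMode/registerTableSourceAndSink methods 
from the associated pull request; Kafka(...), Json(...) and the schema field are 
placeholders in the same style as the proposal above, not a confirmed API:

{code}
tableEnv
  .connect(Kafka(...))                                    // connector descriptor
  .withFormat(Json(...))                                  // how records are (de)serialized
  .withSchema(new Schema().field("user", Types.STRING))   // resulting table schema
  .inAppendMode()                                         // only INSERT messages are exchanged
  .registerTableSourceAndSink("transactions")
{code}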



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546453#comment-16546453
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202986793
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, 
ValidationException}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, 
BatchTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a batch 
environment.
+  */
+class BatchTableDescriptor(
+private val tableEnv: BatchTableEnvironment,
+private val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor
+  with RegistrableDescriptor {
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

Depends on the language you are using. This method is public in Java ;)
But I will move it down.
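
To make the visibility point concrete, a small standalone Scala sketch 
(hypothetical class, not the actual Flink code): `private[flink]` limits access 
to the `org.apache.flink` package in Scala sources, but the method compiles to 
public bytecode and is therefore callable from Java.

{code}
package org.apache.flink.table.descriptors

class ExampleDescriptor {
  // Scala callers outside org.apache.flink cannot see this method,
  // yet in the compiled class file it is public and visible to Java code.
  private[flink] def addProperties(): Unit = ()
}
{code}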


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202986793
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, 
ValidationException}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, 
BatchTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a batch 
environment.
+  */
+class BatchTableDescriptor(
+private val tableEnv: BatchTableEnvironment,
+private val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor
+  with RegistrableDescriptor {
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

Depends on the language you are using. This method is public in Java ;)
But I will move it down.


---


[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546448#comment-16546448
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6300
  
Nice feature, thanks a lot.

Merged this into the 1.6 and 1.7 branches


> Adapt maxRecords parameter in the getRecords call to optimize bytes read from 
> Kinesis 
> --
>
> Key: FLINK-9692
> URL: https://issues.apache.org/jira/browse/FLINK-9692
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: performance, pull-request-available
> Fix For: 1.6.0
>
>
> The Kinesis connector currently has a [constant 
> value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213]
>  set for maxRecords that it can fetch from a single Kinesis getRecords call. 
> However, in most realtime scenarios, the average size of the Kinesis record 
> (in bytes) changes depending on the situation i.e. you could be in a 
> transient scenario where you are reading large sized records and would hence 
> like to fetch fewer records in each getRecords call (so as to not exceed the 
> 2 MB/sec [per shard 
> limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html]
>  on the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch 
> size prior to making the getRecords call, so that the maxRecords parameter 
> can be appropriately tuned before making the call. 
> This feature can be behind a 
> [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java]
>  flag that defaults to false. 
>  
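
A hedged sketch of the adaptation described above (not the connector's actual 
implementation): derive maxRecords for the next call from the average record 
size observed so far and the 2 MB/sec per-shard budget; the 10,000-record cap 
mirrors the GetRecords API limit.

{code}
def adaptiveMaxRecords(
    averageRecordSizeBytes: Long,
    fetchIntervalMillis: Long,
    recordsPerCallCap: Int = 10000): Int = {
  if (averageRecordSizeBytes <= 0L) {
    recordsPerCallCap  // no history yet, fall back to the cap
  } else {
    // Bytes we may read in one fetch interval without exceeding 2 MB/sec per shard.
    val bytesPerRead = 2L * 1024L * 1024L * fetchIntervalMillis / 1000L
    val records = bytesPerRead / averageRecordSizeBytes
    math.min(recordsPerCallCap.toLong, math.max(1L, records)).toInt
  }
}
{code}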



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-17 Thread Stephan Ewen (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-9692.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed in
  - 1.6.0 via 8005a2ebac1afbec6fcf43a4442f51f442f33590
  - 1.7.0 via 01378d06215d6e00b78c4ed40b4b8a2c5c9db129

> Adapt maxRecords parameter in the getRecords call to optimize bytes read from 
> Kinesis 
> --
>
> Key: FLINK-9692
> URL: https://issues.apache.org/jira/browse/FLINK-9692
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: performance, pull-request-available
> Fix For: 1.6.0
>
>
> The Kinesis connector currently has a [constant 
> value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213]
>  set for maxRecords that it can fetch from a single Kinesis getRecords call. 
> However, in most realtime scenarios, the average size of the Kinesis record 
> (in bytes) changes depending on the situation i.e. you could be in a 
> transient scenario where you are reading large sized records and would hence 
> like to fetch fewer records in each getRecords call (so as to not exceed the 
> 2 MB/sec [per shard 
> limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html]
>  on the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch 
> size prior to making the getRecords call, so that the maxRecords parameter 
> can be appropriately tuned before making the call. 
> This feature can be behind a 
> [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java]
>  flag that defaults to false. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-17 Thread Stephan Ewen (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-9692.
---

> Adapt maxRecords parameter in the getRecords call to optimize bytes read from 
> Kinesis 
> --
>
> Key: FLINK-9692
> URL: https://issues.apache.org/jira/browse/FLINK-9692
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: performance, pull-request-available
> Fix For: 1.6.0
>
>
> The Kinesis connector currently has a [constant 
> value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213]
>  set for maxRecords that it can fetch from a single Kinesis getRecords call. 
> However, in most realtime scenarios, the average size of the Kinesis record 
> (in bytes) changes depending on the situation i.e. you could be in a 
> transient scenario where you are reading large sized records and would hence 
> like to fetch fewer records in each getRecords call (so as to not exceed the 
> 2 MB/sec [per shard 
> limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html]
>  on the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch 
> size prior to making the getRecords call, so that the maxRecords parameter 
> can be appropriately tuned before making the call. 
> This feature can be behind a 
> [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java]
>  flag that defaults to false. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...

2018-07-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6300
  
Nice feature, thanks a lot.

Merged this into the 1.6 and 1.7 branches


---


[jira] [Commented] (FLINK-9815) YARNSessionCapacitySchedulerITCase flaky

2018-07-17 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546440#comment-16546440
 ] 

Chesnay Schepler commented on FLINK-9815:
-

The {{YarnTestBase}} has an {{@After}} method in which it sleeps for 500ms to 
wait for jobs to terminate. I'll replace this with a more lenient and proper 
check that fails if any application is still running after several seconds.

> YARNSessionCapacitySchedulerITCase flaky
> 
>
> Key: FLINK-9815
> URL: https://issues.apache.org/jira/browse/FLINK-9815
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.6.0
>
>
> The test fails because of dangling yarn applications.
> Logs: https://api.travis-ci.org/v3/job/402657694/log.txt
> It was also reported previously in [FLINK-8161] : 
> https://issues.apache.org/jira/browse/FLINK-8161?focusedCommentId=16480216&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16480216



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546435#comment-16546435
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202983790
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, 
ValidationException}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, 
BatchTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a batch 
environment.
+  */
+class BatchTableDescriptor(
+private val tableEnv: BatchTableEnvironment,
+private val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor
+  with RegistrableDescriptor {
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
+connectorDescriptor.addProperties(properties)
+formatDescriptor.foreach(_.addProperties(properties))
+schemaDescriptor.foreach(_.addProperties(properties))
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSource(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSource = TableFactoryService
+  .find(classOf[BatchTableSourceFactory[_]], javaMap)
+  .createBatchTableSource(javaMap)
+tableEnv.registerTableSource(name, tableSource)
+  }
+
+  /**
+* Searches for the specified table sink, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSink(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSink = TableFactoryService
+  .find(classOf[BatchTableSinkFactory[_]], javaMap)
+  .createBatchTableSink(javaMap)
+tableEnv.registerTableSink(name, tableSink)
+  }
+
+  /**
+* Searches for the specified table source and sink, configures them 
accordingly, and registers
+* them as a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSourceAndSink(name: String): Unit = {
+registerTableSource(name)
+registerTableSink(name)
+  }
+
+  /**
+* Specifies the format that defines how to read data from a connector.
+*/
+  override def withFormat(format: FormatDescriptor): BatchTableDescriptor 
= {
+formatDescriptor = Some(format)
+this
+  }
+
+  /**
+* Specifies the resulting table schema.
+*/
+  override def withSchema(schema: Schema): BatchTableDescriptor = {
+schemaDescriptor = Some(schema)
+this
+  }
+
+  override def toString: String = {
+getValidProperties.toString
+  }
+
+  // 
--
+
+  private def getValidProperties: DescriptorProperties = {
--- End diff --

I agree but I cannot move it up the class hierarchy because it would be 
public in Java. I will create a util class.

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202983790
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, 
ValidationException}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, 
BatchTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a batch 
environment.
+  */
+class BatchTableDescriptor(
+private val tableEnv: BatchTableEnvironment,
+private val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor
+  with RegistrableDescriptor {
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
+connectorDescriptor.addProperties(properties)
+formatDescriptor.foreach(_.addProperties(properties))
+schemaDescriptor.foreach(_.addProperties(properties))
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSource(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSource = TableFactoryService
+  .find(classOf[BatchTableSourceFactory[_]], javaMap)
+  .createBatchTableSource(javaMap)
+tableEnv.registerTableSource(name, tableSource)
+  }
+
+  /**
+* Searches for the specified table sink, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSink(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSink = TableFactoryService
+  .find(classOf[BatchTableSinkFactory[_]], javaMap)
+  .createBatchTableSink(javaMap)
+tableEnv.registerTableSink(name, tableSink)
+  }
+
+  /**
+* Searches for the specified table source and sink, configures them 
accordingly, and registers
+* them as a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSourceAndSink(name: String): Unit = {
+registerTableSource(name)
+registerTableSink(name)
+  }
+
+  /**
+* Specifies the format that defines how to read data from a connector.
+*/
+  override def withFormat(format: FormatDescriptor): BatchTableDescriptor 
= {
+formatDescriptor = Some(format)
+this
+  }
+
+  /**
+* Specifies the resulting table schema.
+*/
+  override def withSchema(schema: Schema): BatchTableDescriptor = {
+schemaDescriptor = Some(schema)
+this
+  }
+
+  override def toString: String = {
+getValidProperties.toString
+  }
+
+  // 
--
+
+  private def getValidProperties: DescriptorProperties = {
--- End diff --

I agree but I cannot move it up the class hierarchy because it would be 
public in Java. I will create a util class.
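
A hedged sketch of that idea (names are stand-ins; only DescriptorProperties 
and addProperties appear in the diff above, and the sketch assumes a Descriptor 
base type that exposes addProperties within the flink package): a package-private 
utility object keeps the shared conversion/validation logic out of the public 
Java view of the descriptor classes.

{code}
import org.apache.flink.table.descriptors.{Descriptor, DescriptorProperties}

private[flink] object DescriptorUtil {

  // Collects the properties of a descriptor and runs validation in one place,
  // so no public helper has to live on the descriptor classes themselves.
  def getValidProperties(descriptor: Descriptor): DescriptorProperties = {
    val properties = new DescriptorProperties()
    descriptor.addProperties(properties)
    // validation of the collected properties would happen here
    properties
  }
}
{code}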


---


[jira] [Commented] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis

2018-07-17 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546433#comment-16546433
 ] 

Chesnay Schepler commented on FLINK-8163:
-

I run into the memory issue consistently after ~100 iterations.

> NonHAQueryableStateFsBackendITCase test getting stuck on Travis
> ---
>
> Key: FLINK-8163
> URL: https://issues.apache.org/jira/browse/FLINK-8163
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> The {{NonHAQueryableStateFsBackendITCase}} tests seems to get stuck on Travis 
> producing no output for 300s.
> https://travis-ci.org/tillrohrmann/flink/jobs/307988209



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546430#comment-16546430
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6300


> Adapt maxRecords parameter in the getRecords call to optimize bytes read from 
> Kinesis 
> --
>
> Key: FLINK-9692
> URL: https://issues.apache.org/jira/browse/FLINK-9692
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: performance, pull-request-available
>
> The Kinesis connector currently has a [constant 
> value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213]
>  set for maxRecords that it can fetch from a single Kinesis getRecords call. 
> However, in most realtime scenarios, the average size of the Kinesis record 
> (in bytes) changes depending on the situation i.e. you could be in a 
> transient scenario where you are reading large sized records and would hence 
> like to fetch fewer records in each getRecords call (so as to not exceed the 
> 2 MB/sec [per shard 
> limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html]
>  on the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch 
> size prior to making the getRecords call, so that the maxRecords parameter 
> can be appropriately tuned before making the call. 
> This feature can be behind a 
> [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java]
>  flag that defaults to false. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9777) YARN: JM and TM Memory must be specified with Units

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546425#comment-16546425
 ] 

ASF GitHub Bot commented on FLINK-9777:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6297
  
@dawidwys added a test case, please review~


> YARN: JM and TM Memory must be specified with Units 
> 
>
> Key: FLINK-9777
> URL: https://issues.apache.org/jira/browse/FLINK-9777
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, YARN
>Affects Versions: 1.6.0
> Environment: commit 9f736d1927c62d220a82931c4f5ffa4955910f27
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> FLINK-6469 breaks backwards compatibility because the JobManager and 
> TaskManager memory must be specified with units (otherwise bytes are 
> assumed). The command to start a YARN session as documented 
> ([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md)
>  
> |https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md]
>  would not work because 1024 bytes and 4096 bytes are not enough for the heap 
> size. The command finishes with the following exception:
> {noformat}
> java.lang.reflect.UndeclaredThrowableException
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802)
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: 
> Couldn't deploy Yarn session cluster
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>   ... 2 more
> Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum 
> memory requirements with the provided cluster specification. Please increase 
> the memory of the cluster.
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453)
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475)
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413)
>   ... 7 more
> Caused by: java.lang.IllegalArgumentException
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>   at 
> org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115)
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450)
>   ... 9 more
> {noformat}
>  
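
The difference can be made visible with the memory-size parsing introduced by 
FLINK-6469 (a hedged illustration; MemorySize.parse is assumed to treat 
unit-less values as bytes, which is exactly what the report describes):

{code}
import org.apache.flink.configuration.MemorySize

object MemoryUnitsExample extends App {
  val legacy   = MemorySize.parse("1024")   // interpreted as 1024 bytes, far too small for a JM heap
  val explicit = MemorySize.parse("1024m")  // 1024 mebibytes, matching the old meaning of "-jm 1024"
  println(s"${legacy.getBytes} bytes vs ${explicit.getMebiBytes} MiB")
}
{code}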



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...

2018-07-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6300


---


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546424#comment-16546424
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202982848
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, 
ValidationException}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+import org.apache.flink.table.factories.{StreamTableSinkFactory, 
StreamTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a streaming 
environment.
+  */
+class StreamTableDescriptor(
+private val tableEnv: StreamTableEnvironment,
+private val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor
+  with RegistrableDescriptor
+  with StreamableDescriptor {
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+  private var updateMode: Option[String] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
+connectorDescriptor.addProperties(properties)
+formatDescriptor.foreach(_.addProperties(properties))
+schemaDescriptor.foreach(_.addProperties(properties))
+updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSource(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSource = TableFactoryService
+  .find(classOf[StreamTableSourceFactory[_]], javaMap)
+  .createStreamTableSource(javaMap)
+tableEnv.registerTableSource(name, tableSource)
+  }
+
+  /**
+* Searches for the specified table sink, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSink(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSink = TableFactoryService
+  .find(classOf[StreamTableSinkFactory[_]], javaMap)
+  .createStreamTableSink(javaMap)
+tableEnv.registerTableSink(name, tableSink)
+  }
+
+  /**
+* Searches for the specified table source and sink, configures them 
accordingly, and registers
+* them as a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSourceAndSink(name: String): Unit = {
+registerTableSource(name)
+registerTableSink(name)
+  }
+
+  /**
+* Specifies the format that defines how to read data from a connector.
+*/
+  override def withFormat(format: FormatDescriptor): StreamTableDescriptor 
= {
+formatDescriptor = Some(format)
+this
+  }
+
+  /**
+* Specifies the resulting table schema.
+*/
+  override def withSchema(schema: Schema): StreamTableDescriptor = {
+schemaDescriptor = Some(schema)
+this
+  }
+
+  /**
+* Declares how to perform the conversion between a dynamic table and 
an external connector.
+*
+

[GitHub] flink pull request #6347: [hotfix] consistency: vertexes -> vertices

2018-07-17 Thread tison1
Github user tison1 closed the pull request at:

https://github.com/apache/flink/pull/6347


---


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202982848
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, 
ValidationException}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+import org.apache.flink.table.factories.{StreamTableSinkFactory, 
StreamTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a streaming 
environment.
+  */
+class StreamTableDescriptor(
+private val tableEnv: StreamTableEnvironment,
+private val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor
+  with RegistrableDescriptor
+  with StreamableDescriptor {
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+  private var updateMode: Option[String] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
+connectorDescriptor.addProperties(properties)
+formatDescriptor.foreach(_.addProperties(properties))
+schemaDescriptor.foreach(_.addProperties(properties))
+updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSource(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSource = TableFactoryService
+  .find(classOf[StreamTableSourceFactory[_]], javaMap)
+  .createStreamTableSource(javaMap)
+tableEnv.registerTableSource(name, tableSource)
+  }
+
+  /**
+* Searches for the specified table sink, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSink(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSink = TableFactoryService
+  .find(classOf[StreamTableSinkFactory[_]], javaMap)
+  .createStreamTableSink(javaMap)
+tableEnv.registerTableSink(name, tableSink)
+  }
+
+  /**
+* Searches for the specified table source and sink, configures them 
accordingly, and registers
+* them as a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSourceAndSink(name: String): Unit = {
+registerTableSource(name)
+registerTableSink(name)
+  }
+
+  /**
+* Specifies the format that defines how to read data from a connector.
+*/
+  override def withFormat(format: FormatDescriptor): StreamTableDescriptor 
= {
+formatDescriptor = Some(format)
+this
+  }
+
+  /**
+* Specifies the resulting table schema.
+*/
+  override def withSchema(schema: Schema): StreamTableDescriptor = {
+schemaDescriptor = Some(schema)
+this
+  }
+
+  /**
+* Declares how to perform the conversion between a dynamic table and 
an external connector.
+*
+* In append mode, a dynamic table and an external connector only 
exchange INSERT messages.
+*
+* @see See also [[inRetractMode()]] and [[inUpsertMode()]].
+*/
+  override def inAppendMode(): StreamTableDescriptor = {
+  

[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

2018-07-17 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6297
  
@dawidwys added a test case, please review~


---


[GitHub] flink pull request #6347: [hotfix] consistency: vertexes -> vertices

2018-07-17 Thread tison1
Github user tison1 closed the pull request at:

https://github.com/apache/flink/pull/6347


---


[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices

2018-07-17 Thread tison1
Github user tison1 commented on the issue:

https://github.com/apache/flink/pull/6347
  
> vertices is the correct plural, but this is another one of those cases 
where fixing it might cause more harm than good since it could cause merge 
conflicts, yet provides no functional benefit.

... close as @zentol suggested


---


[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices

2018-07-17 Thread tison1
Github user tison1 commented on the issue:

https://github.com/apache/flink/pull/6347
  
> vertices is the correct plural, but this is another one of those cases 
where fixing it might cause more harm than good since it could cause merge 
conflicts, yet provides no functional benefit.

... close as @zentol suggested


---


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546423#comment-16546423
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202982499
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala
 ---
@@ -19,14 +19,17 @@
 package org.apache.flink.table.descriptors
 
 /**
-  * Common class for all descriptors describing a table sink.
+  * A trait for descriptors that allow to define a format and schema.
   */
-abstract class TableSinkDescriptor extends TableDescriptor {
+trait SchematicDescriptor extends Descriptor {
--- End diff --

`SchematicDescriptor` is used for `ExternalCatalogTable`, 
`StreamTableDescriptor`, and `BatchTableDescriptor`. If we add a new level next 
to `connector` and `format` (which may happen in the future), we would 
immediately get a compile error there.
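
As a toy illustration of that compile-time safety (simplified signatures, not 
the real trait; it assumes FormatDescriptor and Schema resolve in the same 
descriptors package):

{code}
trait SchematicDescriptorLike {
  def withFormat(format: FormatDescriptor): this.type
  def withSchema(schema: Schema): this.type
  // Adding another level here, e.g.
  //   def withPartitioning(partitioning: PartitioningDescriptor): this.type
  // would immediately force ExternalCatalogTable, StreamTableDescriptor and
  // BatchTableDescriptor to implement it, surfacing the gap at compile time.
}
{code}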


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202982499
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala
 ---
@@ -19,14 +19,17 @@
 package org.apache.flink.table.descriptors
 
 /**
-  * Common class for all descriptors describing a table sink.
+  * A trait for descriptors that allow to define a format and schema.
   */
-abstract class TableSinkDescriptor extends TableDescriptor {
+trait SchematicDescriptor extends Descriptor {
--- End diff --

`SchematicDescriptor` is used for `ExternalCatalogTable`, 
`StreamTableDescriptor`, and `BatchTableDescriptor`. If we add a new level next 
to `connector` and `format` (which may happen in the future), we would 
immediately get a compile error there.


---


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546419#comment-16546419
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202981771
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+  * The utility class is used to convert [[ExternalCatalogTable]] to 
[[TableSourceSinkTable]].
+  *
+  * It uses [[TableFactoryService]] for discovering.
+  */
+object ExternalTableUtil extends Logging {
+
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable[T1, T2](
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceSinkTable[T1, T2] = {
+
+val properties = new DescriptorProperties()
+externalCatalogTable.addProperties(properties)
+val javaMap = properties.asMap
+val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+val source: Option[TableSourceTable[T1]] = tableEnv match {
+  // check for a batch table source in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table source in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table source.")
+}
+
+val sink: Option[TableSinkTable[T2]] = tableEnv match {
+  // check for a batch table sink in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table sink in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table sink.")
+}
+
+new TableSourceSinkTable[T1, T2](source, sink)
+  }
+
+  private def createBatchTableSource[T](
--- End diff --

Then we would have 4 factories that have to be checked with if/else 
branches. Having those if/else checks in 3 places (SQL Client, external catalog, and 
descriptors) is acceptable in my opinion.


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo 

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202981771
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+  * The utility class is used to convert [[ExternalCatalogTable]] to 
[[TableSourceSinkTable]].
+  *
+  * It uses [[TableFactoryService]] for discovering.
+  */
+object ExternalTableUtil extends Logging {
+
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable[T1, T2](
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceSinkTable[T1, T2] = {
+
+val properties = new DescriptorProperties()
+externalCatalogTable.addProperties(properties)
+val javaMap = properties.asMap
+val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+val source: Option[TableSourceTable[T1]] = tableEnv match {
+  // check for a batch table source in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table source in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table source.")
+}
+
+val sink: Option[TableSinkTable[T2]] = tableEnv match {
+  // check for a batch table sink in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table sink in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table sink.")
+}
+
+new TableSourceSinkTable[T1, T2](source, sink)
+  }
+
+  private def createBatchTableSource[T](
--- End diff --

Then we would have 4 factories that have to be checked with if/else 
branches. Having those if/else checks in 3 places (SQL Client, external catalog, and 
descriptors) is acceptable in my opinion.


---


[jira] [Assigned] (FLINK-9815) YARNSessionCapacitySchedulerITCase flaky

2018-07-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-9815:
---

Assignee: Chesnay Schepler

> YARNSessionCapacitySchedulerITCase flaky
> 
>
> Key: FLINK-9815
> URL: https://issues.apache.org/jira/browse/FLINK-9815
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.6.0
>
>
> The test fails because of dangling yarn applications.
> Logs: https://api.travis-ci.org/v3/job/402657694/log.txt
> It was also reported previously in [FLINK-8161] : 
> https://issues.apache.org/jira/browse/FLINK-8161?focusedCommentId=16480216&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16480216



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546411#comment-16546411
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202980228
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+  * The utility class is used to convert [[ExternalCatalogTable]] to 
[[TableSourceSinkTable]].
+  *
+  * It uses [[TableFactoryService]] for discovering.
+  */
+object ExternalTableUtil extends Logging {
+
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable[T1, T2](
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceSinkTable[T1, T2] = {
+
+val properties = new DescriptorProperties()
+externalCatalogTable.addProperties(properties)
+val javaMap = properties.asMap
+val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+val source: Option[TableSourceTable[T1]] = tableEnv match {
+  // check for a batch table source in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table source in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table source.")
+}
+
+val sink: Option[TableSinkTable[T2]] = tableEnv match {
+  // check for a batch table sink in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table sink in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table sink.")
+}
+
+new TableSourceSinkTable[T1, T2](source, sink)
+  }
+
+  private def createBatchTableSource[T](
+  externalCatalogTable: ExternalCatalogTable,
+  javaMap: util.Map[String, String],
+  statistics: FlinkStatistic)
+: Option[TableSourceTable[T]] = if 
(externalCatalogTable.isTableSource) {
--- End diff --

I also like this pattern more but it is not very Scala-like:
https://tpolecat.github.io/2014/05/09/return.html


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Proj

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202980228
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+  * The utility class is used to convert [[ExternalCatalogTable]] to 
[[TableSourceSinkTable]].
+  *
+  * It uses [[TableFactoryService]] for discovering.
+  */
+object ExternalTableUtil extends Logging {
+
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable[T1, T2](
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceSinkTable[T1, T2] = {
+
+val properties = new DescriptorProperties()
+externalCatalogTable.addProperties(properties)
+val javaMap = properties.asMap
+val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+val source: Option[TableSourceTable[T1]] = tableEnv match {
+  // check for a batch table source in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table source in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table source.")
+}
+
+val sink: Option[TableSinkTable[T2]] = tableEnv match {
+  // check for a batch table sink in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table sink in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table sink.")
+}
+
+new TableSourceSinkTable[T1, T2](source, sink)
+  }
+
+  private def createBatchTableSource[T](
+  externalCatalogTable: ExternalCatalogTable,
+  javaMap: util.Map[String, String],
+  statistics: FlinkStatistic)
+: Option[TableSourceTable[T]] = if 
(externalCatalogTable.isTableSource) {
--- End diff --

I also like this pattern more but it is not very Scala-like:
https://tpolecat.github.io/2014/05/09/return.html
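
As a minimal, self-contained illustration of the two styles being discussed (the names below are invented, not the PR's actual types), compare a guard-clause version using `return` with the equivalent single-expression version:

{code}
object ReturnStyleSketch {

  final case class CatalogTable(isTableSource: Boolean, name: String)

  // Guard-clause style with an early return (what the linked post argues against).
  def sourceNameWithReturn(table: CatalogTable): Option[String] = {
    if (!table.isTableSource) {
      return None
    }
    Some(table.name)
  }

  // Expression-oriented style: the whole method body is a single expression.
  def sourceName(table: CatalogTable): Option[String] =
    if (table.isTableSource) Some(table.name) else None

  def main(args: Array[String]): Unit = {
    println(sourceName(CatalogTable(isTableSource = true, name = "orders")))   // Some(orders)
    println(sourceName(CatalogTable(isTableSource = false, name = "orders")))  // None
  }
}
{code}

The second form keeps the method body a single expression, which is what the linked post recommends.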


---


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546407#comment-16546407
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202978881
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -18,33 +18,299 @@
 
 package org.apache.flink.table.catalog
 
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, readColumnStats}
+import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
 import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
-  * Defines a table in an [[ExternalCatalog]].
+  * Defines a table in an [[ExternalCatalog]]. External catalog tables 
describe table sources
+  * and/or sinks for both batch and stream environments.
+  *
+  * The catalog table takes descriptors which allow for declaring the 
communication to external
+  * systems in an implementation-agnostic way. The classpath is scanned 
for suitable table factories
+  * that match the desired configuration.
+  *
+  * Use the provided builder methods to configure the external catalog 
table accordingly.
+  *
+  * The following example shows how to read from a connector using a JSON 
format and
+  * declaring it as a table source:
   *
-  * @param connectorDesc describes the system to connect to
-  * @param formatDesc describes the data format of a connector
-  * @param schemaDesc describes the schema of the result table
-  * @param statisticsDesc describes the estimated statistics of the result 
table
-  * @param metadataDesc describes additional metadata of a table
+  * {{{
+  *   ExternalCatalogTable(
+  * new ExternalSystemXYZ()
+  *   .version("0.11"))
+  *   .withFormat(
+  * new Json()
+  *   .jsonSchema("{...}")
+  *   .failOnMissingField(false))
+  *   .withSchema(
+  * new Schema()
+  *   .field("user-name", "VARCHAR").from("u_name")
+  *   .field("count", "DECIMAL")
+  *   .asTableSource()
--- End diff --

I also thought about that, but descriptors don't actually "build" anything. 
The only final representation would be the properties, but we don't expose them 
to the user.


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202978881
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -18,33 +18,299 @@
 
 package org.apache.flink.table.catalog
 
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, readColumnStats}
+import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
 import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
-  * Defines a table in an [[ExternalCatalog]].
+  * Defines a table in an [[ExternalCatalog]]. External catalog tables 
describe table sources
+  * and/or sinks for both batch and stream environments.
+  *
+  * The catalog table takes descriptors which allow for declaring the 
communication to external
+  * systems in an implementation-agnostic way. The classpath is scanned 
for suitable table factories
+  * that match the desired configuration.
+  *
+  * Use the provided builder methods to configure the external catalog 
table accordingly.
+  *
+  * The following example shows how to read from a connector using a JSON 
format and
+  * declaring it as a table source:
   *
-  * @param connectorDesc describes the system to connect to
-  * @param formatDesc describes the data format of a connector
-  * @param schemaDesc describes the schema of the result table
-  * @param statisticsDesc describes the estimated statistics of the result 
table
-  * @param metadataDesc describes additional metadata of a table
+  * {{{
+  *   ExternalCatalogTable(
+  * new ExternalSystemXYZ()
+  *   .version("0.11"))
+  *   .withFormat(
+  * new Json()
+  *   .jsonSchema("{...}")
+  *   .failOnMissingField(false))
+  *   .withSchema(
+  * new Schema()
+  *   .field("user-name", "VARCHAR").from("u_name")
+  *   .field("count", "DECIMAL")
+  *   .asTableSource()
--- End diff --

I also thought about that, but descriptors don't actually "build" anything. 
The only final representation would be the properties, but we don't expose them 
to the user.
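
To make the distinction concrete, here is a rough, self-contained sketch with invented names (not Flink's actual classes): a builder hands its result back to the caller via `build()`, whereas a descriptor only accumulates properties that are consumed internally, e.g. by a factory service.

{code}
import scala.collection.mutable

object DescriptorVsBuilderSketch {

  // Classic builder: chaining produces new values and the caller asks for the result.
  final class ConnectorBuilder(private val props: Map[String, String] = Map.empty) {
    def version(v: String): ConnectorBuilder = new ConnectorBuilder(props + ("version" -> v))
    def build(): Map[String, String] = props  // the user-visible "built" result
  }

  // Descriptor: chaining only configures it; the properties are handed over internally
  // and there is no user-facing build().
  final class ConnectorDescriptor {
    private var props: Map[String, String] = Map.empty
    def version(v: String): ConnectorDescriptor = { props += ("version" -> v); this }
    private[DescriptorVsBuilderSketch] def addProperties(sink: mutable.Map[String, String]): Unit =
      sink ++= props
  }

  def main(args: Array[String]): Unit = {
    println(new ConnectorBuilder().version("0.11").build())   // Map(version -> 0.11)

    val target = mutable.Map.empty[String, String]
    new ConnectorDescriptor().version("0.11").addProperties(target)
    println(target)                                           // Map(version -> 0.11)
  }
}
{code}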


---


[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices

2018-07-17 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6347
  
yes


---


[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices

2018-07-17 Thread tison1
Github user tison1 commented on the issue:

https://github.com/apache/flink/pull/6347
  
> Additionally this PR makes a lot of whitespace changes that should be 
reverted in any case.

Did you mean the whitespace in the comment `* ` is significant?


---


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546399#comment-16546399
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202977464
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -18,33 +18,299 @@
 
 package org.apache.flink.table.catalog
 
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, readColumnStats}
+import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
 import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
-  * Defines a table in an [[ExternalCatalog]].
+  * Defines a table in an [[ExternalCatalog]]. External catalog tables 
describe table sources
+  * and/or sinks for both batch and stream environments.
+  *
+  * The catalog table takes descriptors which allow for declaring the 
communication to external
+  * systems in an implementation-agnostic way. The classpath is scanned 
for suitable table factories
+  * that match the desired configuration.
+  *
+  * Use the provided builder methods to configure the external catalog 
table accordingly.
+  *
+  * The following example shows how to read from a connector using a JSON 
format and
+  * declaring it as a table source:
   *
-  * @param connectorDesc describes the system to connect to
-  * @param formatDesc describes the data format of a connector
-  * @param schemaDesc describes the schema of the result table
-  * @param statisticsDesc describes the estimated statistics of the result 
table
-  * @param metadataDesc describes additional metadata of a table
+  * {{{
+  *   ExternalCatalogTable(
+  * new ExternalSystemXYZ()
+  *   .version("0.11"))
+  *   .withFormat(
+  * new Json()
+  *   .jsonSchema("{...}")
+  *   .failOnMissingField(false))
+  *   .withSchema(
+  * new Schema()
+  *   .field("user-name", "VARCHAR").from("u_name")
+  *   .field("count", "DECIMAL")
+  *   .asTableSource()
+  * }}}
+  *
+  * Note: For backwards-compatibility, the table is declared as a table 
source for batch and
+  * streaming environment by default.
+  *
+  * See also [[org.apache.flink.table.factories.TableFactory]] for more 
information about how
+  * to target suitable factories.
+  *
+  * @param connectorDescriptor describes the system to connect to
   */
-class ExternalCatalogTable(
-connectorDesc: ConnectorDescriptor,
-formatDesc: Option[FormatDescriptor],
-schemaDesc: Option[Schema],
-statisticsDesc: Option[Statistics],
-metadataDesc: Option[Metadata])
-  extends TableSourceDescriptor {
-
-  this.connectorDescriptor = Some(connectorDesc)
-  this.formatDescriptor = formatDesc
-  this.schemaDescriptor = schemaDesc
-  this.statisticsDescriptor = statisticsDesc
-  this.metaDescriptor = metadataDesc
-
-  // expose statistics for external table source util
-  override def getTableStats: Option[TableStats] = super.getTableStats
+class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
--- End diff --

https://github.com/databricks/scala-style-guide#indent


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202977464
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -18,33 +18,299 @@
 
 package org.apache.flink.table.catalog
 
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, readColumnStats}
+import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
 import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
-  * Defines a table in an [[ExternalCatalog]].
+  * Defines a table in an [[ExternalCatalog]]. External catalog tables 
describe table sources
+  * and/or sinks for both batch and stream environments.
+  *
+  * The catalog table takes descriptors which allow for declaring the 
communication to external
+  * systems in an implementation-agnostic way. The classpath is scanned 
for suitable table factories
+  * that match the desired configuration.
+  *
+  * Use the provided builder methods to configure the external catalog 
table accordingly.
+  *
+  * The following example shows how to read from a connector using a JSON 
format and
+  * declaring it as a table source:
   *
-  * @param connectorDesc describes the system to connect to
-  * @param formatDesc describes the data format of a connector
-  * @param schemaDesc describes the schema of the result table
-  * @param statisticsDesc describes the estimated statistics of the result 
table
-  * @param metadataDesc describes additional metadata of a table
+  * {{{
+  *   ExternalCatalogTable(
+  * new ExternalSystemXYZ()
+  *   .version("0.11"))
+  *   .withFormat(
+  * new Json()
+  *   .jsonSchema("{...}")
+  *   .failOnMissingField(false))
+  *   .withSchema(
+  * new Schema()
+  *   .field("user-name", "VARCHAR").from("u_name")
+  *   .field("count", "DECIMAL")
+  *   .asTableSource()
+  * }}}
+  *
+  * Note: For backwards-compatibility, the table is declared as a table 
source for batch and
+  * streaming environment by default.
+  *
+  * See also [[org.apache.flink.table.factories.TableFactory]] for more 
information about how
+  * to target suitable factories.
+  *
+  * @param connectorDescriptor describes the system to connect to
   */
-class ExternalCatalogTable(
-connectorDesc: ConnectorDescriptor,
-formatDesc: Option[FormatDescriptor],
-schemaDesc: Option[Schema],
-statisticsDesc: Option[Statistics],
-metadataDesc: Option[Metadata])
-  extends TableSourceDescriptor {
-
-  this.connectorDescriptor = Some(connectorDesc)
-  this.formatDescriptor = formatDesc
-  this.schemaDescriptor = schemaDesc
-  this.statisticsDescriptor = statisticsDesc
-  this.metaDescriptor = metadataDesc
-
-  // expose statistics for external table source util
-  override def getTableStats: Option[TableStats] = super.getTableStats
+class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
--- End diff --

https://github.com/databricks/scala-style-guide#indent


---


[jira] [Commented] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis

2018-07-17 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546392#comment-16546392
 ] 

Chesnay Schepler commented on FLINK-8163:
-

This test has some funky retrying logic that ignores most exceptions:
{code:java}
CompletableFuture expected = client.getKvState(jobId, queryName, key, 
keyTypeInfo, stateDescriptor);
expected.whenCompleteAsync((result, throwable) -> {
   if (throwable != null) {
  if (
throwable.getCause() instanceof CancellationException ||
throwable.getCause() instanceof AssertionError ||
(failForUnknownKeyOrNamespace && throwable.getCause() instanceof 
UnknownKeyOrNamespaceException)
  ) {
 resultFuture.completeExceptionally(throwable.getCause());
  } else if (deadline.hasTimeLeft()) {
 getKvStateIgnoringCertainExceptions(
   deadline, resultFuture, client, jobId, queryName, key, 
keyTypeInfo,
   stateDescriptor, failForUnknownKeyOrNamespace, executor);
  }
   } else {
  resultFuture.complete(result);
   }
}, executor);{code}

When running the test locally in a loop, this exception was logged on the server:
{code}
org.apache.flink.shaded.netty4.io.netty.util.internal.OutOfDirectMemoryError: 
failed to allocate 16777216 byte(s) of direct memory (used: 2147483648, max: 
2147483648)
{code}
The client ignores this error and retries the operation indefinitely, causing the 
timeout. Incidentally, the same exception is printed on every subsequent attempt.
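
For illustration only, a minimal sketch (in Scala, with invented names; not the actual test code) of a safer retry helper that combines a hard deadline with a predicate for errors that must never be retried:

{code}
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object BoundedRetrySketch {

  // Retry op() until it succeeds, the deadline expires, or a non-retryable error occurs.
  @tailrec
  def retry[T](deadline: Deadline, retryable: Throwable => Boolean)(op: () => T): T =
    Try(op()) match {
      case Success(value) => value
      case Failure(t) if !retryable(t) => throw t        // fail fast instead of masking the real cause
      case Failure(t) if deadline.isOverdue() => throw t // give up instead of looping forever
      case Failure(_) =>
        Thread.sleep(50)
        retry(deadline, retryable)(op)
    }

  def main(args: Array[String]): Unit = {
    var attempts = 0
    val isRetryable = (t: Throwable) => !t.isInstanceOf[OutOfMemoryError]
    val result = retry(5.seconds.fromNow, isRetryable) { () =>
      attempts += 1
      if (attempts < 3) throw new IllegalStateException("not ready yet")
      "done"
    }
    println(s"$result after $attempts attempts")  // done after 3 attempts
  }
}
{code}

With such a helper, an error like the OutOfDirectMemoryError above would surface immediately instead of being masked by endless retries.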

> NonHAQueryableStateFsBackendITCase test getting stuck on Travis
> ---
>
> Key: FLINK-8163
> URL: https://issues.apache.org/jira/browse/FLINK-8163
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> The {{NonHAQueryableStateFsBackendITCase}} tests seem to get stuck on Travis, 
> producing no output for 300s.
> https://travis-ci.org/tillrohrmann/flink/jobs/307988209



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis

2018-07-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8163.
---
Resolution: Cannot Reproduce

This test has some funky retrying logic that ignores most exceptions.

> NonHAQueryableStateFsBackendITCase test getting stuck on Travis
> ---
>
> Key: FLINK-8163
> URL: https://issues.apache.org/jira/browse/FLINK-8163
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> The {{NonHAQueryableStateFsBackendITCase}} tests seem to get stuck on Travis, 
> producing no output for 300s.
> https://travis-ci.org/tillrohrmann/flink/jobs/307988209



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis

2018-07-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-8163:
-

> NonHAQueryableStateFsBackendITCase test getting stuck on Travis
> ---
>
> Key: FLINK-8163
> URL: https://issues.apache.org/jira/browse/FLINK-8163
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> The {{NonHAQueryableStateFsBackendITCase}} tests seem to get stuck on Travis, 
> producing no output for 300s.
> https://travis-ci.org/tillrohrmann/flink/jobs/307988209



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9792) Cannot add html tags in options description

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546369#comment-16546369
 ] 

ASF GitHub Bot commented on FLINK-9792:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r202970121
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Formatter that transforms {@link Description} into Html representation.
+ */
+public class HtmlFormatter extends Formatter {
+
+   @Override
+   protected void formatLink(StringBuilder state, String link, String 
description) {
+   state.append(String.format("<a href=\"%s\">%s</a>", link, description));
+   }
+
+   @Override
+   protected void formatLineBreak(StringBuilder state) {
+   state.append("");
+   }
+
+   @Override
+   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   String escapedFormat = escapeCharacters(format);
+   state.append(String.format(escapedFormat, elements));
+   }
+
+   @Override
+   protected void formatList(StringBuilder state, String[] entries) {
+   state.append("");
+   for (String entry : entries) {
+   state.append(String.format("%s", entry));
+   }
+   state.append("");
+   }
+
+   @Override
+   protected Formatter newInstance() {
+   return new HtmlFormatter();
+   }
+
+   private static final String TEMPORARY_PLACEHOLDER = 
"superRandomTemporaryPlaceholder";
+
+   private static String escapeCharacters(String value) {
+   return value
+   .replaceAll("%s", TEMPORARY_PLACEHOLDER)
--- End diff --

can you give me an example of where this is problematic? Does this occur 
if the _final formatted_ description contains `%`?


> Cannot add html tags in options description
> ---
>
> Key: FLINK-9792
> URL: https://issues.apache.org/jira/browse/FLINK-9792
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
>
> Right now it is impossible to add any html tags in options description, 
> because all "<" and ">" are escaped. Therefore some links there do not work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9792) Cannot add html tags in options description

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546368#comment-16546368
 ] 

ASF GitHub Bot commented on FLINK-9792:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r202969620
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Formatter that transforms {@link Description} into Html representation.
+ */
+public class HtmlFormatter extends Formatter {
+
+   @Override
+   protected void formatLink(StringBuilder state, String link, String 
description) {
+   state.append(String.format("<a href=\"%s\">%s</a>", link, description));
+   }
+
+   @Override
+   protected void formatLineBreak(StringBuilder state) {
+   state.append("");
+   }
+
+   @Override
+   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   String escapedFormat = escapeCharacters(format);
+   state.append(String.format(escapedFormat, elements));
+   }
+
+   @Override
+   protected void formatList(StringBuilder state, String[] entries) {
+   state.append("");
+   for (String entry : entries) {
+   state.append(String.format("%s", entry));
+   }
+   state.append("");
+   }
+
+   @Override
+   protected Formatter newInstance() {
+   return new HtmlFormatter();
+   }
+
+   private static final String TEMPORARY_PLACEHOLDER = 
"superRandomTemporaryPlaceholder";
--- End diff --

this should be distinct from the placeholder in `Utils`.


> Cannot add html tags in options description
> ---
>
> Key: FLINK-9792
> URL: https://issues.apache.org/jira/browse/FLINK-9792
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
>
> Right now it is impossible to add any html tags in options description, 
> because all "<" and ">" are escaped. Therefore some links there do not work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...

2018-07-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r202970121
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Formatter that transforms {@link Description} into Html representation.
+ */
+public class HtmlFormatter extends Formatter {
+
+   @Override
+   protected void formatLink(StringBuilder state, String link, String 
description) {
+   state.append(String.format("<a href=\"%s\">%s</a>", link, description));
+   }
+
+   @Override
+   protected void formatLineBreak(StringBuilder state) {
+   state.append("");
+   }
+
+   @Override
+   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   String escapedFormat = escapeCharacters(format);
+   state.append(String.format(escapedFormat, elements));
+   }
+
+   @Override
+   protected void formatList(StringBuilder state, String[] entries) {
+   state.append("");
+   for (String entry : entries) {
+   state.append(String.format("%s", entry));
+   }
+   state.append("");
+   }
+
+   @Override
+   protected Formatter newInstance() {
+   return new HtmlFormatter();
+   }
+
+   private static final String TEMPORARY_PLACEHOLDER = 
"superRandomTemporaryPlaceholder";
+
+   private static String escapeCharacters(String value) {
+   return value
+   .replaceAll("%s", TEMPORARY_PLACEHOLDER)
--- End diff --

can you give me an example of where this is problematic? Does this occur 
if the _final formatted_ description contains `%`?


---


[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...

2018-07-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r202969620
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Formatter that transforms {@link Description} into Html representation.
+ */
+public class HtmlFormatter extends Formatter {
+
+   @Override
+   protected void formatLink(StringBuilder state, String link, String 
description) {
+   state.append(String.format("<a href=\"%s\">%s</a>", link, description));
+   }
+
+   @Override
+   protected void formatLineBreak(StringBuilder state) {
+   state.append("");
+   }
+
+   @Override
+   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   String escapedFormat = escapeCharacters(format);
+   state.append(String.format(escapedFormat, elements));
+   }
+
+   @Override
+   protected void formatList(StringBuilder state, String[] entries) {
+   state.append("");
+   for (String entry : entries) {
+   state.append(String.format("%s", entry));
+   }
+   state.append("");
+   }
+
+   @Override
+   protected Formatter newInstance() {
+   return new HtmlFormatter();
+   }
+
+   private static final String TEMPORARY_PLACEHOLDER = 
"superRandomTemporaryPlaceholder";
--- End diff --

this should be distinct from the placeholder in `Utils`.


---


[jira] [Updated] (FLINK-9873) Log actual state when aborting checkpoint due to task not running

2018-07-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9873:

Affects Version/s: (was: 1.5.1)
   1.5.0

> Log actual state when aborting checkpoint due to task not running
> -
>
> Key: FLINK-9873
> URL: https://issues.apache.org/jira/browse/FLINK-9873
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> Currently, if a checkpoint is triggered while a task is not in a RUNNING state 
> the following message is logged:
> {code:java}
> Checkpoint triggering task {} of job {} is not being executed at the 
> moment.{code}
> We can improve this message to include the actual task state to help diagnose 
> problems.
> This message is also a bit ambiguous, as "being executed" could mean many 
> things, from not "RUNNING", to not being "DEPLOYED", or to not existing at 
> all.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9792) Cannot add html tags in options description

2018-07-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9792:

Affects Version/s: (was: 1.5.1)
   1.5.0

> Cannot add html tags in options description
> ---
>
> Key: FLINK-9792
> URL: https://issues.apache.org/jira/browse/FLINK-9792
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
>
> Right now it is impossible to add any html tags in options description, 
> because all "<" and ">" are escaped. Therefore some links there do not work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9013) Document yarn.containers.vcores only being effective when adapting YARN config

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546325#comment-16546325
 ] 

ASF GitHub Bot commented on FLINK-9013:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6294#discussion_r202968456
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/TextElement.java
 ---
@@ -53,6 +55,16 @@ public static TextElement text(String text) {
return new TextElement(text, Collections.emptyList());
}
 
+   /**
+* Tries to format the text as code.
+*
+* @return text element with applied formatting
+*/
+   public TextElement formatAsCode() {
--- End diff --

alternatively we could add an explicit `Code` `InlineElement`.


> Document yarn.containers.vcores only being effective when adapting YARN config
> --
>
> Key: FLINK-9013
> URL: https://issues.apache.org/jira/browse/FLINK-9013
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> Even after specifying {{yarn.containers.vcores}} and having Flink request 
> such a container from YARN, it may not take these into account at all and 
> return a container with 1 vcore.
> The YARN configuration needs to be adapted to take the vcores into account, 
> e.g. by setting the {{FairScheduler}} in {{yarn-site.xml}}:
> {code}
> 
>   yarn.resourcemanager.scheduler.class
>   
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler
> 
> {code}
> This fact should be documented at least at the configuration parameter 
> documentation of  {{yarn.containers.vcores}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores...

2018-07-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6294#discussion_r202968456
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/TextElement.java
 ---
@@ -53,6 +55,16 @@ public static TextElement text(String text) {
return new TextElement(text, Collections.emptyList());
}
 
+   /**
+* Tries to format the text as code.
+*
+* @return text element with applied formatting
+*/
+   public TextElement formatAsCode() {
--- End diff --

alternatively we could add an explicit `Code` `InlineElement`.
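
Roughly what that alternative could look like, sketched here in Scala with invented names (the actual flink-core classes are Java): a dedicated `Code` element lets each formatter decide how code is rendered.

{code}
object DescriptionElementsSketch {

  sealed trait InlineElement
  final case class Text(value: String) extends InlineElement
  final case class Code(value: String) extends InlineElement  // the explicit element

  // Each formatter decides how the elements are rendered.
  def renderHtml(elements: Seq[InlineElement]): String =
    elements.map {
      case Text(v) => v
      case Code(v) => s"<code>$v</code>"
    }.mkString(" ")

  def main(args: Array[String]): Unit = {
    val description = Seq(
      Text("Requires setting"),
      Code("yarn.containers.vcores"),
      Text("in the YARN configuration."))
    println(renderHtml(description))
    // Requires setting <code>yarn.containers.vcores</code> in the YARN configuration.
  }
}
{code}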


---


[jira] [Commented] (FLINK-9792) Cannot add html tags in options description

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546321#comment-16546321
 ] 

ASF GitHub Bot commented on FLINK-9792:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r202967561
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Formatter that transforms {@link Description} into Html representation.
+ */
+public class HtmlFormatter extends Formatter {
+
+   @Override
+   protected void formatLink(StringBuilder state, String link, String 
description) {
+   state.append(String.format("<a href=\"%s\">%s</a>", link, description));
+   }
+
+   @Override
+   protected void formatLineBreak(StringBuilder state) {
+   state.append("");
+   }
+
+   @Override
+   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   String escapedFormat = escapeCharacters(format);
+   state.append(String.format(escapedFormat, elements));
+   }
+
+   @Override
+   protected void formatList(StringBuilder state, String[] entries) {
+   state.append("");
+   for (String entry : entries) {
+   state.append(String.format("%s", entry));
+   }
+   state.append("");
+   }
+
+   @Override
+   protected Formatter newInstance() {
+   return new HtmlFormatter();
+   }
+
+   private static final String TEMPORARY_PLACEHOLDER = 
"superRandomTemporaryPlaceholder";
+
+   private static String escapeCharacters(String value) {
+   return value
+   .replaceAll("%s", TEMPORARY_PLACEHOLDER)
--- End diff --

@zentol Could you have a last look at this code block before I merge it? 
I didn't want to sneak it in. I found out there is an issue when we use the '%' sign 
in the description.
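
For illustration, a tiny self-contained example (not Flink code) of why a literal '%' in the description text is a problem once that text is used as a `String.format` pattern:

{code}
object PercentEscapingSketch {
  def main(args: Array[String]): Unit = {
    // Works: "%s" is a real placeholder.
    println(String.format("limit is %s", "10"))

    // Fails: the '%' in "50%" is parsed as the start of a format specifier,
    // so the call throws a java.util.IllegalFormatException subclass.
    try {
      String.format("use 50% of the heap for %s", "network buffers")
    } catch {
      case e: java.util.IllegalFormatException => println(s"failed: $e")
    }

    // After escaping the literal '%' as "%%" the same call succeeds.
    println(String.format("use 50%% of the heap for %s", "network buffers"))
  }
}
{code}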


> Cannot add html tags in options description
> ---
>
> Key: FLINK-9792
> URL: https://issues.apache.org/jira/browse/FLINK-9792
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
>
> Right now it is impossible to add any html tags in options description, 
> because all "<" and ">" are escaped. Therefore some links there do not work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...

2018-07-17 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r202967561
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Formatter that transforms {@link Description} into Html representation.
+ */
+public class HtmlFormatter extends Formatter {
+
+   @Override
+   protected void formatLink(StringBuilder state, String link, String 
description) {
+   state.append(String.format("<a href=\"%s\">%s</a>", link, description));
+   }
+
+   @Override
+   protected void formatLineBreak(StringBuilder state) {
+   state.append("");
+   }
+
+   @Override
+   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   String escapedFormat = escapeCharacters(format);
+   state.append(String.format(escapedFormat, elements));
+   }
+
+   @Override
+   protected void formatList(StringBuilder state, String[] entries) {
+   state.append("");
+   for (String entry : entries) {
+   state.append(String.format("%s", entry));
+   }
+   state.append("");
+   }
+
+   @Override
+   protected Formatter newInstance() {
+   return new HtmlFormatter();
+   }
+
+   private static final String TEMPORARY_PLACEHOLDER = 
"superRandomTemporaryPlaceholder";
+
+   private static String escapeCharacters(String value) {
+   return value
+   .replaceAll("%s", TEMPORARY_PLACEHOLDER)
--- End diff --

@zentol Could you have a last look at this code block before I merge it? 
I didn't want to sneak it in. I found out there is an issue when we use the '%' sign 
in the description.


---


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202966706
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, 
ValidationException}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, 
BatchTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a batch 
environment.
+  */
+class BatchTableDescriptor(
+private val tableEnv: BatchTableEnvironment,
+private val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor
+  with RegistrableDescriptor {
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

in many places (for example all `addProperties` methods) you are ordering 
methods very weirdly. The rule of thumb should be public methods before private. 
Longer story: https://stackoverflow.com/a/1760877/8149051

Many times in this code review I had to jump up and down.


---


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546310#comment-16546310
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202964290
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+  * The utility class is used to convert [[ExternalCatalogTable]] to 
[[TableSourceSinkTable]].
+  *
+  * It uses [[TableFactoryService]] for discovering.
+  */
+object ExternalTableUtil extends Logging {
+
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable[T1, T2](
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceSinkTable[T1, T2] = {
+
+val properties = new DescriptorProperties()
+externalCatalogTable.addProperties(properties)
+val javaMap = properties.asMap
+val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+val source: Option[TableSourceTable[T1]] = tableEnv match {
+  // check for a batch table source in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table source in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table source.")
+}
+
+val sink: Option[TableSinkTable[T2]] = tableEnv match {
+  // check for a batch table sink in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table sink in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table sink.")
+}
+
+new TableSourceSinkTable[T1, T2](source, sink)
+  }
+
+  private def createBatchTableSource[T](
--- End diff --

I still do not like this lack of abstraction between batch and streaming in 
the form of `createBatchTableSource`/`createStreamTableSource`.

Instead of writing if/else branches everywhere in our code, there should be some 
common layer that handles such logic. Here half of the problem boils down to 
factories with the methods createBatchTableSource and createStreamTableSource. 
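
As a rough illustration of such a common layer (invented names, not a proposal for the exact interfaces), a single factory entry point parameterised by the environment keeps the batch/stream branching in one place:

{code}
object UnifiedFactorySketch {

  sealed trait Environment
  case object Batch extends Environment
  case object Streaming extends Environment

  trait TableSource { def describe: String }

  // The "common layer": one factory entry point, parameterised by the environment.
  trait TableSourceFactory {
    def createTableSource(env: Environment, properties: Map[String, String]): TableSource
  }

  // A single implementation handles both environments internally.
  class CsvTableSourceFactory extends TableSourceFactory {
    override def createTableSource(env: Environment, properties: Map[String, String]): TableSource = {
      val path = properties.getOrElse("path", "<unknown>")
      env match {
        case Batch     => new TableSource { val describe = s"batch CSV source for $path" }
        case Streaming => new TableSource { val describe = s"streaming CSV source for $path" }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val factory = new CsvTableSourceFactory
    // The call site no longer branches on batch vs. streaming.
    println(factory.createTableSource(Batch, Map("path" -> "/data/orders.csv")).describe)
    println(factory.createTableSource(Streaming, Map("path" -> "/data/orders.csv")).describe)
  }
}
{code}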


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/

[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546311#comment-16546311
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202961529
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -18,33 +18,299 @@
 
 package org.apache.flink.table.catalog
 
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, readColumnStats}
+import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
 import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
-  * Defines a table in an [[ExternalCatalog]].
+  * Defines a table in an [[ExternalCatalog]]. External catalog tables 
describe table sources
+  * and/or sinks for both batch and stream environments.
+  *
+  * The catalog table takes descriptors which allow for declaring the 
communication to external
+  * systems in an implementation-agnostic way. The classpath is scanned 
for suitable table factories
+  * that match the desired configuration.
+  *
+  * Use the provided builder methods to configure the external catalog 
table accordingly.
+  *
+  * The following example shows how to read from a connector using a JSON 
format and
+  * declaring it as a table source:
   *
-  * @param connectorDesc describes the system to connect to
-  * @param formatDesc describes the data format of a connector
-  * @param schemaDesc describes the schema of the result table
-  * @param statisticsDesc describes the estimated statistics of the result 
table
-  * @param metadataDesc describes additional metadata of a table
+  * {{{
+  *   ExternalCatalogTable(
+  * new ExternalSystemXYZ()
+  *   .version("0.11"))
+  *   .withFormat(
+  * new Json()
+  *   .jsonSchema("{...}")
+  *   .failOnMissingField(false))
+  *   .withSchema(
+  * new Schema()
+  *   .field("user-name", "VARCHAR").from("u_name")
+  *   .field("count", "DECIMAL")
+  *   .asTableSource()
--- End diff --

make `ExternalCatalogTable` and `BatchTableDescriptor` true builders with 
final fields?
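
For illustration, a builder with final fields would copy itself on every call
instead of mutating `var`s. A minimal sketch only, with hypothetical class and
field names, not the PR's implementation:

```
// Hypothetical immutable builder: all fields are vals and every "wither"
// returns a fresh instance, so a handed-out descriptor can never change.
class ImmutableTableDescriptor private (
    val connector: ConnectorDescriptor,
    val format: Option[FormatDescriptor],
    val schema: Option[Schema]) {

  def withFormat(newFormat: FormatDescriptor): ImmutableTableDescriptor =
    new ImmutableTableDescriptor(connector, Some(newFormat), schema)

  def withSchema(newSchema: Schema): ImmutableTableDescriptor =
    new ImmutableTableDescriptor(connector, format, Some(newSchema))
}

object ImmutableTableDescriptor {
  def apply(connector: ConnectorDescriptor): ImmutableTableDescriptor =
    new ImmutableTableDescriptor(connector, None, None)
}
```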


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
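
As a rough usage sketch of the syntax proposed above, combined with the
`withFormat`/`withSchema`/`inAppendMode`/`registerTableSourceAndSink` methods
from this PR's descriptors; all connector settings, field names and the
`otherResult` table are placeholders for illustration only:

```
tableEnv
  .connect(
    new Kafka()                     // placeholder connector settings
      .version("0.11")
      .topic("clicks"))
  .withFormat(
    new Json()
      .jsonSchema("{...}")          // placeholder schema, as in the docs example
      .failOnMissingField(false))
  .withSchema(
    new Schema()
      .field("user", "VARCHAR")
      .field("cnt", "DECIMAL"))
  .inAppendMode()
  .registerTableSourceAndSink("clicks")

// the registered name can then be read like any other table ...
val clicks = tableEnv.scan("clicks")
// ... and also serve as an output target for some other query
otherResult.insertInto("clicks")    // otherResult: a hypothetical Table
```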


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546318#comment-16546318
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202966786
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.util
+
+import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, 
SCHEMA_PROCTIME}
+import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND}
+import org.apache.flink.table.factories.StreamTableSinkFactory
+import org.apache.flink.types.Row
+
+/**
+  * Factory base for creating configured instances of [[CsvTableSink]] in 
a stream environment.
+  */
+class CsvAppendTableSinkFactory
--- End diff --

This should be done in a separate commit


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546316#comment-16546316
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202963074
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+  * The utility class is used to convert [[ExternalCatalogTable]] to 
[[TableSourceSinkTable]].
+  *
+  * It uses [[TableFactoryService]] for discovering.
+  */
+object ExternalTableUtil extends Logging {
+
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable[T1, T2](
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceSinkTable[T1, T2] = {
+
+val properties = new DescriptorProperties()
+externalCatalogTable.addProperties(properties)
+val javaMap = properties.asMap
+val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+val source: Option[TableSourceTable[T1]] = tableEnv match {
+  // check for a batch table source in this batch environment
--- End diff --

drop those comments, the code is already self-explanatory


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546315#comment-16546315
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202966706
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, 
ValidationException}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, 
BatchTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a batch 
environment.
+  */
+class BatchTableDescriptor(
+private val tableEnv: BatchTableEnvironment,
+private val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor
+  with RegistrableDescriptor {
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

in many places (for example all `addProperties` methods) you are ordering 
methods very weirdly. The rule of thumb should be public methods before private. 
Longer story: https://stackoverflow.com/a/1760877/8149051

Many times in this code review I had to jump up & down.
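
A tiny illustration of that ordering (names made up):

```
// Illustrative only: public entry points first, private helpers at the bottom.
class ExampleDescriptor {

  def registerTableSource(name: String): Unit = {
    val properties = getValidProperties
    // ... lookup and registration ...
  }

  def registerTableSink(name: String): Unit = {
    val properties = getValidProperties
    // ... lookup and registration ...
  }

  // private helpers follow the public API
  private def getValidProperties: DescriptorProperties =
    new DescriptorProperties()
}
```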


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202963074
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+  * The utility class is used to convert [[ExternalCatalogTable]] to 
[[TableSourceSinkTable]].
+  *
+  * It uses [[TableFactoryService]] for discovering.
+  */
+object ExternalTableUtil extends Logging {
+
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable[T1, T2](
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceSinkTable[T1, T2] = {
+
+val properties = new DescriptorProperties()
+externalCatalogTable.addProperties(properties)
+val javaMap = properties.asMap
+val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+val source: Option[TableSourceTable[T1]] = tableEnv match {
+  // check for a batch table source in this batch environment
--- End diff --

drop those comments, the code is already self-explanatory


---


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546314#comment-16546314
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202964952
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, 
ValidationException}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+import org.apache.flink.table.factories.{StreamTableSinkFactory, 
StreamTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a streaming 
environment.
+  */
+class StreamTableDescriptor(
+private val tableEnv: StreamTableEnvironment,
+private val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor
+  with RegistrableDescriptor
+  with StreamableDescriptor {
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+  private var updateMode: Option[String] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
+connectorDescriptor.addProperties(properties)
+formatDescriptor.foreach(_.addProperties(properties))
+schemaDescriptor.foreach(_.addProperties(properties))
+updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSource(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSource = TableFactoryService
+  .find(classOf[StreamTableSourceFactory[_]], javaMap)
+  .createStreamTableSource(javaMap)
+tableEnv.registerTableSource(name, tableSource)
+  }
+
+  /**
+* Searches for the specified table sink, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSink(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSink = TableFactoryService
+  .find(classOf[StreamTableSinkFactory[_]], javaMap)
+  .createStreamTableSink(javaMap)
+tableEnv.registerTableSink(name, tableSink)
+  }
+
+  /**
+* Searches for the specified table source and sink, configures them 
accordingly, and registers
+* them as a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSourceAndSink(name: String): Unit = {
+registerTableSource(name)
+registerTableSink(name)
+  }
+
+  /**
+* Specifies the format that defines how to read data from a connector.
+*/
+  override def withFormat(format: FormatDescriptor): StreamTableDescriptor 
= {
+formatDescriptor = Some(format)
+this
+  }
+
+  /**
+* Specifies the resulting table schema.
+*/
+  override def withSchema(schema: Schema): StreamTableDescriptor = {
+schemaDescriptor = Some(schema)
+this
+  }
+
+  /**
+* Declares how to perform the conversion between a dynamic table and 
an external connector.
+*
   

[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546307#comment-16546307
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202923853
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -18,33 +18,299 @@
 
 package org.apache.flink.table.catalog
 
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, readColumnStats}
+import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
 import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
-  * Defines a table in an [[ExternalCatalog]].
+  * Defines a table in an [[ExternalCatalog]]. External catalog tables 
describe table sources
+  * and/or sinks for both batch and stream environments.
+  *
+  * The catalog table takes descriptors which allow for declaring the 
communication to external
+  * systems in an implementation-agnostic way. The classpath is scanned 
for suitable table factories
+  * that match the desired configuration.
+  *
+  * Use the provided builder methods to configure the external catalog 
table accordingly.
+  *
+  * The following example shows how to read from a connector using a JSON 
format and
+  * declaring it as a table source:
   *
-  * @param connectorDesc describes the system to connect to
-  * @param formatDesc describes the data format of a connector
-  * @param schemaDesc describes the schema of the result table
-  * @param statisticsDesc describes the estimated statistics of the result 
table
-  * @param metadataDesc describes additional metadata of a table
+  * {{{
+  *   ExternalCatalogTable(
+  * new ExternalSystemXYZ()
+  *   .version("0.11"))
+  *   .withFormat(
+  * new Json()
+  *   .jsonSchema("{...}")
+  *   .failOnMissingField(false))
+  *   .withSchema(
+  * new Schema()
+  *   .field("user-name", "VARCHAR").from("u_name")
+  *   .field("count", "DECIMAL")
+  *   .asTableSource()
+  * }}}
+  *
+  * Note: For backwards-compatibility, the table is declared as a table 
source for batch and
+  * streaming environment by default.
+  *
+  * See also [[org.apache.flink.table.factories.TableFactory]] for more 
information about how
+  * to target suitable factories.
+  *
+  * @param connectorDescriptor describes the system to connect to
   */
-class ExternalCatalogTable(
-connectorDesc: ConnectorDescriptor,
-formatDesc: Option[FormatDescriptor],
-schemaDesc: Option[Schema],
-statisticsDesc: Option[Statistics],
-metadataDesc: Option[Metadata])
-  extends TableSourceDescriptor {
-
-  this.connectorDescriptor = Some(connectorDesc)
-  this.formatDescriptor = formatDesc
-  this.schemaDescriptor = schemaDesc
-  this.statisticsDescriptor = statisticsDesc
-  this.metaDescriptor = metadataDesc
-
-  // expose statistics for external table source util
-  override def getTableStats: Option[TableStats] = super.getTableStats
+class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
--- End diff --

single tab? this is inconsistent with other places


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202923853
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -18,33 +18,299 @@
 
 package org.apache.flink.table.catalog
 
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, readColumnStats}
+import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
 import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
-  * Defines a table in an [[ExternalCatalog]].
+  * Defines a table in an [[ExternalCatalog]]. External catalog tables 
describe table sources
+  * and/or sinks for both batch and stream environments.
+  *
+  * The catalog table takes descriptors which allow for declaring the 
communication to external
+  * systems in an implementation-agnostic way. The classpath is scanned 
for suitable table factories
+  * that match the desired configuration.
+  *
+  * Use the provided builder methods to configure the external catalog 
table accordingly.
+  *
+  * The following example shows how to read from a connector using a JSON 
format and
+  * declaring it as a table source:
   *
-  * @param connectorDesc describes the system to connect to
-  * @param formatDesc describes the data format of a connector
-  * @param schemaDesc describes the schema of the result table
-  * @param statisticsDesc describes the estimated statistics of the result 
table
-  * @param metadataDesc describes additional metadata of a table
+  * {{{
+  *   ExternalCatalogTable(
+  * new ExternalSystemXYZ()
+  *   .version("0.11"))
+  *   .withFormat(
+  * new Json()
+  *   .jsonSchema("{...}")
+  *   .failOnMissingField(false))
+  *   .withSchema(
+  * new Schema()
+  *   .field("user-name", "VARCHAR").from("u_name")
+  *   .field("count", "DECIMAL")
+  *   .asTableSource()
+  * }}}
+  *
+  * Note: For backwards-compatibility, the table is declared as a table 
source for batch and
+  * streaming environment by default.
+  *
+  * See also [[org.apache.flink.table.factories.TableFactory]] for more 
information about how
+  * to target suitable factories.
+  *
+  * @param connectorDescriptor describes the system to connect to
   */
-class ExternalCatalogTable(
-connectorDesc: ConnectorDescriptor,
-formatDesc: Option[FormatDescriptor],
-schemaDesc: Option[Schema],
-statisticsDesc: Option[Statistics],
-metadataDesc: Option[Metadata])
-  extends TableSourceDescriptor {
-
-  this.connectorDescriptor = Some(connectorDesc)
-  this.formatDescriptor = formatDesc
-  this.schemaDescriptor = schemaDesc
-  this.statisticsDescriptor = statisticsDesc
-  this.metaDescriptor = metadataDesc
-
-  // expose statistics for external table source util
-  override def getTableStats: Option[TableStats] = super.getTableStats
+class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
--- End diff --

single tab? this is inconsistent with other places


---


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202915534
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
 ---
@@ -40,10 +41,18 @@
 
@Test
public void testMerging() throws Exception {
-   final Environment env1 = 
EnvironmentFileUtil.parseUnmodified(DEFAULTS_ENVIRONMENT_FILE);
+   final Map<String, String> replaceVars1 = new HashMap<>();
+   replaceVars1.put("$VAR_UPDATE_MODE", "update-mode: append");
+   final Environment env1 = EnvironmentFileUtil.parseModified(
+   DEFAULTS_ENVIRONMENT_FILE,
+   replaceVars1);
+
+   final Map<String, String> replaceVars2 = new HashMap<>();
--- End diff --

```
final Map<String, String> replaceVars2 = new HashMap<>(replaceVars1);
```
and you can drop the line below


---


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546312#comment-16546312
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202965060
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, 
ValidationException}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, 
BatchTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a batch 
environment.
+  */
+class BatchTableDescriptor(
+private val tableEnv: BatchTableEnvironment,
+private val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor
+  with RegistrableDescriptor {
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
+connectorDescriptor.addProperties(properties)
+formatDescriptor.foreach(_.addProperties(properties))
+schemaDescriptor.foreach(_.addProperties(properties))
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSource(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSource = TableFactoryService
+  .find(classOf[BatchTableSourceFactory[_]], javaMap)
+  .createBatchTableSource(javaMap)
+tableEnv.registerTableSource(name, tableSource)
+  }
+
+  /**
+* Searches for the specified table sink, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSink(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSink = TableFactoryService
+  .find(classOf[BatchTableSinkFactory[_]], javaMap)
+  .createBatchTableSink(javaMap)
+tableEnv.registerTableSink(name, tableSink)
+  }
+
+  /**
+* Searches for the specified table source and sink, configures them 
accordingly, and registers
+* them as a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSourceAndSink(name: String): Unit = {
+registerTableSource(name)
+registerTableSink(name)
+  }
+
+  /**
+* Specifies the format that defines how to read data from a connector.
+*/
+  override def withFormat(format: FormatDescriptor): BatchTableDescriptor 
= {
+formatDescriptor = Some(format)
+this
+  }
+
+  /**
+* Specifies the resulting table schema.
+*/
+  override def withSchema(schema: Schema): BatchTableDescriptor = {
+schemaDescriptor = Some(schema)
+this
+  }
+
+  override def toString: String = {
+getValidProperties.toString
+  }
+
+  // 
--
+
+  private def getValidProperties: DescriptorProperties = {
--- End diff --

duplicated code with `StreamTableDescriptor.getValidProperties`
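
For illustration, one way to remove the duplication would be to pull the
shared flow into the common descriptor base and leave only an
environment-specific hook. A sketch with hypothetical names:

```
// Hypothetical shared base: the common build-and-validate flow lives here;
// batch and stream descriptors only implement the environment-specific hook.
abstract class SharedTableDescriptor extends Descriptor {

  // already provided by BatchTableDescriptor / StreamTableDescriptor
  private[flink] def addProperties(properties: DescriptorProperties): Unit

  // e.g. the stream descriptor would validate the update-mode here
  protected def validateEnvironmentSpecific(properties: DescriptorProperties): Unit

  protected def getValidProperties: DescriptorProperties = {
    val properties = new DescriptorProperties()
    addProperties(properties)
    validateEnvironmentSpecific(properties)
    properties
  }
}
```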


> Expose descriptor-b

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202964715
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala
 ---
@@ -19,14 +19,17 @@
 package org.apache.flink.table.descriptors
 
 /**
-  * Common class for all descriptors describing a table sink.
+  * A trait for descriptors that allow to define a format and schema.
   */
-abstract class TableSinkDescriptor extends TableDescriptor {
+trait SchematicDescriptor extends Descriptor {
--- End diff --

are you using it anywhere as an interface? what does extracting it into a 
separate interface give us? Maybe drop it?


---


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202964290
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+  * The utility class is used to convert [[ExternalCatalogTable]] to 
[[TableSourceSinkTable]].
+  *
+  * It uses [[TableFactoryService]] for discovering.
+  */
+object ExternalTableUtil extends Logging {
+
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable[T1, T2](
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceSinkTable[T1, T2] = {
+
+val properties = new DescriptorProperties()
+externalCatalogTable.addProperties(properties)
+val javaMap = properties.asMap
+val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+val source: Option[TableSourceTable[T1]] = tableEnv match {
+  // check for a batch table source in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table source in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table source.")
+}
+
+val sink: Option[TableSinkTable[T2]] = tableEnv match {
+  // check for a batch table sink in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table sink in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table sink.")
+}
+
+new TableSourceSinkTable[T1, T2](source, sink)
+  }
+
+  private def createBatchTableSource[T](
--- End diff --

I still do not like this lack of abstraction between batch and streaming in 
the form of `createBatchTableSource`/`createStreamTableSource`.

Instead of writing if/elses everywhere in our code, there should be some 
common layer that handles such logic. Here, half of the problem boils down to 
the factories exposing separate createBatchTableSource and createStreamTableSource methods.


---


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546313#comment-16546313
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202965391
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, 
ValidationException}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+import org.apache.flink.table.factories.{StreamTableSinkFactory, 
StreamTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a streaming 
environment.
+  */
+class StreamTableDescriptor(
--- End diff --

This class duplicates code with `BatchTableDescriptor`


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546309#comment-16546309
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202963234
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+  * The utility class is used to convert [[ExternalCatalogTable]] to 
[[TableSourceSinkTable]].
+  *
+  * It uses [[TableFactoryService]] for discovering.
+  */
+object ExternalTableUtil extends Logging {
+
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable[T1, T2](
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceSinkTable[T1, T2] = {
+
+val properties = new DescriptorProperties()
+externalCatalogTable.addProperties(properties)
+val javaMap = properties.asMap
+val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+val source: Option[TableSourceTable[T1]] = tableEnv match {
+  // check for a batch table source in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table source in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table source.")
+}
+
+val sink: Option[TableSinkTable[T2]] = tableEnv match {
+  // check for a batch table sink in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table sink in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table sink.")
+}
+
+new TableSourceSinkTable[T1, T2](source, sink)
+  }
+
+  private def createBatchTableSource[T](
+  externalCatalogTable: ExternalCatalogTable,
+  javaMap: util.Map[String, String],
+  statistics: FlinkStatistic)
+: Option[TableSourceTable[T]] = if 
(externalCatalogTable.isTableSource) {
--- End diff --

reverse if/else branches - simpler case should be first

also if you change it to
```
if (!externalCatalogTable.isTableSource) {
  return None
}
val source = TableFactoryService
  .find(classOf[BatchTableSourceFactory[T]], javaMap)
  .createBatchTableSource(javaMap)
val table = new BatchTableSourceTable(
  source,
 
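
The quoted suggestion is cut off in the archive; a sketch of the early-return
shape it argues for (the `BatchTableSourceTable` constructor arguments are
assumed, not copied from the PR):

```
private def createBatchTableSource[T](
    externalCatalogTable: ExternalCatalogTable,
    javaMap: util.Map[String, String],
    statistics: FlinkStatistic)
  : Option[TableSourceTable[T]] = {

  // simple case first, then the straight-line happy path
  if (!externalCatalogTable.isTableSource) {
    return None
  }

  val source = TableFactoryService
    .find(classOf[BatchTableSourceFactory[T]], javaMap)
    .createBatchTableSource(javaMap)
  // assumed: wrap the source together with the statistics, as in the if-branch
  Some(new BatchTableSourceTable(source, statistics))
}
```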

[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546317#comment-16546317
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202964715
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala
 ---
@@ -19,14 +19,17 @@
 package org.apache.flink.table.descriptors
 
 /**
-  * Common class for all descriptors describing a table sink.
+  * A trait for descriptors that allow to define a format and schema.
   */
-abstract class TableSinkDescriptor extends TableDescriptor {
+trait SchematicDescriptor extends Descriptor {
--- End diff --

are you using it anywhere as an interface? what does extracting it into a 
separate interface give us? Maybe drop it?


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202966786
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.util
+
+import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, 
SCHEMA_PROCTIME}
+import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND}
+import org.apache.flink.table.factories.StreamTableSinkFactory
+import org.apache.flink.types.Row
+
+/**
+  * Factory base for creating configured instances of [[CsvTableSink]] in 
a stream environment.
+  */
+class CsvAppendTableSinkFactory
--- End diff --

This should be done in a separate commit


---


[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546308#comment-16546308
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202915534
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
 ---
@@ -40,10 +41,18 @@
 
@Test
public void testMerging() throws Exception {
-   final Environment env1 = 
EnvironmentFileUtil.parseUnmodified(DEFAULTS_ENVIRONMENT_FILE);
+   final Map<String, String> replaceVars1 = new HashMap<>();
+   replaceVars1.put("$VAR_UPDATE_MODE", "update-mode: append");
+   final Environment env1 = EnvironmentFileUtil.parseModified(
+   DEFAULTS_ENVIRONMENT_FILE,
+   replaceVars1);
+
+   final Map<String, String> replaceVars2 = new HashMap<>();
--- End diff --

```
final Map<String, String> replaceVars2 = new HashMap<>(replaceVars1);
```
and you can drop the line below


> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202965391
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, 
ValidationException}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+import org.apache.flink.table.factories.{StreamTableSinkFactory, 
StreamTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a streaming 
environment.
+  */
+class StreamTableDescriptor(
--- End diff --

This class duplicates code with `BatchTableDescriptor`


---


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202964952
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, 
ValidationException}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+import org.apache.flink.table.factories.{StreamTableSinkFactory, 
StreamTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a streaming 
environment.
+  */
+class StreamTableDescriptor(
+private val tableEnv: StreamTableEnvironment,
+private val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor
+  with RegistrableDescriptor
+  with StreamableDescriptor {
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+  private var updateMode: Option[String] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
+connectorDescriptor.addProperties(properties)
+formatDescriptor.foreach(_.addProperties(properties))
+schemaDescriptor.foreach(_.addProperties(properties))
+updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSource(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSource = TableFactoryService
+  .find(classOf[StreamTableSourceFactory[_]], javaMap)
+  .createStreamTableSource(javaMap)
+tableEnv.registerTableSource(name, tableSource)
+  }
+
+  /**
+* Searches for the specified table sink, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSink(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSink = TableFactoryService
+  .find(classOf[StreamTableSinkFactory[_]], javaMap)
+  .createStreamTableSink(javaMap)
+tableEnv.registerTableSink(name, tableSink)
+  }
+
+  /**
+* Searches for the specified table source and sink, configures them 
accordingly, and registers
+* them as a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSourceAndSink(name: String): Unit = {
+registerTableSource(name)
+registerTableSink(name)
+  }
+
+  /**
+* Specifies the format that defines how to read data from a connector.
+*/
+  override def withFormat(format: FormatDescriptor): StreamTableDescriptor 
= {
+formatDescriptor = Some(format)
+this
+  }
+
+  /**
+* Specifies the resulting table schema.
+*/
+  override def withSchema(schema: Schema): StreamTableDescriptor = {
+schemaDescriptor = Some(schema)
+this
+  }
+
+  /**
+* Declares how to perform the conversion between a dynamic table and 
an external connector.
+*
+* In append mode, a dynamic table and an external connector only 
exchange INSERT messages.
+*
+* @see See also [[inRetractMode()]] and [[inUpsertMode()]].
+*/
+  override def inAppendMode(): StreamTableDescriptor = {
+

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202965060
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, 
ValidationException}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, 
BatchTableSourceFactory, TableFactoryService}
+
+/**
+  * Descriptor for specifying a table source and/or sink in a batch 
environment.
+  */
+class BatchTableDescriptor(
+private val tableEnv: BatchTableEnvironment,
+private val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor
+  with RegistrableDescriptor {
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
+connectorDescriptor.addProperties(properties)
+formatDescriptor.foreach(_.addProperties(properties))
+schemaDescriptor.foreach(_.addProperties(properties))
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSource(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSource = TableFactoryService
+  .find(classOf[BatchTableSourceFactory[_]], javaMap)
+  .createBatchTableSource(javaMap)
+tableEnv.registerTableSource(name, tableSource)
+  }
+
+  /**
+* Searches for the specified table sink, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSink(name: String): Unit = {
+val javaMap = getValidProperties.asMap
+val tableSink = TableFactoryService
+  .find(classOf[BatchTableSinkFactory[_]], javaMap)
+  .createBatchTableSink(javaMap)
+tableEnv.registerTableSink(name, tableSink)
+  }
+
+  /**
+* Searches for the specified table source and sink, configures them 
accordingly, and registers
+* them as a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  override def registerTableSourceAndSink(name: String): Unit = {
+registerTableSource(name)
+registerTableSink(name)
+  }
+
+  /**
+* Specifies the format that defines how to read data from a connector.
+*/
+  override def withFormat(format: FormatDescriptor): BatchTableDescriptor 
= {
+formatDescriptor = Some(format)
+this
+  }
+
+  /**
+* Specifies the resulting table schema.
+*/
+  override def withSchema(schema: Schema): BatchTableDescriptor = {
+schemaDescriptor = Some(schema)
+this
+  }
+
+  override def toString: String = {
+getValidProperties.toString
+  }
+
+  // 
--
+
+  private def getValidProperties: DescriptorProperties = {
--- End diff --

duplicated code with `StreamTableDescriptor.getValidProperties`


---


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202963234
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+  * The utility class is used to convert [[ExternalCatalogTable]] to 
[[TableSourceSinkTable]].
+  *
+  * It uses [[TableFactoryService]] for discovering.
+  */
+object ExternalTableUtil extends Logging {
+
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable[T1, T2](
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceSinkTable[T1, T2] = {
+
+val properties = new DescriptorProperties()
+externalCatalogTable.addProperties(properties)
+val javaMap = properties.asMap
+val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+val source: Option[TableSourceTable[T1]] = tableEnv match {
+  // check for a batch table source in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table source in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table source.")
+}
+
+val sink: Option[TableSinkTable[T2]] = tableEnv match {
+  // check for a batch table sink in this batch environment
+  case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+  // check for a stream table sink in this stream environment
+  case _: StreamTableEnvironment if externalCatalogTable.isStreamTable 
=>
+createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+  case _ =>
+throw new ValidationException(
+  "External catalog table does not support the current environment 
for a table sink.")
+}
+
+new TableSourceSinkTable[T1, T2](source, sink)
+  }
+
+  private def createBatchTableSource[T](
+  externalCatalogTable: ExternalCatalogTable,
+  javaMap: util.Map[String, String],
+  statistics: FlinkStatistic)
+: Option[TableSourceTable[T]] = if 
(externalCatalogTable.isTableSource) {
--- End diff --

reverse if/else branches - simpler case should be first

also if you change it to
```
if (!externalCatalogTable.isTableSource) {
  return None
}
val source = TableFactoryService
  .find(classOf[BatchTableSourceFactory[T]], javaMap)
  .createBatchTableSource(javaMap)
val table = new BatchTableSourceTable(
  source,
  statistics)
Some(table)
```
it would even further simplify the code (reader wouldn't have to track one 
extra level of nesting)

ditto in other places
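For reference, a sketch of the same guard-clause shape applied to the stream variant, written as a fragment in the style of the diff above (illustrative only; in particular, the `StreamTableSourceTable` constructor arguments are an assumption, not taken from this PR):

```
private def createStreamTableSource[T](
    externalCatalogTable: ExternalCatalogTable,
    javaMap: util.Map[String, String],
    statistics: FlinkStatistic)
  : Option[TableSourceTable[T]] = {

  // bail out early for the simple case instead of nesting the main logic
  if (!externalCatalogTable.isTableSource) {
    return None
  }
  val source = TableFactoryService
    .find(classOf[StreamTableSourceFactory[T]], javaMap)
    .createStreamTableSource(javaMap)
  Some(new StreamTableSourceTable(source, statistics)) // assumed constructor shape
}
```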


---


[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-17 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6343#discussion_r202961529
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -18,33 +18,299 @@
 
 package org.apache.flink.table.catalog
 
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, readColumnStats}
+import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
 import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
-  * Defines a table in an [[ExternalCatalog]].
+  * Defines a table in an [[ExternalCatalog]]. External catalog tables 
describe table sources
+  * and/or sinks for both batch and stream environments.
+  *
+  * The catalog table takes descriptors which allow for declaring the 
communication to external
+  * systems in an implementation-agnostic way. The classpath is scanned 
for suitable table factories
+  * that match the desired configuration.
+  *
+  * Use the provided builder methods to configure the external catalog 
table accordingly.
+  *
+  * The following example shows how to read from a connector using a JSON 
format and
+  * declaring it as a table source:
   *
-  * @param connectorDesc describes the system to connect to
-  * @param formatDesc describes the data format of a connector
-  * @param schemaDesc describes the schema of the result table
-  * @param statisticsDesc describes the estimated statistics of the result 
table
-  * @param metadataDesc describes additional metadata of a table
+  * {{{
+  *   ExternalCatalogTable(
+  * new ExternalSystemXYZ()
+  *   .version("0.11"))
+  *   .withFormat(
+  * new Json()
+  *   .jsonSchema("{...}")
+  *   .failOnMissingField(false))
+  *   .withSchema(
+  * new Schema()
+  *   .field("user-name", "VARCHAR").from("u_name")
+  *   .field("count", "DECIMAL")
+  *   .asTableSource()
--- End diff --

make `ExternalCatalogTable` and `BatchTableDescriptor` true builders with 
final fields?
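As a rough sketch of that idea (names made up, not code from the PR): an immutable variant where every `with*` call returns a new instance instead of mutating fields.

```
import org.apache.flink.table.descriptors.{ConnectorDescriptor, FormatDescriptor, Schema}

// Sketch only: a "true builder" with final fields; each step yields a fresh value.
final case class ImmutableTableDescriptor(
    connector: ConnectorDescriptor,
    format: Option[FormatDescriptor] = None,
    schema: Option[Schema] = None) {

  def withFormat(f: FormatDescriptor): ImmutableTableDescriptor = copy(format = Some(f))

  def withSchema(s: Schema): ImmutableTableDescriptor = copy(schema = Some(s))
}
```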


---


[jira] [Updated] (FLINK-9862) Update end-to-end test to use RocksDB backed timers

2018-07-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9862:
--
Labels: pull-request-available  (was: )

> Update end-to-end test to use RocksDB backed timers
> ---
>
> Key: FLINK-9862
> URL: https://issues.apache.org/jira/browse/FLINK-9862
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We should add or modify an end-to-end test to use RocksDB backed timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6294: [FLINK-9013][docs] Document yarn.containers.vcores only b...

2018-07-17 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6294
  
I've rebased it on top of the rich formatting feature for documentation; therefore, only the last commit applies to this issue.

I will also check the `DominantResourceCalculator` once again.


---


[jira] [Commented] (FLINK-9013) Document yarn.containers.vcores only being effective when adapting YARN config

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546302#comment-16546302
 ] 

ASF GitHub Bot commented on FLINK-9013:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6294
  
I've rebased it on top of the rich formatting feature for documentation; therefore, only the last commit applies to this issue.

I will also check the `DominantResourceCalculator` once again.


> Document yarn.containers.vcores only being effective when adapting YARN config
> --
>
> Key: FLINK-9013
> URL: https://issues.apache.org/jira/browse/FLINK-9013
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> Even after specifying {{yarn.containers.vcores}} and having Flink request 
> such a container from YARN, it may not take these into account at all and 
> return a container with 1 vcore.
> The YARN configuration needs to be adapted to take the vcores into account, 
> e.g. by setting the {{FairScheduler}} in {{yarn-site.xml}}:
> {code}
> 
>   yarn.resourcemanager.scheduler.class
>   
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler
> 
> {code}
> This fact should be documented, at the very least, in the configuration parameter 
> documentation of {{yarn.containers.vcores}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9862) Update end-to-end test to use RocksDB backed timers

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546301#comment-16546301
 ] 

ASF GitHub Bot commented on FLINK-9862:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6351

[FLINK-9862] [test] Extend general purpose DataStream test to have a 
tumbling window

## What is the purpose of the change

This allows our end-to-end tests to have coverage for snapshotting / 
restoring timers, when configured to use different state backends.
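As a rough illustration of the kind of operator the test program gains (sketch only, using the Scala DataStream API; the data, key, and reduce function below are made up and not taken from `DataStreamAllAroundTestProgram`):

```
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TumblingWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env
      .fromElements(("a", 1L), ("a", 2L), ("b", 3L))
      .assignAscendingTimestamps(_._2)                         // event time from the second field
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))   // window firing relies on timers
      .reduce((l, r) => (l._1, l._2 + r._2))
      .print()

    env.execute("tumbling-window-sketch")
  }
}
```

Snapshotting and restoring such a job then exercises the timer state that backs the window triggers.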

## Brief change log

- Add a tumbling window to the `DataStreamAllAroundTestProgram`
- Change default "max out of orderness" setting of the source generator to 0

## Verifying this change

This is a change that affects existing tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9862

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6351.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6351


commit a3bc102303481fa784b9c94d7838b1c2b5f65123
Author: Tzu-Li (Gordon) Tai 
Date:   2018-07-17T10:00:40Z

[FLINK-9862] [test] Extend general purpose DataStream test to have a 
tumbling window

This allows the end-to-end tests to have coverage for testing
checkpointing timers.




> Update end-to-end test to use RocksDB backed timers
> ---
>
> Key: FLINK-9862
> URL: https://issues.apache.org/jira/browse/FLINK-9862
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We should add or modify an end-to-end test to use RocksDB backed timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis

2018-07-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-8163:
-
  Assignee: Chesnay Schepler  (was: Kostas Kloudas)

just got a timeout locally

> NonHAQueryableStateFsBackendITCase test getting stuck on Travis
> ---
>
> Key: FLINK-8163
> URL: https://issues.apache.org/jira/browse/FLINK-8163
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> The {{NonHAQueryableStateFsBackendITCase}} tests seems to get stuck on Travis 
> producing no output for 300s.
> https://travis-ci.org/tillrohrmann/flink/jobs/307988209



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general purpose DataSt...

2018-07-17 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6351

[FLINK-9862] [test] Extend general purpose DataStream test to have a 
tumbling window

## What is the purpose of the change

This allows our end-to-end tests to have coverage for snapshotting / 
restoring timers, when configured to use different state backends.

## Brief change log

- Add a tumbling window to the `DataStreamAllAroundTestProgram`
- Change default "max out of orderness" setting of the source generator to 0

## Verifying this change

This is a change that affects existing tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9862

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6351.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6351


commit a3bc102303481fa784b9c94d7838b1c2b5f65123
Author: Tzu-Li (Gordon) Tai 
Date:   2018-07-17T10:00:40Z

[FLINK-9862] [test] Extend general purpose DataStream test to have a 
tumbling window

This allows the end-to-end tests to have coverage for testing
checkpointing timers.




---


[jira] [Closed] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis

2018-07-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8163.
---
Resolution: Cannot Reproduce

No occurrences have been reported since the linked PR was merged.

> NonHAQueryableStateFsBackendITCase test getting stuck on Travis
> ---
>
> Key: FLINK-8163
> URL: https://issues.apache.org/jira/browse/FLINK-8163
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> The {{NonHAQueryableStateFsBackendITCase}} tests seem to get stuck on Travis 
> producing no output for 300s.
> https://travis-ci.org/tillrohrmann/flink/jobs/307988209



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis

2018-07-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8163:

Fix Version/s: (was: 1.5.2)
   (was: 1.6.0)

> NonHAQueryableStateFsBackendITCase test getting stuck on Travis
> ---
>
> Key: FLINK-8163
> URL: https://issues.apache.org/jira/browse/FLINK-8163
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> The {{NonHAQueryableStateFsBackendITCase}} tests seem to get stuck on Travis 
> producing no output for 300s.
> https://travis-ci.org/tillrohrmann/flink/jobs/307988209



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6350: [FLINK-9873][runtime] Log task state when aborting...

2018-07-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6350#discussion_r202962522
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -457,13 +457,20 @@ public CheckpointTriggerResult triggerCheckpoint(
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
Execution ee = 
tasksToTrigger[i].getCurrentExecutionAttempt();
-   if (ee != null && ee.getState() == 
ExecutionState.RUNNING) {
-   executions[i] = ee;
-   } else {
+   if (ee == null) {
--- End diff --

According to the `ExecutionVertex` docs this branch shouldn't be necessary at all, but I kept it in to be safe.


---


[jira] [Updated] (FLINK-9872) SavepointITCase#testSavepointForJobWithIteration does not properly cancel jobs

2018-07-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9872:

Summary: SavepointITCase#testSavepointForJobWithIteration does not properly 
cancel jobs  (was: SavepointITCase#testSavepointForJobWithIteration does not 
properly cancell jobs)

> SavepointITCase#testSavepointForJobWithIteration does not properly cancel jobs
> --
>
> Key: FLINK-9872
> URL: https://issues.apache.org/jira/browse/FLINK-9872
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.2, 1.6.0
>
>
> The {{SavepointITCase}} attempts to cancel a job by calling {{cancel}} on a 
> source instance. However, this instance isn't actually executed on the 
> cluster, since it is serialized during the submission process.
>  
> Additionally, we aren't waiting for the cancellation to finish, causing the 
> test to log several exceptions when the cluster is shut down.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9873) Log actual state when aborting checkpoint due to task not running

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546290#comment-16546290
 ] 

ASF GitHub Bot commented on FLINK-9873:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6350#discussion_r202962522
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -457,13 +457,20 @@ public CheckpointTriggerResult triggerCheckpoint(
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
Execution ee = 
tasksToTrigger[i].getCurrentExecutionAttempt();
-   if (ee != null && ee.getState() == 
ExecutionState.RUNNING) {
-   executions[i] = ee;
-   } else {
+   if (ee == null) {
--- End diff --

According to the `ExecutionVertex` docs this branch shouldn't be necessary at all, but I kept it in to be safe.


> Log actual state when aborting checkpoint due to task not running
> -
>
> Key: FLINK-9873
> URL: https://issues.apache.org/jira/browse/FLINK-9873
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> Currently, if a checkpoint is triggered while a task is not in a RUNNING state 
> the following message is logged:
> {code:java}
> Checkpoint triggering task {} of job {} is not being executed at the 
> moment.{code}
> We can improve this message to include the actual task state to help diagnose 
> problems.
> This message is also a bit ambiguous, as "being executed" could mean many 
> things, from not "RUNNING", to not being "DEPLOYED", or to not existing at 
> all.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9873) Log actual state when aborting checkpoint due to task not running

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546287#comment-16546287
 ] 

ASF GitHub Bot commented on FLINK-9873:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6350

[FLINK-9873][runtime] Log task state when aborting checkpoint

## What is the purpose of the change

This PR adjusts the logging message for when a checkpoint is declined due 
to tasks not being ready.
We now explicitly log the current task state.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9873

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6350.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6350


commit 4dfe622fc45a2233dbba58640d6aa67be4739f86
Author: zentol 
Date:   2018-07-17T07:34:45Z

[FLINK-9873][runtime] Log task state when aborting checkpoint




> Log actual state when aborting checkpoint due to task not running
> -
>
> Key: FLINK-9873
> URL: https://issues.apache.org/jira/browse/FLINK-9873
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> Currently, if a checkpoint is triggered while a task is not in a RUNNING state 
> the following message is logged:
> {code:java}
> Checkpoint triggering task {} of job {} is not being executed at the 
> moment.{code}
> We can improve this message to include the actual task state to help diagnose 
> problems.
> This message is also a bit ambiguous, as "being executed" could mean many 
> things, from not "RUNNING", to not being "DEPLOYED", or to not existing at 
> all.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9873) Log actual state when aborting checkpoint due to task not running

2018-07-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9873:
--
Labels: pull-request-available  (was: )

> Log actual state when aborting checkpoint due to task not running
> -
>
> Key: FLINK-9873
> URL: https://issues.apache.org/jira/browse/FLINK-9873
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> Currently, if a checkpoint is triggered while a task is not in a RUNNING state 
> the following message is logged:
> {code:java}
> Checkpoint triggering task {} of job {} is not being executed at the 
> moment.{code}
> We can improve this message to include the actual task state to help diagnose 
> problems.
> This message is also a bit ambiguous, as "being executed" could mean many 
> things, from not "RUNNING", to not being "DEPLOYED", or to not existing at 
> all.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6350: [FLINK-9873][runtime] Log task state when aborting...

2018-07-17 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6350

[FLINK-9873][runtime] Log task state when aborting checkpoint

## What is the purpose of the change

This PR adjusts the logging message for when a checkpoint is declined due 
to tasks not being ready.
We now explicitly log the current task state.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9873

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6350.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6350


commit 4dfe622fc45a2233dbba58640d6aa67be4739f86
Author: zentol 
Date:   2018-07-17T07:34:45Z

[FLINK-9873][runtime] Log task state when aborting checkpoint




---


[jira] [Created] (FLINK-9873) Log actual state when aborting checkpoint due to task not running

2018-07-17 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9873:
---

 Summary: Log actual state when aborting checkpoint due to task not 
running
 Key: FLINK-9873
 URL: https://issues.apache.org/jira/browse/FLINK-9873
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.5.1, 1.6.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.5.2, 1.6.0


Currently, if a checkpoint is triggered while a task is not in a RUNNING state 
the following message is logged:
{code:java}
Checkpoint triggering task {} of job {} is not being executed at the 
moment.{code}
We can improve this message to include the actual task state to help diagnose 
problems.

This message is also a bit ambiguous, as "being executed" could mean many 
things, from not "RUNNING", to not being "DEPLOYED", or to not existing at all.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6997) SavepointITCase fails in master branch sometimes

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546283#comment-16546283
 ] 

ASF GitHub Bot commented on FLINK-6997:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6349

[FLINK-6997][tests] Properly cancel test job

## What is the purpose of the change

With this PR the jobs started in 
`SavepointITCase#testSavepointForJobWithIteration` are properly canceled. 
Previously they remained in a running state until the cluster was shut down, 
causing several exceptions to be logged.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9872

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6349.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6349


commit 434d52a4efaa31da59b04ede010b6a7757ebbcbc
Author: zentol 
Date:   2018-07-17T09:34:29Z

[FLINK-6997][tests] Properly cancel test job




> SavepointITCase fails in master branch sometimes
> 
>
> Key: FLINK-6997
> URL: https://issues.apache.org/jira/browse/FLINK-6997
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0, 1.5.0
>Reporter: Ted Yu
>Priority: Critical
>  Labels: test-stability
>
> I got the following test failure (with commit 
> a0b781461bcf8c2f1d00b93464995f03eda589f1)
> {code}
> testSavepointForJobWithIteration(org.apache.flink.test.checkpointing.SavepointITCase)
>   Time elapsed: 8.129 sec  <<< ERROR!
> java.io.IOException: java.lang.Exception: Failed to complete savepoint
>   at 
> org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:342)
>   at 
> org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:316)
>   at 
> org.apache.flink.test.checkpointing.SavepointITCase.testSavepointForJobWithIteration(SavepointITCase.java:827)
> Caused by: java.lang.Exception: Failed to complete savepoint
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:821)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:805)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>   at akka.dispatch.OnComplete.internal(Future.scala:247)
>   at akka.dispatch.OnComplete.internal(Future.scala:245)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Failed to trigger savepoint: Not all required 
> tasks are currently running.
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:382)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:800)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManagerLike.scala:95)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>   at 
> org.apache.flink.runtime.LeaderS

[GitHub] flink pull request #6349: [FLINK-6997][tests] Properly cancel test job

2018-07-17 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6349

[FLINK-6997][tests] Properly cancel test job

## What is the purpose of the change

With this PR the jobs started in 
`SavepointITCase#testSavepointForJobWithIteration` are properly canceled. 
Previously they remained in a running state until the cluster was shut down, 
causing several exceptions to be logged.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9872

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6349.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6349


commit 434d52a4efaa31da59b04ede010b6a7757ebbcbc
Author: zentol 
Date:   2018-07-17T09:34:29Z

[FLINK-6997][tests] Properly cancel test job




---


[jira] [Created] (FLINK-9872) SavepointITCase#testSavepointForJobWithIteration does not properly cancell jobs

2018-07-17 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9872:
---

 Summary: SavepointITCase#testSavepointForJobWithIteration does not 
properly cancell jobs
 Key: FLINK-9872
 URL: https://issues.apache.org/jira/browse/FLINK-9872
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.5.1, 1.6.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.5.2, 1.6.0


The {{SavepointITCase}} attempts to cancel a job by calling {{cancel}} on a 
source instance. However, this instance isn't actually executed on the cluster, 
since it is serialized during the submission process.

 

Additionally, we aren't waiting for the cancellation to finish, causing the test to log several exceptions when the cluster is shut down.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-6997) SavepointITCase fails in master branch sometimes

2018-07-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-6997:

Fix Version/s: (was: 1.5.2)
   (was: 1.6.0)

> SavepointITCase fails in master branch sometimes
> 
>
> Key: FLINK-6997
> URL: https://issues.apache.org/jira/browse/FLINK-6997
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0, 1.5.0
>Reporter: Ted Yu
>Priority: Critical
>  Labels: test-stability
>
> I got the following test failure (with commit 
> a0b781461bcf8c2f1d00b93464995f03eda589f1)
> {code}
> testSavepointForJobWithIteration(org.apache.flink.test.checkpointing.SavepointITCase)
>   Time elapsed: 8.129 sec  <<< ERROR!
> java.io.IOException: java.lang.Exception: Failed to complete savepoint
>   at 
> org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:342)
>   at 
> org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:316)
>   at 
> org.apache.flink.test.checkpointing.SavepointITCase.testSavepointForJobWithIteration(SavepointITCase.java:827)
> Caused by: java.lang.Exception: Failed to complete savepoint
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:821)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:805)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>   at akka.dispatch.OnComplete.internal(Future.scala:247)
>   at akka.dispatch.OnComplete.internal(Future.scala:245)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Failed to trigger savepoint: Not all required 
> tasks are currently running.
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:382)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:800)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManagerLike.scala:95)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-6997) SavepointITCase fails in master branch sometimes

2018-07-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-6997.
---
Resolution: Cannot Reproduce

> SavepointITCase fails in master branch sometimes
> 
>
> Key: FLINK-6997
> URL: https://issues.apache.org/jira/browse/FLINK-6997
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0, 1.5.0
>Reporter: Ted Yu
>Priority: Critical
>  Labels: test-stability
>
> I got the following test failure (with commit 
> a0b781461bcf8c2f1d00b93464995f03eda589f1)
> {code}
> testSavepointForJobWithIteration(org.apache.flink.test.checkpointing.SavepointITCase)
>   Time elapsed: 8.129 sec  <<< ERROR!
> java.io.IOException: java.lang.Exception: Failed to complete savepoint
>   at 
> org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:342)
>   at 
> org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:316)
>   at 
> org.apache.flink.test.checkpointing.SavepointITCase.testSavepointForJobWithIteration(SavepointITCase.java:827)
> Caused by: java.lang.Exception: Failed to complete savepoint
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:821)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:805)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>   at akka.dispatch.OnComplete.internal(Future.scala:247)
>   at akka.dispatch.OnComplete.internal(Future.scala:245)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Failed to trigger savepoint: Not all required 
> tasks are currently running.
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:382)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:800)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManagerLike.scala:95)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support

2018-07-17 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546270#comment-16546270
 ] 

Chesnay Schepler commented on FLINK-8101:
-

[~sharju] no, this Jira will not be fixed for 1.6.

> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou
>Priority: Major
> Fix For: 1.7.0
>
>
> Recently, Elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released  
> The minimum Elasticsearch Java Client version that is compatible with ES6 is 5.6.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8101) Elasticsearch 6.x support

2018-07-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8101:

Fix Version/s: (was: 1.6.0)
   1.7.0

> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou
>Priority: Major
> Fix For: 1.7.0
>
>
> Recently, Elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released  
> The minimum Elasticsearch Java Client version that is compatible with ES6 is 5.6.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices

2018-07-17 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6347
  
`vertices` is the correct plural, but this is another one of those cases 
where fixing it might cause more harm than good since it could cause merge 
conflicts, yet provides no functional benefit.

Additionally, this PR makes a lot of whitespace changes that should be 
reverted in any case.


---


[jira] [Commented] (FLINK-6997) SavepointITCase fails in master branch sometimes

2018-07-17 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546262#comment-16546262
 ] 

Chesnay Schepler commented on FLINK-6997:
-

Either we're dealing with 2 separate issues here or it's not a timing issue.

The original exception is thrown if any task is not in a RUNNING state.

The exception that Till got shows that the job failed while the checkpoint was 
underway.

Both cases could be explained by a failure of the job, in the first case 
shortly before the savepoint is triggered, in the latter during the savepoint 
operation. I don't have an explanation for possible failures at this moment, 
but it could be virtually anything.

In any case, this test has some issues. It attempts to cancel jobs by calling {{cancel}} on one of the source instances, but these are obviously not the instances actually running on the cluster. Even if this worked, we aren't waiting for the cancellation to happen before shutting down the cluster, resulting in a barrage of exceptions in the logs. Since the failure already occurs before the second job is even started, these issues can't explain the test failures though.
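For what "cancel properly and wait" could look like, a minimal sketch (the {{JobClient}} trait below is a stand-in defined only for this example, not Flink's actual client API): cancel through the cluster-side client and poll until the job reaches a terminal state before tearing the cluster down.

{code}
import scala.concurrent.duration._

// Stand-in for whatever client the test uses to talk to the cluster.
trait JobClient {
  def cancel(jobId: String): Unit
  def status(jobId: String): String // e.g. "RUNNING", "CANCELED"
}

def cancelAndWait(client: JobClient, jobId: String, timeout: FiniteDuration = 30.seconds): Unit = {
  client.cancel(jobId)
  val deadline = timeout.fromNow
  // Wait for the cancellation to actually take effect instead of shutting down immediately.
  while (client.status(jobId) != "CANCELED") {
    if (deadline.isOverdue()) {
      throw new IllegalStateException(s"Job $jobId was not canceled within $timeout")
    }
    Thread.sleep(100)
  }
}
{code}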

> SavepointITCase fails in master branch sometimes
> 
>
> Key: FLINK-6997
> URL: https://issues.apache.org/jira/browse/FLINK-6997
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0, 1.5.0
>Reporter: Ted Yu
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.2, 1.6.0
>
>
> I got the following test failure (with commit 
> a0b781461bcf8c2f1d00b93464995f03eda589f1)
> {code}
> testSavepointForJobWithIteration(org.apache.flink.test.checkpointing.SavepointITCase)
>   Time elapsed: 8.129 sec  <<< ERROR!
> java.io.IOException: java.lang.Exception: Failed to complete savepoint
>   at 
> org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:342)
>   at 
> org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:316)
>   at 
> org.apache.flink.test.checkpointing.SavepointITCase.testSavepointForJobWithIteration(SavepointITCase.java:827)
> Caused by: java.lang.Exception: Failed to complete savepoint
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:821)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:805)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>   at akka.dispatch.OnComplete.internal(Future.scala:247)
>   at akka.dispatch.OnComplete.internal(Future.scala:245)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Failed to trigger savepoint: Not all required 
> tasks are currently running.
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:382)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:800)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManagerLike.scala:95)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>

[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support

2018-07-17 Thread Sami Harju (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546246#comment-16546246
 ] 

Sami Harju commented on FLINK-8101:
---

Hi, any updates to this? Are we getting ES 6 support in 1.6?

> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Recently, Elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released  
> The minimum Elasticsearch Java Client version that is compatible with ES6 is 5.6.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9871) Use Description class for ConfigOptions with rich formatting

2018-07-17 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-9871:
---

 Summary: Use Description class for ConfigOptions with rich 
formatting
 Key: FLINK-9871
 URL: https://issues.apache.org/jira/browse/FLINK-9871
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.6.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.6.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-9669) Introduce task manager assignment store

2018-07-17 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-9669:
-

Reopen to remove fixVersion

> Introduce task manager assignment store
> ---
>
> Key: FLINK-9669
> URL: https://issues.apache.org/jira/browse/FLINK-9669
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, Scheduler
>Affects Versions: 1.5.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

