[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346669#comment-16346669 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5240


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.5.0
>
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.
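For illustration, a declaration along these lines could be written with the descriptors reviewed in the pull request below (Schema, Rowtime). This is a hypothetical sketch using only method names quoted in the review comments; the final API may differ:

{code}
// Hypothetical sketch: Schema.field/from/proctime and
// Rowtime.timestampFromField/watermarkPeriodicBounding are taken from the
// PR under review; field names and the 60000L delay are illustrative.
val schema = new Schema()
  .field("user", "STRING").from("CSV_USER")   // renamed from the physical field
  .field("procTime", "TIMESTAMP").proctime()  // processing-time attribute
  .field("rowTime", "TIMESTAMP")              // event-time attribute

val rowtime = new Rowtime()
  .timestampFromField("rowTime")              // timestamps taken from an existing field
  .watermarkPeriodicBounding(60000L)          // bounded out-of-order watermarks (60 s)
{code}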



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346665#comment-16346665 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5240
  
Thanks for the review @fhueske. I will merge this now. We definitely need 
follow-up issues for this.


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341353#comment-16341353 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168097
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
+    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+    *
+    * @param fieldName The field to convert into a rowtime attribute.
+    */
+  def timestampFromField(fieldName: String): Rowtime = {
+    timestampExtractor = Some(new ExistingField(fieldName))
+    this
+  }
+
+  /**
+    * Sets a built-in timestamp extractor that converts the assigned timestamp from
+    * a DataStream API record into the rowtime attribute.
+    *
+    * Note: This extractor only works in streaming environments.
+    */
+  def timestampFromDataStream(): Rowtime = {
+    timestampExtractor = Some(new StreamRecordTimestamp)
+    this
+  }
+
+  /**
+    * Sets a custom timestamp extractor to be used for the rowtime attribute.
+    *
+    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
+    *                  from the physical type.
+    */
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+    timestampExtractor = Some(extractor)
+    this
+  }
+
+  /**
+    * Sets a built-in watermark strategy for ascending rowtime attributes.
+    *
+    * Emits a watermark of the maximum observed timestamp so far minus 1.
+    * Rows that have a timestamp equal to the max timestamp are not late.
+    */
+  def watermarkPeriodicAscending(): Rowtime = {
+    watermarkStrategy = Some(new AscendingTimestamps)
+    this
+  }
+
+  /**
+    * Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded
+    * time interval.
+    *
+    * Emits watermarks which are the maximum observed timestamp minus the specified delay.
+    */
+  def watermarkPeriodicBounding(delay: Long): Rowtime = {
--- End diff --

`periodicBoundedOOOWatermarks()`


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341357#comment-16341357 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168781
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
+    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+    *
+    * @param fieldName The field to convert into a rowtime attribute.
+    */
+  def timestampFromField(fieldName: String): Rowtime = {
+    timestampExtractor = Some(new ExistingField(fieldName))
+    this
+  }
+
+  /**
+    * Sets a built-in timestamp extractor that converts the assigned timestamp from
+    * a DataStream API record into the rowtime attribute.
+    *
+    * Note: This extractor only works in streaming environments.
+    */
+  def timestampFromDataStream(): Rowtime = {
--- End diff --

`preserveSourceTimestamps()`


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341345#comment-16341345 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164122214
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/NormalizedProperties.scala ---
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.io.Serializable
+import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, Long => JLong}
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.DescriptorUtils._
+import org.apache.flink.table.descriptors.NormalizedProperties.normalizeTableSchema
+import org.apache.flink.table.plan.stats.ColumnStats
+import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+import org.apache.flink.table.typeutils.TypeStringUtils
+import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.util.Preconditions.checkNotNull
+
+import scala.collection.mutable
+
+/**
+  * Utility class for having a unified string-based representation of Table API related classes
+  * such as [[TableSchema]], [[TypeInformation]], [[WatermarkStrategy]], etc.
+  */
+class NormalizedProperties(
--- End diff --

Rename to `TableSourceProperties`? `NormalizedProperties` is quite generic
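For context, the CSV descriptor quoted later in this thread writes into this class through methods such as putString and putBoolean. A minimal sketch of that interaction (the descriptor name, keys, and values below are illustrative):

{code}
// Sketch only: addFormatProperties, putString and putBoolean appear in the
// CSV diff quoted later in this thread; "my-format" and the keys/values
// are illustrative.
class MyFormat extends EncodingDescriptor("my-format") {

  override protected def addFormatProperties(properties: NormalizedProperties): Unit = {
    properties.putString("field-delimiter", "|")
    properties.putBoolean("ignore-parse-errors", true)
  }
}
{code}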


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341354#comment-16341354 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164166552
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
+    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+    *
+    * @param fieldName The field to convert into a rowtime attribute.
+    */
+  def timestampFromField(fieldName: String): Rowtime = {
+    timestampExtractor = Some(new ExistingField(fieldName))
+    this
+  }
+
+  /**
+    * Sets a built-in timestamp extractor that converts the assigned timestamp from
+    * a DataStream API record into the rowtime attribute.
+    *
+    * Note: This extractor only works in streaming environments.
+    */
+  def timestampFromDataStream(): Rowtime = {
+    timestampExtractor = Some(new StreamRecordTimestamp)
+    this
+  }
+
+  /**
+    * Sets a custom timestamp extractor to be used for the rowtime attribute.
+    *
+    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
+    *                  from the physical type.
+    */
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+    timestampExtractor = Some(extractor)
+    this
+  }
+
+  /**
+    * Sets a built-in watermark strategy for ascending rowtime attributes.
+    *
+    * Emits a watermark of the maximum observed timestamp so far minus 1.
+    * Rows that have a timestamp equal to the max timestamp are not late.
+    */
+  def watermarkPeriodicAscending(): Rowtime = {
--- End diff --

`periodicAscendingWatermarks()`?


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaratio

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341358#comment-16341358 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164169928
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+/**
+  * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
+  * Interface (SPI) for discovery. A factory is called with a set of normalized properties that
+  * describe the desired table source. The factory allows for matching to the given set of
+  * properties and creating a configured [[TableSource]] accordingly.
+  *
+  * Classes that implement this interface need to be added to the
+  * "META-INF/services/org.apache.flink.table.sources.TableSourceFactory" file of a JAR file in
+  * the current classpath to be found.
+  */
+trait TableSourceFactory[T] {
--- End diff --

We might want to add a method that exposes all properties of the connector 
and format that the factory supports. 
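A possible shape for such an extension, sketched against the trait quoted above. Only the trait declaration is visible in the diff, so the member signatures below are assumptions for illustration, not the PR's actual API:

{code}
// Hypothetical sketch: create() is an assumed factory method;
// supportedProperties() is the addition suggested in the comment above.
import java.util

trait TableSourceFactory[T] {

  /** Creates a configured TableSource from the given normalized properties. */
  def create(properties: util.Map[String, String]): TableSource[T]

  /** Exposes all connector/format property keys this factory supports. */
  def supportedProperties(): util.List[String]
}

// SPI registration: the implementing JAR ships a file
//   META-INF/services/org.apache.flink.table.sources.TableSourceFactory
// containing one implementation class name per line.
{code}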


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341351#comment-16341351 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168602
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
+    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+    *
+    * @param fieldName The field to convert into a rowtime attribute.
+    */
+  def timestampFromField(fieldName: String): Rowtime = {
+    timestampExtractor = Some(new ExistingField(fieldName))
+    this
+  }
+
+  /**
+    * Sets a built-in timestamp extractor that converts the assigned timestamp from
+    * a DataStream API record into the rowtime attribute.
+    *
+    * Note: This extractor only works in streaming environments.
+    */
+  def timestampFromDataStream(): Rowtime = {
+    timestampExtractor = Some(new StreamRecordTimestamp)
+    this
+  }
+
+  /**
+    * Sets a custom timestamp extractor to be used for the rowtime attribute.
+    *
+    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
+    *                  from the physical type.
+    */
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
--- End diff --

`timestampsFromExtractor()`


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design documen

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341355#comment-16341355 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168496
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
+    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+    *
+    * @param fieldName The field to convert into a rowtime attribute.
+    */
+  def timestampFromField(fieldName: String): Rowtime = {
+    timestampExtractor = Some(new ExistingField(fieldName))
+    this
+  }
+
+  /**
+    * Sets a built-in timestamp extractor that converts the assigned timestamp from
+    * a DataStream API record into the rowtime attribute.
+    *
+    * Note: This extractor only works in streaming environments.
+    */
+  def timestampFromDataStream(): Rowtime = {
+    timestampExtractor = Some(new StreamRecordTimestamp)
+    this
+  }
+
+  /**
+    * Sets a custom timestamp extractor to be used for the rowtime attribute.
+    *
+    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
+    *                  from the physical type.
+    */
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+    timestampExtractor = Some(extractor)
+    this
+  }
+
+  /**
+    * Sets a built-in watermark strategy for ascending rowtime attributes.
+    *
+    * Emits a watermark of the maximum observed timestamp so far minus 1.
+    * Rows that have a timestamp equal to the max timestamp are not late.
+    */
+  def watermarkPeriodicAscending(): Rowtime = {
+    watermarkStrategy = Some(new AscendingTimestamps)
+    this
+  }
+
+  /**
+    * Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded
+    * time interval.
+    *
+    * Emits watermarks which are the maximum observed timestamp minus the specified delay.
+    */
+  def watermarkPeriodicBounding(delay: Long): Rowtime = {
+    watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
+    this
+  }
+
+  /**
+    * Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
+    * underlying DataStream API.
+    */
+  def watermarkFromDataStream(): Rowtime = {
+    watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
+    this
+  }
+
+  /**
+    * Sets a custom watermark strategy to be used for the rowtime attribute.
+    */
+  def watermarkFromStrategy(strategy: WatermarkStrategy): Rowtime = {
--- End diff --

`watermarksFromStrategy()`


> Create unified interfaces to configure and instantiate TableSources
> --

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341350#comment-16341350 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164150350
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala ---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
     */
   def field(fieldName: String, fieldType: String): Schema = {
     if (tableSchema.contains(fieldName)) {
-      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
+      throw new ValidationException(s"Duplicate field name $fieldName.")
+    }
+
+    val fieldProperties = mutable.LinkedHashMap[String, String]()
+    fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+    tableSchema += (fieldName -> fieldProperties)
+
+    lastField = Some(fieldName)
+    this
+  }
+
+  /**
+    * Specifies the origin of the previously defined field. The origin field is defined by a
+    * connector or format.
+    *
+    * E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
+    */
+  def from(originFieldName: String): Schema = {
+    lastField match {
+      case None => throw new ValidationException("No field defined previously. Use field() before.")
--- End diff --

"previously defined"


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341356#comment-16341356 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168933
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
+    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+    *
+    * @param fieldName The field to convert into a rowtime attribute.
+    */
+  def timestampFromField(fieldName: String): Rowtime = {
--- End diff --

`timestampsFromField()`


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341352#comment-16341352 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164151474
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala ---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
     */
   def field(fieldName: String, fieldType: String): Schema = {
     if (tableSchema.contains(fieldName)) {
-      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
+      throw new ValidationException(s"Duplicate field name $fieldName.")
+    }
+
+    val fieldProperties = mutable.LinkedHashMap[String, String]()
+    fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+    tableSchema += (fieldName -> fieldProperties)
+
+    lastField = Some(fieldName)
+    this
+  }
+
+  /**
+    * Specifies the origin of the previously defined field. The origin field is defined by a
+    * connector or format.
+    *
+    * E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
+    */
+  def from(originFieldName: String): Schema = {
+    lastField match {
+      case None => throw new ValidationException("No field defined previously. Use field() before.")
+      case Some(f) =>
+        tableSchema(f) += (DescriptorUtils.FROM -> originFieldName)
+        lastField = None
+    }
+    this
+  }
+
+  /**
+    * Specifies the previously defined field as a processing-time attribute.
+    *
+    * E.g. field("myString", Types.STRING).proctime()
+    */
+  def proctime(): Schema = {
+    lastField match {
+      case None => throw new ValidationException("No field defined previously. Use field() before.")
+      case Some(f) =>
+        tableSchema(f) += (DescriptorUtils.PROCTIME -> DescriptorUtils.TRUE)
+        lastField = None
+    }
+    this
+  }
+
+  /**
+    * Specifies the previously defined field as an event-time attribute.
+    *
+    * E.g. field("myString", Types.STRING).rowtime(...)
--- End diff --

`field("procTime", Types.SQL_TIMESTAMP).proctime()`


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341359#comment-16341359 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164149992
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala ---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
     */
   def field(fieldName: String, fieldType: String): Schema = {
     if (tableSchema.contains(fieldName)) {
-      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
+      throw new ValidationException(s"Duplicate field name $fieldName.")
+    }
+
+    val fieldProperties = mutable.LinkedHashMap[String, String]()
+    fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+    tableSchema += (fieldName -> fieldProperties)
+
+    lastField = Some(fieldName)
+    this
+  }
+
+  /**
+    * Specifies the origin of the previously defined field. The origin field is defined by a
+    * connector or format.
--- End diff --

Add that fields are matched by exact name by default.


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341348#comment-16341348 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164126202
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala ---
@@ -39,6 +39,9 @@ case class SqlParserException(
 
 /**
   * General Exception for all errors during table handling.
+  *
+  * This exception indicates that an internal error occurred or the feature is not fully
--- End diff --

"This exception indicates that an internal error occurred or that a feature 
is not supported yet. Usually, this exception does not indicate a fault of the 
user."



> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341347#comment-16341347 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164151340
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala ---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
     */
   def field(fieldName: String, fieldType: String): Schema = {
     if (tableSchema.contains(fieldName)) {
-      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
+      throw new ValidationException(s"Duplicate field name $fieldName.")
+    }
+
+    val fieldProperties = mutable.LinkedHashMap[String, String]()
+    fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+    tableSchema += (fieldName -> fieldProperties)
+
+    lastField = Some(fieldName)
+    this
+  }
+
+  /**
+    * Specifies the origin of the previously defined field. The origin field is defined by a
+    * connector or format.
+    *
+    * E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
+    */
+  def from(originFieldName: String): Schema = {
+    lastField match {
+      case None => throw new ValidationException("No field defined previously. Use field() before.")
+      case Some(f) =>
+        tableSchema(f) += (DescriptorUtils.FROM -> originFieldName)
+        lastField = None
+    }
+    this
+  }
+
+  /**
+    * Specifies the previously defined field as a processing-time attribute.
+    *
+    * E.g. field("myString", Types.STRING).proctime()
--- End diff --

`field("procTime", Types.SQL_TIMESTAMP).proctime()`


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341349#comment-16341349 ]

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168331
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
+    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+    *
+    * @param fieldName The field to convert into a rowtime attribute.
+    */
+  def timestampFromField(fieldName: String): Rowtime = {
+    timestampExtractor = Some(new ExistingField(fieldName))
+    this
+  }
+
+  /**
+    * Sets a built-in timestamp extractor that converts the assigned timestamp from
+    * a DataStream API record into the rowtime attribute.
+    *
+    * Note: This extractor only works in streaming environments.
+    */
+  def timestampFromDataStream(): Rowtime = {
+    timestampExtractor = Some(new StreamRecordTimestamp)
+    this
+  }
+
+  /**
+    * Sets a custom timestamp extractor to be used for the rowtime attribute.
+    *
+    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
+    *                  from the physical type.
+    */
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+    timestampExtractor = Some(extractor)
+    this
+  }
+
+  /**
+    * Sets a built-in watermark strategy for ascending rowtime attributes.
+    *
+    * Emits a watermark of the maximum observed timestamp so far minus 1.
+    * Rows that have a timestamp equal to the max timestamp are not late.
+    */
+  def watermarkPeriodicAscending(): Rowtime = {
+    watermarkStrategy = Some(new AscendingTimestamps)
+    this
+  }
+
+  /**
+    * Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded
+    * time interval.
+    *
+    * Emits watermarks which are the maximum observed timestamp minus the specified delay.
+    */
+  def watermarkPeriodicBounding(delay: Long): Rowtime = {
+    watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
+    this
+  }
+
+  /**
+    * Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
+    * underlying DataStream API.
+    */
+  def watermarkFromDataStream(): Rowtime = {
--- End diff --

`preserveSourceWatermarks()`

`DataStream` is only an internal aspect that's not visible when using table 
sources.
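
For illustration, a minimal sketch of the suggested rename, assuming the 
method keeps the body of the current `watermarkFromDataStream()` (which is 
not shown in this diff):

{code}
/**
  * Sets a built-in watermark strategy which indicates that the watermarks
  * of the underlying source should be preserved.
  */
def preserveSourceWatermarks(): Rowtime = {
  // instantiation by analogy to the other strategies above; assumed
  watermarkStrategy = Some(new PreserveWatermarks)
  this
}
{code}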



[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341346#comment-16341346
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164131675
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CSV.scala
 ---
@@ -139,26 +139,26 @@ class CSV extends EncodingDescriptor("csv") {
   }
 
   /**
-* Internal method for encoding properties conversion.
+* Internal method for format properties conversion.
 */
-  override protected def addEncodingProperties(properties: 
NormalizedProperties): Unit = {
-fieldDelim.foreach(properties.putString("field-delimiter", _))
-lineDelim.foreach(properties.putString("line-delimiter", _))
-properties.putTableSchema("fields", encodingSchema.toIndexedSeq)
-quoteCharacter.foreach(properties.putCharacter("quote-character", _))
-commentPrefix.foreach(properties.putString("comment-prefix", _))
-isIgnoreFirstLine.foreach(properties.putBoolean("ignore-first-line", 
_))
-lenient.foreach(properties.putBoolean("ignore-parse-errors", _))
+  override protected def addFormatProperties(properties: 
NormalizedProperties): Unit = {
+
fieldDelim.foreach(properties.putString(DescriptorUtils.FIELD_DELIMITER, _))
--- End diff --

I would not define the constants globally. Some constants should be global, 
but constants for specific connectors or formats should go to the respective 
descriptor.
IMO, it would be better to have these keys in `CSV` or in the class that 
validates the properties of a certain type.
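
For example, a sketch of what that could look like (placement and names are 
illustrative only; the key strings are taken from this diff):

{code}
object CSV {
  // property keys owned by the CSV format descriptor itself,
  // instead of a global DescriptorUtils class
  val FIELD_DELIMITER = "field-delimiter"
  val LINE_DELIMITER = "line-delimiter"
  val QUOTE_CHARACTER = "quote-character"
  val COMMENT_PREFIX = "comment-prefix"
  val IGNORE_FIRST_LINE = "ignore-first-line"
  val IGNORE_PARSE_ERRORS = "ignore-parse-errors"
}
{code}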




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341313#comment-16341313
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5240
  
Hi Timo,

the PR looks good overall. I've made a few suggestions, mostly about renaming 
methods or extending docs. I'd also propose to add a `supportedProperties()` 
method to `TableSourceFactory` that can be used to validate whether the factory 
supports all user-provided properties of a connector or format.
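
A rough sketch of the proposal (the `create` signature and the type 
parameter are assumed for illustration; only `supportedProperties()` is the 
suggested addition):

{code}
import java.util

trait TableSourceFactory[T] {

  // existing factory method; exact signature assumed for this sketch
  def create(properties: util.Map[String, String]): TableSource[T]

  // proposed: every property key this factory understands, so that all
  // user-provided connector/format properties can be validated up front
  def supportedProperties(): util.List[String]
}
{code}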

What do you think?
Fabian




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339532#comment-16339532
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5240
  
Thanks for the feedback @fhueske. I hope I have addressed most of it. I 
think we should merge this PR (if you agree) and open follow-up PRs for this 
issue as the next steps. I suggest the following subtasks:

- Add validation for the CSV format
- Add full CsvTableSourceFactory support (incl. proctime, rowtime, and 
schema mapping)
- Add a JSON schema parser to the JSON descriptor and the logic for creating 
a table source from it
- Add validation for the JSON format
- Add validation for the Rowtime descriptor
- Add validation for StreamTableDescriptor
- Add validation for BatchTableDescriptor
- Add KafkaTableSource factories 

What do you think?




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339477#comment-16339477
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163899364
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/DescriptorsTest.scala
 ---
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class DescriptorsTest extends TableTestBase {
--- End diff --

Let's do it once the validation methods are in place.




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339331#comment-16339331
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163866463
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+/**
+  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
+  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
+  * describe the desired table source. The factory allows for matching to 
the given set of
+  * properties and creating a configured [[TableSource]] accordingly.
+  *
+  * Classes that implement this interface need to be added to the
+  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
--- End diff --

No, any JAR file can be used. We use similar functionality for the Flink 
file systems. For example, we simply need to add a 
`META-INF/services/org.apache.flink.table.sources.TableSourceFactory` file to 
the Kafka connector jar. The Java Service Provider Interfaces do the rest for 
us.
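
In other words, discovery is plain JDK SPI (sketch; the Kafka factory class 
name is illustrative, and the type parameter of the factory is assumed):

{code}
// File inside the connector JAR:
//   META-INF/services/org.apache.flink.table.sources.TableSourceFactory
// containing one implementing class per line, e.g.:
//   org.apache.flink.streaming.connectors.kafka.KafkaTableSourceFactory

import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.flink.table.sources.TableSourceFactory

// standard JDK service lookup; finds factories from all JARs on the classpath
val factories = ServiceLoader.load(classOf[TableSourceFactory[_]]).asScala.toList
{code}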




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339323#comment-16339323
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163865339
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorUtils.statistics
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing a table source.
+  */
+abstract class TableSourceDescriptor extends Descriptor {
+
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var encodingDescriptor: Option[EncodingDescriptor] = None
+  protected var proctimeDescriptor: Option[Proctime] = None
+  protected var rowtimeDescriptor: Option[Rowtime] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
--- End diff --

I added this functionality to the `Schema` descriptor.




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339316#comment-16339316
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163863728
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+
+/**
+  * Describes a schema of a table.
+  */
+class Schema extends Descriptor {
+
+  private val tableSchema: mutable.LinkedHashMap[String, String] =
+  mutable.LinkedHashMap[String, String]()
+
+  /**
+* Sets the schema with field names and the types. Required.
+*
+* This method overwrites existing fields added with [[field()]].
+*
+* @param schema the table schema
+*/
+  def schema(schema: TableSchema): Schema = {
--- End diff --

We have no syntax for schema strings yet. `ROW(name TYPE)` is a datatype, 
not a schema.




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339239#comment-16339239
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163847018
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
+
+  private val encodingSchema: mutable.LinkedHashMap[String, String] =
+  mutable.LinkedHashMap[String, String]()
+  private var fieldMapping: Option[util.Map[String, String]] = None
+  private var failOnMissingField: Option[Boolean] = None
+
+  /**
+* Sets the JSON schema with field names and the types for the 
JSON-encoded input.
+* The JSON schema must not contain nested fields.
+*
+* This method overwrites existing fields added with [[field()]].
+*
+* @param schema the table schema
+*/
+  def schema(schema: TableSchema): JSON = {
+this.encodingSchema.clear()
+NormalizedProperties.normalizeTableSchema(schema).foreach {
+  case (n, t) => field(n, t)
+}
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type information for 
the JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type information of the field
+*/
+  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
+field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type string for the 
JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type string of the field
+*/
+  def field(fieldName: String, fieldType: String): JSON = {
+if (encodingSchema.contains(fieldName)) {
+  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+}
+encodingSchema += (fieldName -> fieldType)
+this
+  }
+
+  /**
+* Sets a mapping from schema fields to fields of the JSON schema.
+*
+* A field mapping is required if the fields of produced tables should 
be named differently than
+* the fields of the JSON records.
+* The key of the provided Map refers to the field of the table schema,
+* the value to the field in the JSON schema.
+*
+* @param tableToJsonMapping A mapping from table schema fields to JSON 
schema fields.
+* @return The builder.
+*/
+  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): 
JSON = {
+this.fieldMapping = Some(tableToJsonMapping)
+this
+  }
+
+  /**
+* Sets a flag whether to fail if a field is missing or not.
+*
+* @param failOnMissingField If set to true, the operation fails if 
there is a missing field.
+*   If set to false, a missing field is set to 
null.
+* @return The builder.
+*/
+  def failOnMissingField(failOnMissingField: Boolean): JSON = {
+this.failOnMissingField = Some(failOnMissingField)
+this
+  }

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339212#comment-16339212
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163841505
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
+
+  private val encodingSchema: mutable.LinkedHashMap[String, String] =
+  mutable.LinkedHashMap[String, String]()
+  private var fieldMapping: Option[util.Map[String, String]] = None
+  private var failOnMissingField: Option[Boolean] = None
+
+  /**
+* Sets the JSON schema with field names and the types for the 
JSON-encoded input.
+* The JSON schema must not contain nested fields.
+*
+* This method overwrites existing fields added with [[field()]].
+*
+* @param schema the table schema
+*/
+  def schema(schema: TableSchema): JSON = {
+this.encodingSchema.clear()
+NormalizedProperties.normalizeTableSchema(schema).foreach {
+  case (n, t) => field(n, t)
+}
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type information for 
the JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type information of the field
+*/
+  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
+field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type string for the 
JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type string of the field
+*/
+  def field(fieldName: String, fieldType: String): JSON = {
+if (encodingSchema.contains(fieldName)) {
+  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+}
+encodingSchema += (fieldName -> fieldType)
+this
+  }
+
+  /**
+* Sets a mapping from schema fields to fields of the JSON schema.
+*
+* A field mapping is required if the fields of produced tables should 
be named differently than
+* the fields of the JSON records.
+* The key of the provided Map refers to the field of the table schema,
+* the value to the field in the JSON schema.
+*
+* @param tableToJsonMapping A mapping from table schema fields to JSON 
schema fields.
+* @return The builder.
+*/
+  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): 
JSON = {
+this.fieldMapping = Some(tableToJsonMapping)
+this
+  }
+
+  /**
+* Sets a flag whether to fail if a field is missing or not.
+*
+* @param failOnMissingField If set to true, the operation fails if 
there is a missing field.
+*   If set to false, a missing field is set to 
null.
+* @return The builder.
+*/
+  def failOnMissingField(failOnMissingField: Boolean): JSON = {
+this.failOnMissingField = Some(failOnMissingField)
+this
+  }

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339205#comment-16339205
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163840224
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
--- End diff --

I think relying on a standard is a good idea. We should add this as a 
follow-up issue. Adding JSON schema support is not trivial because we have to 
decide how we treat, e.g., the type `number`, `"type": "string", "format": 
"date"`, or enums (http://json-schema.org/example2.html).




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339200#comment-16339200
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163838489
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * A class that adds a set of string-based, normalized properties for 
describing a
+  * table source or table sink.
+  */
+abstract class Descriptor {
--- End diff --

We could add a method here, but the same logic would also have to exist 
somewhere else for properties that have been read from a config file.
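
For instance, the very same normalized keys might arrive as a plain map 
parsed from a file, where no descriptor method could have run (key layout 
illustrative):

{code}
// equivalent input that bypasses the descriptors entirely
val fromFile: Map[String, String] = Map(
  "connector.type" -> "kafka",
  "encoding.type"  -> "json")
{code}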




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339195#comment-16339195
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163836869
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * A class that adds a set of string-based, normalized properties for 
describing a
+  * table source or table sink.
+  */
+abstract class Descriptor {
+
+  /**
+* Internal method for properties conversion.
+*/
+  def addProperties(properties: NormalizedProperties): Unit
--- End diff --

I will add `private[flink]` but it will be public in Java.
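
A sketch of the resulting declaration; Scala compiles a qualified `private` 
modifier to a public method in bytecode, which is why it remains visible to 
Java callers:

{code}
abstract class Descriptor {
  // callable only from org.apache.flink.* in Scala source code, but public
  // in the generated bytecode and therefore accessible from Java
  private[flink] def addProperties(properties: NormalizedProperties): Unit
}
{code}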




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339193#comment-16339193
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163836415
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorUtils.connector
+
+/**
+  * Describes a connector to another system.
+  *
+  * @param tpe string identifier for the connector
+  */
+abstract class ConnectorDescriptor(private val tpe: String) extends 
Descriptor {
+
+  /**
+* Internal method for properties conversion.
+*/
+  final def addProperties(properties: NormalizedProperties): Unit = {
+properties.putString(connector("type"), tpe)
+val connectorProperties = new NormalizedProperties()
+addConnectorProperties(connectorProperties)
+connectorProperties.getProperties.foreach { case (k, v) =>
--- End diff --

This logic ensures that you cannot add encoding properties from within a 
connector descriptor. All properties that are added are correctly prefixed 
with "connector.".
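
The loop body, elided by the review marker above, presumably re-keys every 
entry under the connector namespace along these lines (assumed, using the 
`connector` helper imported in this file):

{code}
connectorProperties.getProperties.foreach { case (k, v) =>
  properties.putString(connector(k), v) // i.e. the key becomes "connector." + k
}
{code}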




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339183#comment-16339183
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163835834
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorUtils.connector
+
+/**
+  * Describes a connector to another system.
+  *
+  * @param tpe string identifier for the connector
+  */
+abstract class ConnectorDescriptor(private val tpe: String) extends 
Descriptor {
--- End diff --

My goal was to keep the descriptors as lightweight as possible. They only 
describe but do not validate. If we added validation logic here, we would 
also need to add the same logic somewhere else in the stack (i.e., in the 
table factories) when we parse the properties from a file.




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339165#comment-16339165
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163830189
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -107,6 +111,16 @@ abstract class BatchTableEnvironment(
 }
   }
 
+  /**
+* Creates a table from a descriptor that describes the resulting table 
schema, the source
+* connector, source encoding, and other properties.
+*
+* @param schema schema descriptor describing the table to create
+*/
+  def createTable(schema: Schema): BatchTableSourceDescriptor = {
--- End diff --

I will improve the definition, but in general I think it is nice to have a 
more fluent API without too much nesting. It is also inconvenient to call a 
single method on the table environment and pass all descriptors as its 
parameters.
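
As a usage sketch of the fluent style (the connector descriptor is a 
placeholder; the chained methods are those defined on the descriptor classes 
of this PR):

{code}
tableEnv
  .createTable(new Schema()
    .field("user", Types.STRING)
    .field("ts", Types.SQL_TIMESTAMP))
  .withConnector(someConnector) // hypothetical ConnectorDescriptor instance
  .withEncoding(new JSON()
    .field("user", Types.STRING)
    .field("ts", Types.SQL_TIMESTAMP))
  .register("MyTable")
{code}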




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334945#comment-16334945
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163061338
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+
+/**
+  * Describes a schema of a table.
+  */
+class Schema extends Descriptor {
+
+  private val tableSchema: mutable.LinkedHashMap[String, String] =
+  mutable.LinkedHashMap[String, String]()
+
+  /**
+* Sets the schema with field names and the types. Required.
+*
+* This method overwrites existing fields added with [[field()]].
+*
+* @param schema the table schema
+*/
+  def schema(schema: TableSchema): Schema = {
--- End diff --

add a method `def schema(schema: String): Schema` that parses the schema 
string?
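
For illustration, such an overload might look as follows (hypothetical; as 
noted elsewhere in this thread, there is no agreed schema-string syntax yet):

{code}
/** Parses e.g. "user STRING, ts TIMESTAMP" into schema fields. Hypothetical. */
def schema(schemaString: String): Schema = {
  schemaString.split(',').foreach { part =>
    val Array(name, tpe) = part.trim.split("\\s+", 2)
    field(name, tpe)
  }
  this
}
{code}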




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334946#comment-16334946
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163012442
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, Table, 
TableException}
+import org.apache.flink.table.sources.{StreamTableSource, TableSource, 
TableSourceFactoryService}
+
+/**
+  * Descriptor for specifying a table source in a streaming environment.
+  */
+class StreamTableSourceDescriptor(
+tableEnv: StreamTableEnvironment,
+schema: Schema)
+  extends TableSourceDescriptor {
+
+  schemaDescriptor = Some(schema)
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and returns it.
+*/
+  def toTableSource: TableSource[_] = {
+val source = TableSourceFactoryService.findTableSourceFactory(this)
+source match {
+  case _: StreamTableSource[_] => source
+  case _ => throw new TableException(
+s"Found table source '${source.getClass.getCanonicalName}' is not 
applicable " +
+  s"in a streaming environment.")
+}
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and returns it as a table.
+*/
+  def toTable: Table = {
+tableEnv.fromTableSource(toTableSource)
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  def register(name: String): Unit = {
+tableEnv.registerTableSource(name, toTableSource)
+  }
+
+  /**
+* Specifies a connector for reading data.
+*/
+  def withConnector(connector: ConnectorDescriptor): 
StreamTableSourceDescriptor = {
+connectorDescriptor = Some(connector)
+this
+  }
+
+  /**
+* Specifies an encoding that defines how to read data from a connector.
+*/
+  def withEncoding(encoding: EncodingDescriptor): 
StreamTableSourceDescriptor = {
+encodingDescriptor = Some(encoding)
--- End diff --

check if the connector requires an encoding?
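
A hedged sketch of such a check; the `needsEncoding` flag on 
`ConnectorDescriptor` is hypothetical and not part of the PR:

{code}
def withEncoding(encoding: EncodingDescriptor): StreamTableSourceDescriptor = {
  // fail early if the already-set connector cannot use an encoding
  connectorDescriptor.foreach { c =>
    if (!c.needsEncoding) {  // hypothetical flag
      throw new ValidationException(
        s"Connector '${c.getClass.getSimpleName}' does not support an encoding.")
    }
  }
  encodingDescriptor = Some(encoding)
  this
}
{code}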



[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334949#comment-16334949
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163071117
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/DescriptorsTest.scala
 ---
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class DescriptorsTest extends TableTestBase {
--- End diff --

I would move the tests to a separate class per descriptor. 
If we add a `validate` method to `Descriptor` this needs to be tested as 
well.
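
A minimal sketch of such a split-out test class, here for the `Rowtime` 
descriptor; the `NormalizedProperties` constructor and the final assertions 
are assumptions:

{code}
import org.junit.Test

class RowtimeTest {

  @Test
  def testRowtimeDescriptor(): Unit = {
    val rowtime = new Rowtime()
      .field("ts")
      .timestampFromField("ts")
      .watermarkPeriodicBounding(1000L)

    val properties = new NormalizedProperties()  // constructor assumed
    rowtime.addProperties(properties)
    // assert here that the rowtime field, extractor, and watermark delay
    // were written to the normalized properties
  }
}
{code}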




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334952#comment-16334952
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163012548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, Table, 
TableException}
+import org.apache.flink.table.sources.{StreamTableSource, TableSource, 
TableSourceFactoryService}
+
+/**
+  * Descriptor for specifying a table source in a streaming environment.
+  */
+class StreamTableSourceDescriptor(
+tableEnv: StreamTableEnvironment,
+schema: Schema)
+  extends TableSourceDescriptor {
+
+  schemaDescriptor = Some(schema)
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and returns it.
+*/
+  def toTableSource: TableSource[_] = {
+val source = TableSourceFactoryService.findTableSourceFactory(this)
+source match {
+  case _: StreamTableSource[_] => source
+  case _ => throw new TableException(
+s"Found table source '${source.getClass.getCanonicalName}' is not 
applicable " +
+  s"in a streaming environment.")
+}
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and returns it as a table.
+*/
+  def toTable: Table = {
+tableEnv.fromTableSource(toTableSource)
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  def register(name: String): Unit = {
+tableEnv.registerTableSource(name, toTableSource)
+  }
+
+  /**
+* Specifies a connector for reading data.
+*/
+  def withConnector(connector: ConnectorDescriptor): 
StreamTableSourceDescriptor = {
+connectorDescriptor = Some(connector)
--- End diff --

check if an encoding was added that the connector does not need?
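
This would be the mirror image of the `withEncoding` check sketched 
earlier; again, `needsEncoding` is a hypothetical flag:

{code}
def withConnector(connector: ConnectorDescriptor): StreamTableSourceDescriptor = {
  // reject a previously defined encoding that this connector cannot use
  if (encodingDescriptor.isDefined && !connector.needsEncoding) {
    throw new ValidationException(
      "An encoding was defined but the connector does not support one.")
  }
  connectorDescriptor = Some(connector)
  this
}
{code}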



[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334933#comment-16334933
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162988639
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -23,22 +23,74 @@ import java.net.URL
 import org.apache.commons.configuration.{ConfigurationException, 
ConversionException, PropertiesConfiguration}
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, 
NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.plan.schema.{BatchTableSourceTable, 
StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, 
StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, 
StreamTableSource, TableSource, TableSourceFactoryService}
 import org.apache.flink.table.util.Logging
 import org.apache.flink.util.InstantiationUtil
 import org.reflections.Reflections
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
   * The utility class is used to convert ExternalCatalogTable to 
TableSourceTable.
   */
 object ExternalTableSourceUtil extends Logging {
 
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable(
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceTable[_] = {
+
+// check for the legacy external catalog path
+if (externalCatalogTable.isLegacyTableType) {
+  LOG.warn("External catalog tables based on TableType annotations are 
deprecated. " +
+"Please consider updating them to TableSourceFactories.")
+  fromExternalCatalogTableType(externalCatalogTable)
+}
+// use the factory approach
+else {
+  val source = 
TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+  tableEnv match {
+// check for a batch table source in this batch environment
+case _: BatchTableEnvironment =>
+  source match {
+case bts: BatchTableSource[_] =>
+  new BatchTableSourceTable(
+bts,
+new FlinkStatistic(externalCatalogTable.getTableStats))
+case _ => throw new TableException(
+  s"Found table source '${source.getClass.getCanonicalName}' 
is not applicable " +
+s"in a batch environment.")
+  }
+// check for a stream table source in this streaming environment
+case _: StreamTableEnvironment =>
+  source match {
+case sts: StreamTableSource[_] =>
+  new StreamTableSourceTable(
+sts,
+new FlinkStatistic(externalCatalogTable.getTableStats))
+case _ => throw new TableException(
+  s"Found table source '${source.getClass.getCanonicalName}' 
is not applicable " +
+s"in a streaming environment.")
+  }
+case _ => throw new TableException("Unsupported table 
environment.")
+  }
+}
+  }
+
+  // 
--
+  // NOTE: the following line can be removed once we drop support for 
TableType
--- End diff --

line or lines? 
Create a JIRA and link it here as reference?



[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334927#comment-16334927
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162964475
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -136,12 +136,51 @@ case class CatalogAlreadyExistException(
   def this(catalog: String) = this(catalog, null)
 }
 
+/**
+  * Exception for not finding a 
[[org.apache.flink.table.sources.TableSourceFactory]] for the
+  * given properties.
+  *
+  * @param properties properties that describe the table source
+  * @param cause the cause
+  */
+case class NoMatchingTableSourceException(
+properties: Map[String, String],
+cause: Throwable)
+extends RuntimeException(
+  s"Could not find a table source factory in the classpath satisfying 
the " +
+s"following properties: \n${properties.map(e => e._1 + "=" +  e._2 
).mkString("\n")}",
+  cause) {
+
+  def this(properties: Map[String, String]) = this(properties, null)
+}
+
+/**
+  * Exception for finding more than one 
[[org.apache.flink.table.sources.TableSourceFactory]] for
+  * the given properties.
+  *
+  * @param properties properties that describe the table source
+  * @param cause the cause
+  */
+case class AmbiguousTableSourceException(
+properties: Map[String, String],
+cause: Throwable)
+extends RuntimeException(
+  s"More than one table source factory in the classpath satisfying the 
" +
+s"following properties: \n${properties.map(e => e._1 + "=" +  e._2 
).mkString("\n")}",
+  cause) {
+
+  def this(properties: Map[String, String]) = this(properties, null)
+}
+
 /**
   * Exception for not finding a [[TableSourceConverter]] for a given table 
type.
   *
   * @param tableType table type
   * @param cause the cause
+  * @deprecated Use table source factories instead.
   */
+@Deprecated
+@deprecated("Use table factories instead.")
--- End diff --

Give a class name.
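
For example, the message could name the replacement class explicitly (a 
sketch; the replacement is the factory interface introduced by this PR):

{code}
@Deprecated
@deprecated("Use org.apache.flink.table.sources.TableSourceFactory instead.")
{code}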




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334953#comment-16334953
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163012882
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, Table, 
TableException}
+import org.apache.flink.table.sources.{BatchTableSource, TableSource, 
TableSourceFactoryService}
+
+class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, schema: 
Schema)
--- End diff --

Add `RowtimeDescriptor`. Batch table sources support timestamp extraction 
as well.
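
A sketch of what that could look like; the method name `withRowtime` is 
illustrative, while `rowtimeDescriptor` is the protected field of 
`TableSourceDescriptor`:

{code}
def withRowtime(rowtime: Rowtime): BatchTableSourceDescriptor = {
  rowtimeDescriptor = Some(rowtime)
  this
}
{code}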




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334930#comment-16334930
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162965980
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.{DataSet, 
ExecutionEnvironment}
 import org.apache.flink.table.expressions.ExpressionParser
 import org.apache.flink.table.api._
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.descriptors.{BatchTableSourceDescriptor, 
ConnectorDescriptor}
--- End diff --

remove




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334941#comment-16334941
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163010702
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * A class that adds a set of string-based, normalized properties for 
describing a
+  * table source or table sink.
+  */
+abstract class Descriptor {
+
+  /**
+* Internal method for properties conversion.
+*/
+  def addProperties(properties: NormalizedProperties): Unit
--- End diff --

does this method have to be public or can we hide it?
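
Scala's package-qualified visibility would keep it callable from the table 
API internals while hiding it from user code, e.g.:

{code}
/**
  * Internal method for properties conversion.
  */
private[flink] def addProperties(properties: NormalizedProperties): Unit
{code}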




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334937#comment-16334937
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r16231
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorUtils.statistics
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing a table source.
+  */
+abstract class TableSourceDescriptor extends Descriptor {
+
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var encodingDescriptor: Option[EncodingDescriptor] = None
+  protected var proctimeDescriptor: Option[Proctime] = None
+  protected var rowtimeDescriptor: Option[Rowtime] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
--- End diff --

We might need another descriptor for mapping fields of the encoding (or 
connector) to fields in the table schema. This can be used to rename or select 
fields from the encoding to the table schema. This would be the configuration 
for the `DefinedFieldMapping` interface.
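
A rough sketch of such a mapping descriptor; the class name, property keys, 
and the `putString` method are assumptions for illustration:

{code}
import scala.collection.mutable

class FieldMapping extends Descriptor {

  private val mapping = mutable.LinkedHashMap[String, String]()

  /** Maps a field of the table schema to a field of the encoding/connector. */
  def map(tableField: String, sourceField: String): FieldMapping = {
    mapping += (tableField -> sourceField)
    this
  }

  override def addProperties(properties: NormalizedProperties): Unit = {
    mapping.foreach { case (t, s) => properties.putString(s"mapping.$t", s) }
  }
}
{code}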




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334942#comment-16334942
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163007886
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
+
+  private val encodingSchema: mutable.LinkedHashMap[String, String] =
+  mutable.LinkedHashMap[String, String]()
+  private var fieldMapping: Option[util.Map[String, String]] = None
+  private var failOnMissingField: Option[Boolean] = None
+
+  /**
+* Sets the JSON schema with field names and the types for the 
JSON-encoded input.
+* The JSON schema must not contain nested fields.
+*
+* This method overwrites existing fields added with [[field()]].
+*
+* @param schema the table schema
+*/
+  def schema(schema: TableSchema): JSON = {
+this.encodingSchema.clear()
+NormalizedProperties.normalizeTableSchema(schema).foreach {
+  case (n, t) => field(n, t)
+}
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type information for 
the JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type information of the field
+*/
+  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
+field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type string for the 
JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type string of the field
+*/
+  def field(fieldName: String, fieldType: String): JSON = {
+if (encodingSchema.contains(fieldName)) {
+  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+}
+encodingSchema += (fieldName -> fieldType)
+this
+  }
+
+  /**
+* Sets a mapping from schema fields to fields of the JSON schema.
+*
+* A field mapping is required if the fields of produced tables should 
be named differently than
+* the fields of the JSON records.
+* The key of the provided Map refers to the field of the table schema,
+* the value to the field in the JSON schema.
+*
+* @param tableToJsonMapping A mapping from table schema fields to JSON 
schema fields.
+* @return The builder.
+*/
+  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): 
JSON = {
+this.fieldMapping = Some(tableToJsonMapping)
+this
+  }
+
+  /**
+* Sets flag whether to fail if a field is missing or not.
+*
+* @param failOnMissingField If set to true, the operation fails if 
there is a missing field.
+*   If set to false, a missing field is set to 
null.
+* @return The builder.
+*/
+  def failOnMissingField(failOnMissingField: Boolean): JSON = {
+this.failOnMissingField = Some(failOnMissingField)
+this
+  }

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334944#comment-16334944
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163011304
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * A class that adds a set of string-based, normalized properties for 
describing a
+  * table source or table sink.
+  */
+abstract class Descriptor {
--- End diff --

Should we add a validation method that checks if the descriptor is valid?
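
A sketch with a no-op default, so that only descriptors with constraints 
need to override it:

{code}
abstract class Descriptor {

  /**
    * Internal method for properties conversion.
    */
  def addProperties(properties: NormalizedProperties): Unit

  /**
    * Validates the descriptor; throws a ValidationException if required
    * properties are missing or inconsistent. No-op by default.
    */
  def validate(): Unit = {}
}
{code}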




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334939#comment-16334939
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163002001
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
+
+  private val encodingSchema: mutable.LinkedHashMap[String, String] =
+  mutable.LinkedHashMap[String, String]()
+  private var fieldMapping: Option[util.Map[String, String]] = None
+  private var failOnMissingField: Option[Boolean] = None
+
+  /**
+* Sets the JSON schema with field names and the types for the 
JSON-encoded input.
+* The JSON schema must not contain nested fields.
+*
+* This method overwrites existing fields added with [[field()]].
+*
+* @param schema the table schema
+*/
+  def schema(schema: TableSchema): JSON = {
+this.encodingSchema.clear()
+NormalizedProperties.normalizeTableSchema(schema).foreach {
+  case (n, t) => field(n, t)
+}
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type information for 
the JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type information of the field
+*/
+  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
+field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type string for the 
JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type string of the field
+*/
+  def field(fieldName: String, fieldType: String): JSON = {
+if (encodingSchema.contains(fieldName)) {
+  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+}
+encodingSchema += (fieldName -> fieldType)
+this
+  }
+
+  /**
+* Sets a mapping from schema fields to fields of the JSON schema.
+*
+* A field mapping is required if the fields of produced tables should 
be named differently than
+* the fields of the JSON records.
+* The key of the provided Map refers to the field of the table schema,
+* the value to the field in the JSON schema.
+*
+* @param tableToJsonMapping A mapping from table schema fields to JSON 
schema fields.
+* @return The builder.
+*/
+  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): 
JSON = {
--- End diff --

We might want to make field mappings independent of the encoding. For 
example field mappings could also be used for JDBC connectors which do not have 
an encoding.



[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334951#comment-16334951
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163071426
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.io.Serializable
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.{TableException, Types, 
ValidationException}
+import 
org.apache.flink.table.descriptors.NormalizedProperties.normalizeTypeInfo
+import org.apache.flink.util.InstantiationUtil
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.util.parsing.combinator.{JavaTokenParsers, 
PackratParsers}
+
+/**
+  * Utilities to convert 
[[org.apache.flink.api.common.typeinfo.TypeInformation]] into a
+  * string representation and back.
+  */
+object TypeStringUtils extends JavaTokenParsers with PackratParsers {
--- End diff --

We need unit tests for the parser.
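
For example, round-trip tests per type; the method names `writeTypeInfo` 
and `readTypeInfo` are assumed for this sketch:

{code}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.junit.Assert.assertEquals
import org.junit.Test

class TypeStringUtilsTest {

  @Test
  def testPrimitiveRoundTrip(): Unit = {
    val expected: TypeInformation[String] = Types.STRING
    val serialized = TypeStringUtils.writeTypeInfo(expected)
    assertEquals(expected, TypeStringUtils.readTypeInfo(serialized))
  }
}
{code}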




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334932#comment-16334932
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162967648
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -18,28 +18,282 @@
 
 package org.apache.flink.table.catalog
 
-import java.util.{HashMap => JHashMap, Map => JMap}
 import java.lang.{Long => JLong}
+import java.util.{HashMap => JHashMap, Map => JMap}
 
-import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.api.{TableException, TableSchema}
+import 
org.apache.flink.table.catalog.ExternalCatalogTable.{TableTypeConnector, 
toConnectorDescriptor, toMetadataDescriptor, toStatisticsDescriptor}
+import org.apache.flink.table.descriptors.DescriptorUtils.{connector, 
metadata}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
   * Defines a table in an [[ExternalCatalog]].
-  *
-  * @param tableTypeTable type, e.g csv, hbase, kafka
-  * @param schema   Schema of the table (column names and 
types)
-  * @param properties   Properties of the table
-  * @param statsStatistics of the table
-  * @param comment  Comment of the table
-  * @param createTime   Create timestamp of the table
-  * @param lastAccessTime   Timestamp of last access of the table
   */
-case class ExternalCatalogTable(
+class ExternalCatalogTable(
--- End diff --

Add descriptions for constructor arguments
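
For example (parameter names here are illustrative, not taken from the PR):

{code}
/**
  * Defines a table in an [[ExternalCatalog]].
  *
  * @param connectorDesc  describes the system to connect to
  * @param encodingDesc   describes the data encoding, if the connector needs one
  * @param statisticsDesc statistics of the table
  * @param metadataDesc   comment, creation time, and other metadata
  */
{code}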




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334948#comment-16334948
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163011691
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var rowtimeName: Option[String] = None
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Declares a field of the schema to be the rowtime attribute. Required.
+*
+* @param fieldName The name of the field that becomes the rowtime field.
+*/
+  def field(fieldName: String): Rowtime = {
+rowtimeName = Some(fieldName)
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
+timestampExtractor = Some(new StreamRecordTimestamp)
+this
+  }
+
+  /**
+* Sets a custom timestamp extractor to be used for the rowtime 
attribute.
+*
+* @param extractor The [[TimestampExtractor]] to extract the rowtime 
attribute
+*  from the physical type.
+*/
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+timestampExtractor = Some(extractor)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for ascending rowtime attributes.
+*
+* Emits a watermark of the maximum observed timestamp so far minus 1.
+* Rows that have a timestamp equal to the max timestamp are not late.
+*/
+  def watermarkPeriodicAscending(): Rowtime = {
+watermarkStrategy = Some(new AscendingTimestamps)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for rowtime attributes which are 
out-of-order by a bounded
+* time interval.
+*
+* Emits watermarks which are the maximum observed timestamp minus the 
specified delay.
+*/
+  def watermarkPeriodicBounding(delay: Long): Rowtime = {
+watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy which indicates the watermarks 
should be preserved from the
+* underlying DataStream API.
+*/
+  def watermarkFromDataStream(): Rowtime = {
+watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
+thi
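For orientation, a minimal usage sketch of this descriptor, mirroring the example from the PR description further down this thread (the `withRowtime` hook is part of that example):

```
Rowtime()
  .field("rowtime")
  .timestampFromDataStream()
  .watermarkFromDataStream()
```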

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334950#comment-16334950
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163067483
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.io.Serializable
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.{TableException, Types, ValidationException}
+import org.apache.flink.table.descriptors.NormalizedProperties.normalizeTypeInfo
+import org.apache.flink.util.InstantiationUtil
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
+
+/**
+  * Utilities to convert [[org.apache.flink.api.common.typeinfo.TypeInformation]] into a
+  * string representation and back.
+  */
+object TypeStringUtils extends JavaTokenParsers with PackratParsers {
--- End diff --

Some examples about the supported syntax would be good. 
Would also be good to add these examples to the method docs that accept 
type strings.
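For instance, the type strings that appear in the property listings later in this thread have this shape:

```
"VARCHAR"
"INT"
"TIMESTAMP"
"ROW(test INT, row VARCHAR)"
"ANY(java.lang.Class)"
```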




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334947#comment-16334947
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163065450
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+/**
+  * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
+  * Interfaces (SPI) for discovery. A factory is called with a set of normalized properties that
+  * describe the desired table source. The factory allows for matching to the given set of
+  * properties and creating a configured [[TableSource]] accordingly.
+  *
+  * Classes that implement this interface need to be added to the
+  * "META-INF/services/org.apache.flink.table.sources.TableSourceFactory" file of a JAR file in
--- End diff --

do all need to be added to the same file? Or can we have separate files for different modules? For instance, a `Kafka011JsonTableFactory` would be in the Kafka connectors module. Would a user have to change the service file if the Kafka factory should be used, or can we build it in a way that it is sufficient to include the Kafka connectors JAR?
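For reference, Java's `ServiceLoader` merges service files with the same path from all JARs on the classpath, so each connector module could ship its own file. A sketch of such a per-module file (module layout and factory class name are hypothetical here, borrowed from the question above):

```
# META-INF/services/org.apache.flink.table.sources.TableSourceFactory
# inside a hypothetical Kafka connector JAR; one fully qualified class name per line
org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableFactory
```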




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334943#comment-16334943
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162995916
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorUtils.connector
+
+/**
+  * Describes a connector to another system.
+  *
+  * @param tpe string identifier for the connector
+  */
+abstract class ConnectorDescriptor(private val tpe: String) extends Descriptor {
+
+  /**
+    * Internal method for properties conversion.
+    */
+  final def addProperties(properties: NormalizedProperties): Unit = {
+    properties.putString(connector("type"), tpe)
+    val connectorProperties = new NormalizedProperties()
+    addConnectorProperties(connectorProperties)
+    connectorProperties.getProperties.foreach { case (k, v) =>
--- End diff --

why do we need to go over the properties again? Couldn't we implement 
`addConnectorProperties` to properly add the properties directly into 
`properties`?
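A sketch of the suggested shortcut, assuming `addConnectorProperties` were changed to write its keys, already prefixed, into the shared instance (this is the reviewer's suggestion, not the PR's code):

```
final def addProperties(properties: NormalizedProperties): Unit = {
  properties.putString(connector("type"), tpe)
  // subclasses write directly into the shared instance, no second pass needed
  addConnectorProperties(properties)
}
```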




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334940#comment-16334940
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162996455
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorUtils.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+/**
+  * Utilities for working with a [[org.apache.flink.table.descriptors.Descriptor]].
+  */
+object DescriptorUtils {
+
+  def hasConnector(properties: util.Map[String, String], connector: String): Boolean = {
+    val tpe = properties.get("connector.type")
+    tpe != null || tpe == connector
--- End diff --

should be `tpe != null && tpe == connector`?




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334938#comment-16334938
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163005744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
--- End diff --

Should we add a method that defines the schema with a JSON Schema string? We would need a parser, but would have immediate support for nested schemas.

Alternatively, we could use the nested schema parser of `TypeStringUtils`, but this would not be JSON Schema.
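To illustrate the suggestion, a hypothetical variant (the `jsonSchema` method is invented here and not part of the PR; the schema follows the standard JSON Schema vocabulary):

```
JSON().jsonSchema(
  """
    |{
    |  "type": "object",
    |  "properties": {
    |    "myfield": { "type": "string" },
    |    "nested": {
    |      "type": "object",
    |      "properties": { "myfield2": { "type": "integer" } }
    |    }
    |  }
    |}
  """.stripMargin)
```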






[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334936#comment-16334936
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163007563
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
+
+  private val encodingSchema: mutable.LinkedHashMap[String, String] =
+    mutable.LinkedHashMap[String, String]()
+  private var fieldMapping: Option[util.Map[String, String]] = None
+  private var failOnMissingField: Option[Boolean] = None
+
+  /**
+    * Sets the JSON schema with field names and the types for the JSON-encoded input.
+    * The JSON schema must not contain nested fields.
+    *
+    * This method overwrites existing fields added with [[field()]].
+    *
+    * @param schema the table schema
+    */
+  def schema(schema: TableSchema): JSON = {
+    this.encodingSchema.clear()
+    NormalizedProperties.normalizeTableSchema(schema).foreach {
+      case (n, t) => field(n, t)
+    }
+    this
+  }
+
+  /**
+    * Adds a JSON field with the field name and the type information for the JSON-encoding.
+    * This method can be called multiple times. The call order of this method also defines
+    * the order of the fields in the JSON-encoding.
+    *
+    * @param fieldName the field name
+    * @param fieldType the type information of the field
+    */
+  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
+    field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
+    this
+  }
+
+  /**
+    * Adds a JSON field with the field name and the type string for the JSON-encoding.
+    * This method can be called multiple times. The call order of this method also defines
+    * the order of the fields in the JSON-encoding.
+    *
+    * @param fieldName the field name
+    * @param fieldType the type string of the field
+    */
+  def field(fieldName: String, fieldType: String): JSON = {
+    if (encodingSchema.contains(fieldName)) {
+      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
+    }
+    encodingSchema += (fieldName -> fieldType)
+    this
+  }
+
+  /**
+    * Sets a mapping from schema fields to fields of the JSON schema.
+    *
+    * A field mapping is required if the fields of produced tables should be named differently than
+    * the fields of the JSON records.
+    * The key of the provided Map refers to the field of the table schema,
+    * the value to the field in the JSON schema.
+    *
+    * @param tableToJsonMapping A mapping from table schema fields to JSON schema fields.
+    * @return The builder.
+    */
+  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): JSON = {
+    this.fieldMapping = Some(tableToJsonMapping)
+    this
+  }
+
+  /**
+    * Sets the flag that decides whether to fail if a field is missing or not.
+    *
+    * @param failOnMissingField If set to true, the operation fails if there is a missing field.
+    *                           If set to false, a missing field is set to null.
+    * @return The builder.
+    */
+  def failOnMissingField(failOnMissingField: Boolean): JSON = {
+    this.failOnMissingField = Some(failOnMissingField)
+    this
+  }
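For orientation, a minimal usage sketch of this encoding descriptor, in the style of the `CSV()` example from the PR description below (field names and types are just examples):

```
JSON()
  .field("myfield", Types.STRING)
  .field("myfield2", Types.INT)
  .failOnMissingField(true)
```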

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334931#comment-16334931
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162990874
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -23,22 +23,74 @@ import java.net.URL
 import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration}
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource, TableSourceFactoryService}
 import org.apache.flink.table.util.Logging
 import org.apache.flink.util.InstantiationUtil
 import org.reflections.Reflections
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
   * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
   */
 object ExternalTableSourceUtil extends Logging {
 
+  /**
+    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
+    *
+    * @param externalCatalogTable the [[ExternalCatalogTable]] instance to convert
+    * @return converted [[TableSourceTable]] instance from the input catalog table
+    */
+  def fromExternalCatalogTable(
+      tableEnv: TableEnvironment,
+      externalCatalogTable: ExternalCatalogTable)
+    : TableSourceTable[_] = {
+
+    // check for the legacy external catalog path
+    if (externalCatalogTable.isLegacyTableType) {
+      LOG.warn("External catalog tables based on TableType annotations are deprecated. " +
+        "Please consider updating them to TableSourceFactories.")
+      fromExternalCatalogTableType(externalCatalogTable)
+    }
+    // use the factory approach
+    else {
+      val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+      tableEnv match {
+        // check for a batch table source in this batch environment
+        case _: BatchTableEnvironment =>
+          source match {
+            case bts: BatchTableSource[_] =>
+              new BatchTableSourceTable(
+                bts,
+                new FlinkStatistic(externalCatalogTable.getTableStats))
+            case _ => throw new TableException(
+              s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
+                s"in a batch environment.")
+          }
+        // check for a stream table source in this streaming environment
+        case _: StreamTableEnvironment =>
+          source match {
+            case sts: StreamTableSource[_] =>
+              new StreamTableSourceTable(
+                sts,
+                new FlinkStatistic(externalCatalogTable.getTableStats))
+            case _ => throw new TableException(
+              s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
+                s"in a streaming environment.")
+          }
+        case _ => throw new TableException("Unsupported table environment.")
+      }
+    }
+  }
+
+  // ----------------------------------------------------------------------
+  // NOTE: the following line can be removed once we drop support for TableType
--- End diff --

I think we can also remove the `org.reflections:reflections` dependency once we remove this.



[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334934#comment-16334934
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162996633
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorUtils.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+/**
+  * Utilities for working with a [[org.apache.flink.table.descriptors.Descriptor]].
+  */
+object DescriptorUtils {
+
+  def hasConnector(properties: util.Map[String, String], connector: String): Boolean = {
+    val tpe = properties.get("connector.type")
+    tpe != null || tpe == connector
+  }
+
+  def hasEncoding(properties: util.Map[String, String], encoding: String): Boolean = {
+    val tpe = properties.get("encoding.type")
+    tpe != null || tpe == encoding
--- End diff --

should be  `tpe != null && tpe == encoding`?




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334928#comment-16334928
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162947784
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -125,6 +129,16 @@ abstract class StreamTableEnvironment(
 }
   }
 
+  /**
+    * Creates a table from a descriptor that describes the resulting table schema, the source
+    * connector, the source encoding, and other properties.
+    *
+    * @param schema schema descriptor describing the table to create
+    */
+  def createTable(schema: Schema): StreamTableSourceDescriptor = {
--- End diff --

See comment on `BatchTableEnvironment`




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334926#comment-16334926
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162964487
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -156,7 +195,10 @@ case class NoMatchedTableSourceConverterException(
   *
   * @param tableType table type
   * @param cause the cause
+  * @deprecated Use table source factories instead.
   */
+@Deprecated
+@deprecated("Use table source factories instead.")
--- End diff --

Give a class name.




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334935#comment-16334935
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162994182
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorUtils.connector
+
+/**
+  * Describes a connector to another system.
+  *
+  * @param tpe string identifier for the connector
+  */
+abstract class ConnectorDescriptor(private val tpe: String) extends Descriptor {
--- End diff --

Should a `ConnectorDescriptor` know whether it requires an encoding? For 
example a file descriptor needs an encoding but a JDBC connector doesn't.

This property would then be used to validate the configuration.




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334929#comment-16334929
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162947812
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -107,6 +111,16 @@ abstract class BatchTableEnvironment(
 }
   }
 
+  /**
+    * Creates a table from a descriptor that describes the resulting table schema, the source
+    * connector, source encoding, and other properties.
+    *
+    * @param schema schema descriptor describing the table to create
+    */
+  def createTable(schema: Schema): BatchTableSourceDescriptor = {
--- End diff --

I'm not sure about the approach of returning a `TableSourceDescriptor`. I 
think it would be better if the table creation and registration would be 
completed within this method, i.e., the table should be completely defined by 
the argument of the method.

For example

```
tEnv.registerTableSource(
  "MyTable",
  TableSource.create(tEnv)
    .withSchema(
      Schema()
        .field(...)
        .field(...))
    .withConnector()
      ...
    .toTableSource()
)
```

In this design, we would reuse existing `registerTableSource` method and 
`TableSource.create` is a static method that returns a `TableSourceDescriptor`. 
Not sure if this is the best approach, but I like that the table is completely 
defined within the method call.




[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311429#comment-16311429
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5240

[FLINK-8240] [table] Create unified interfaces to configure and instatiate 
TableSources

## What is the purpose of the change

This PR presents the initial version of the new unified TableSource API. The API is based on a descriptor approach. Descriptors allow for describing parameters and behavior; they contain no logic but only store information and translate it into normalized string-based properties.

The following example shows how a CSV table source could be specified in 
the future:

```
tableEnv
  .createTable(
    Schema()
      .field("myfield", Types.STRING)
      .field("myfield2", Types.INT))
  .withConnector(
    FileSystem()
      .path("/path/to/csv"))
  .withEncoding(
    CSV()
      .field("myfield", Types.STRING)
      .field("myfield2", Types.INT)
      .quoteCharacter(';')
      .fieldDelimiter("#")
      .lineDelimiter("\r\n")
      .commentPrefix("%%")
      .ignoreFirstLine()
      .ignoreParseErrors())
  .withRowtime(
    Rowtime()
      .field("rowtime")
      .timestampFromDataStream()
      .watermarkFromDataStream())
  .withProctime(
    Proctime()
      .field("myproctime"))
```

They get translated into:

```
"schema.0.name" -> "myfield",
"schema.0.type" -> "VARCHAR",
"schema.1.name" -> "myfield2",
"schema.1.type" -> "INT",
"connector.type" -> "filesystem",
"connector.path" -> "/path/to/csv",
"encoding.type" -> "csv",
"encoding.fields.0.name" -> "myfield",
"encoding.fields.0.type" -> "VARCHAR",
"encoding.fields.1.name" -> "myfield2",
"encoding.fields.1.type" -> "INT",
"encoding.quote-character" -> ";",
"encoding.field-delimiter" -> "#",
"encoding.line-delimiter" -> "\r\n",
"encoding.comment-prefix" -> "%%",
"encoding.ignore-first-line" -> "true",
"encoding.ignore-parse-errors" -> "true",
"rowtime.0.name" -> "rowtime",
"rowtime.0.timestamp.type" -> "stream-record",
"rowtime.0.watermark.type" -> "preserving",
"proctime" -> "myproctime"
```

This PR also reworks the discovery of table sources by deprecating the `@TableType` annotation and reflection-based discovery in favor of `TableSourceFactory` interfaces and the standard Java Service Provider Interface (SPI). The table factories can now use the above properties to create table sources. The `ExternalCatalogTable` class has been reworked to use the new descriptor-based approach as well; however, the change should be fully source-code backwards compatible.

I agree that more tests are needed, and we should also decide where and how the validation should happen. I think it should happen mostly in the table source builders. We could also introduce some global dictionary class that provides constants for properties instead of using plain strings at different positions.

What do you think?

## Brief change log

  - Adds descriptors for schema, connectors, encoding, statistics, 
metadata, proctime, and rowtime
  - Adds table factory discovery based on unified properties

## Verifying this change

 - Added `DescriptorsTest`
 - ExternalCatalog tests are still working
 - More tests will follow...

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? ScalaDocs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8240

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5240.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5240


commit a8ccac6895bde97b154c1cbb442a0ac6e901b4c3
Author: twalthr 
Date:   2017-12-15T09:18:20Z

[FLINK-8240] [table] Create unified interfaces to configure and inst

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2017-12-21 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16300222#comment-16300222
 ] 

Fabian Hueske commented on FLINK-8240:
--

Thanks for starting this discussion [~twalthr].

I agree, we need a common properties API to specify table sources. This API can 
be used for several purposes:

- defining table sources in the Table API (as in [~twalthr]'s example)
- defining table sources in a DDL statement (as mentioned by [~wheat9]). The 
DDL statement would be parsed and converted into a properties map.
- defining table sources in a catalog file for a SQL client
- saving and loading table sources / catalog information in the Table API as 
proposed in FLINK-4088

Together with the properties API, we should also add util classes to parse a Properties object, i.e., a util class for each system (FileSystem, Kafka, etc.) and encoding (Csv, Avro, Orc, etc.).
These utility classes then ease the implementation of the factory classes and 
ensure a uniform behavior.

I don't think that each table source _must_ provide a factory because this 
would mean that the properties API would need to be extended for each supported 
system (e.g, HBase, JDBC, etc.). This would be an obstacle for user-defined 
table sources. However, I agree that we should aim that all Flink built-in / 
provided table sources can be instantiated via the properties API.
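A rough sketch of what such a per-system util might look like (names and checks are hypothetical; the property keys follow the normalized properties shown in the PR description above):

{code}
object FileSystemValidator {

  // validates the normalized properties of a filesystem connector declaration
  def validate(properties: Map[String, String]): Unit = {
    require(
      properties.get("connector.type").contains("filesystem"),
      "Expected 'connector.type' to be 'filesystem'.")
    require(
      properties.contains("connector.path"),
      "A 'connector.path' property is required.")
  }
}
{code}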



[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2017-12-19 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16296546#comment-16296546
 ] 

Timo Walther commented on FLINK-8240:
-

Thanks for your response [~wheat9]. We need to decide how the syntax for {{CREATE EXTERNAL TABLE}} will look. It could look more like Hive or more like the unified interface of this issue. But in any case such a statement would compile down to the unified interface. We won't support every combination of connector/encoding, but with this abstraction we don't need to expose things like a {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}, etc. anymore. At the API level we have a clear separation that might (or might not) also separate components internally in the future.

Btw not all tables need to be built with a factory. For now, we will keep the 
builders inside every table source (like {{CsvTableSource.builder()}}). This is 
also needed because you cannot express everything as a string property. The 
factories will use the builders to create the table sources. 



[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2017-12-19 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16296435#comment-16296435
 ] 

Haohui Mai commented on FLINK-8240:
---

It seems that this is a great use case of layered table sources / converters, thus I'm not fully sure that all tables should be built using {{TableFactory}} yet.

Popping up one level, I have a relevant question -- assuming that we need to implement the {{CREATE EXTERNAL TABLE}} statement, how will the statement look? Here is an example of Hive's {{CREATE EXTERNAL TABLE}} statement:

{code}
CREATE EXTERNAL TABLE weatherext ( wban INT, date STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/hive/data/weatherext';
{code}

It seems that combinations of {{ROW FORMAT}} and {{LOCATION}} are effectively the same as what you proposed -- but it does not seem to force all table sources to be aware of the compositions of connector / converter (i.e., {{TableFactory}}), at least at the API level.

Thoughts?



[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2017-12-18 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295353#comment-16295353
 ] 

Timo Walther commented on FLINK-8240:
-

Hi everyone,

I think we don't need a design document for it, but it would be great to hear some opinions. I introduced descriptors that allow describing connectors, encodings, and time attributes.

My current API design looks like:

{code}
tableEnv
  .from(
    FileSystem()
      .path("/path/to/csv"))
  .withEncoding(
    CSV()
      .field("myfield", Types.STRING)
      .field("myfield2", Types.INT)
      .quoteCharacter(';')
      .fieldDelimiter("#")
      .lineDelimiter("\r\n")
      .commentPrefix("%%")
      .ignoreFirstLine()
      .ignoreParseErrors())
  .withRowtime(
    Rowtime()
      .onField("rowtime")
      .withTimestampFromDataStream()
      .withWatermarkFromDataStream())
  .withProctime(
    Proctime()
      .onField("myproctime"))
  .toTableSource()
{code}

These descriptors are converted into pure key-value properties. Such as:

{code}
"connector.filesystem.path" -> "/myfile"
"encoding.csv.fields.0.name" -> "field1",
"encoding.csv.fields.0.type" -> "STRING",
"encoding.csv.fields.1.name" -> "field2",
"encoding.csv.fields.1.type" -> "TIMESTAMP",
"encoding.csv.fields.2.name" -> "field3",
"encoding.csv.fields.2.type" -> "ANY(java.lang.Class)",
"encoding.csv.fields.3.name" -> "field4",
"encoding.csv.fields.3.type" -> "ROW(test INT, row VARCHAR)",
"encoding.csv.line-delimiter" -> "^"
{code}

The properties are fully expressed as strings. This also allows saving them in configuration files, which might be interesting for FLINK-7594.

The question is how we want to translate the properties into actual table sources. Or, more precisely: how do we want to supply converters? Should they be 
part of the {{TableSource}} interface? Or should table sources be annotated 
with some factory class? Right now we have a similar functionality for external 
catalogs but this is too specific and does not consider encodings or time 
attributes. Furthermore, it would be better to use Java {{ServiceLoader}}s 
instead of classpath scanning. This is also used for Flink's file systems.

So my idea would be to have a class {{TableFactory}} that declares a connector, e.g. "kafka_0.10", and supported encodings, e.g. "csv" and "avro" (similar to FLINK-7643). All built-in table sources need to provide such a factory.
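To make the idea concrete, a rough sketch of such a factory (all names are hypothetical and not part of any patch):

{code}
trait TableFactory {

  // identifier of the connector this factory handles, e.g. "kafka_0.10"
  def connector: String

  // encodings this connector supports, e.g. Seq("csv", "avro")
  def supportedEncodings: Seq[String]

  // creates a configured table source from the normalized string properties
  def createTableSource(properties: Map[String, String]): TableSource[_]
}
{code}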

What do you think? [~fhueske] [~jark] [~wheat9] [~ykt836]

