[jira] [Created] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources
Timo Walther created FLINK-8240:
---
Summary: Create unified interfaces to configure and instantiate TableSources
Key: FLINK-8240
URL: https://issues.apache.org/jira/browse/FLINK-8240
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther

At the moment, every table source has its own way of configuration and instantiation. Some table sources are tailored to a specific encoding (e.g., {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement a builder or support table source converters for external catalogs.

The table sources should have a unified interface for discovery, defining common properties, and instantiation. The {{TableSourceConverters}} provide similar functionality but use an external catalog. We might generalize this interface.

In general, a table source declaration depends on the following parts:

- Source
  - Type (e.g. Kafka, Custom)
  - Properties (e.g. topic, connection info)
- Encoding
  - Type (e.g. Avro, JSON, CSV)
  - Schema (e.g. Avro class, JSON field names/types)
- Rowtime descriptor/Proctime
  - Watermark strategy and Watermark properties
  - Time attribute info
- Bucketization

This issue needs a design document before implementation. Any discussion is very welcome.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8236) Allow to set the parallelism of table queries
Timo Walther created FLINK-8236:
---
Summary: Allow to set the parallelism of table queries
Key: FLINK-8236
URL: https://issues.apache.org/jira/browse/FLINK-8236
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

Right now, the parallelism of a table program is determined by the parallelism of the stream/batch environment. E.g., by default, tumbling window operators use the default parallelism of the environment. Simple project and select operations have the same parallelism as the inputs they are applied on.

While we cannot change forwarding operations because this would change the results when using retractions, it should be possible to change the parallelism for operators after shuffling operations.

It should be possible to specify the default parallelism of a table program in the {{TableConfig}} and/or {{QueryConfig}}. The configuration per query has higher precedence than the configuration per table environment.
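The precedence rule proposed above could be sketched roughly as follows. This is only an illustration in plain Java: `ParallelismResolution` and its parameters are hypothetical names and are not part of Flink's {{TableConfig}} or {{QueryConfig}}.

```java
import java.util.OptionalInt;

// Hypothetical sketch: none of these names exist in Flink.
final class ParallelismResolution {

    /**
     * Picks the parallelism for operators after a shuffle:
     * the per-query setting wins over the per-table-environment setting,
     * which in turn wins over the default of the stream/batch environment.
     */
    static int resolve(OptionalInt perQuery, OptionalInt perTableEnv, int environmentDefault) {
        if (perQuery.isPresent()) {
            return perQuery.getAsInt();
        }
        if (perTableEnv.isPresent()) {
            return perTableEnv.getAsInt();
        }
        return environmentDefault;
    }

    public static void main(String[] args) {
        System.out.println(resolve(OptionalInt.of(8), OptionalInt.of(4), 2));   // query setting wins
        System.out.println(resolve(OptionalInt.empty(), OptionalInt.of(4), 2)); // table environment setting
        System.out.println(resolve(OptionalInt.empty(), OptionalInt.empty(), 2)); // environment default
    }
}
```

Forwarding operators would keep the parallelism of their inputs and would not consult this resolution at all.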
[jira] [Created] (FLINK-8187) Web client does not print errors
Timo Walther created FLINK-8187:
---
Summary: Web client does not print errors
Key: FLINK-8187
URL: https://issues.apache.org/jira/browse/FLINK-8187
Project: Flink
Issue Type: Bug
Components: Webfrontend
Reporter: Timo Walther

When submitting a jar with no defined Main class, the web client stops responding instead of printing the REST error:

{code}
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
    ... 9 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.
    at org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:592)
    at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:188)
    at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:147)
    at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:72)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
    ... 8 more
{code}
[jira] [Created] (FLINK-8185) Make first level flattening optional for input and output
Timo Walther created FLINK-8185:
---
Summary: Make first level flattening optional for input and output
Key: FLINK-8185
URL: https://issues.apache.org/jira/browse/FLINK-8185
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

When converting a {{DataSet}}/{{DataStream}} into a table, a composite type is automatically flattened to a row of fields, e.g. {{MyPojo}} becomes {{Row }}. There is no possibility to keep {{Row >}}. This would be especially interesting for POJOs that should not be flattened and should be converted back to the same type: {{toTable[MyPojo]}}. At the moment, a user has to specify all fields in order to get the same behavior.
[jira] [Created] (FLINK-8183) Add native Avro type support to the Table API & SQL
Timo Walther created FLINK-8183:
---
Summary: Add native Avro type support to the Table API & SQL
Key: FLINK-8183
URL: https://issues.apache.org/jira/browse/FLINK-8183
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: Timo Walther

Avro types can pass through the Table API; however, there should be more native support in order to provide the best user experience. This is an umbrella issue for tasks that would improve the handling of Avro types.

Improvements could be:

- Create an Avro type information from an Avro schema that maps to all supported Table API types (full knowledge about keys and values of lists, maps, and union types instead of {{GenericType}})
- Convert {{Utf8}} (even in nested Avro types) to string when entering the Table API and convert it back if necessary
- Add scalar functions to change certain values (e.g., {{select('avroRecord.set("name", "Bob").set("age", 12))}}). This is particularly useful when a type has a lot of fields.
[jira] [Created] (FLINK-8169) Document timestamp and timezone behavior for Table API & SQL
Timo Walther created FLINK-8169:
---
Summary: Document timestamp and timezone behavior for Table API & SQL
Key: FLINK-8169
URL: https://issues.apache.org/jira/browse/FLINK-8169
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

{{java.sql.Date}}, {{java.sql.Time}}, and {{java.sql.Timestamp}} are timezone dependent. However, in Flink's Table & SQL API we remove this dependency and work with UTC timestamps internally.

It should be documented how windows use the timestamps in different timezones for event-time. The behavior for processing-time should be explained as well. What is the result if I cast both timestamps to Long, String, or output them to a DataSet/DataStream?
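To make the timezone dependency of {{java.sql.Timestamp}} concrete, here is a small plain-Java sketch. It does not use Flink and does not show how the Table API normalizes values; it only shows that the same wall-clock string maps to different epoch milliseconds depending on the JVM default timezone. The class name `TimestampZones` is made up for this illustration.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Illustration only: demonstrates the JDK behavior, not Flink's internal handling.
final class TimestampZones {

    /** Parses a wall-clock string under the given JVM default zone and returns epoch milliseconds. */
    static long epochMillis(String wallClock, String zoneId) {
        TimeZone previous = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
            // Timestamp.valueOf interprets the string in the current default timezone.
            return Timestamp.valueOf(wallClock).getTime();
        } finally {
            TimeZone.setDefault(previous); // restore the original default
        }
    }

    public static void main(String[] args) {
        System.out.println(epochMillis("1970-01-01 00:00:00", "UTC"));       // 0
        System.out.println(epochMillis("1970-01-01 00:00:00", "GMT+08:00")); // -28800000
    }
}
```

The eight-hour difference is the kind of effect the requested documentation would need to explain for casts to Long or String and for conversions to a DataSet/DataStream.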
[jira] [Created] (FLINK-8168) Add offset support for group windows
Timo Walther created FLINK-8168:
---
Summary: Add offset support for group windows
Key: FLINK-8168
URL: https://issues.apache.org/jira/browse/FLINK-8168
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Timo Walther

To support different timezones, it might be necessary to offset windows by some hours (e.g., -8). The DataStream API allows defining [offsets|https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows]. Flink SQL's group windows should also support such a behavior. It might be necessary to introduce the semantics to Apache Calcite first before we can implement it in Flink.
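The arithmetic behind a window offset can be sketched in plain Java as below. This is a minimal sketch of one way to compute the start of a tumbling window with an offset, assuming epoch-millisecond timestamps; `WindowOffsets` is a hypothetical helper and not a Flink or Calcite class.

```java
// Hypothetical helper for illustration; not part of Flink or Calcite.
final class WindowOffsets {

    static final long HOUR = 60L * 60L * 1000L;
    static final long DAY = 24L * HOUR;

    /**
     * Start of the tumbling window that contains the timestamp.
     * Windows begin at offset, offset + size, offset + 2 * size, and so on.
     * floorMod keeps the result correct for negative timestamps and offsets.
     */
    static long windowStart(long timestamp, long size, long offset) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        // Daily windows shifted by -8 hours start at 16:00 UTC, i.e. local midnight in UTC+8.
        System.out.println(windowStart(0L, DAY, -8 * HOUR));        // -28800000
        System.out.println(windowStart(16 * HOUR, DAY, -8 * HOUR)); // 57600000
    }
}
```

With an offset of -8 hours, a record at 1970-01-01 00:00 UTC falls into the window that starts at 1969-12-31 16:00 UTC, which is midnight of 1970-01-01 in a UTC+8 timezone.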
[jira] [Created] (FLINK-8139) Check for proper equals() and hashCode() when registering a table
Timo Walther created FLINK-8139:
---
Summary: Check for proper equals() and hashCode() when registering a table
Key: FLINK-8139
URL: https://issues.apache.org/jira/browse/FLINK-8139
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

In the current Table API & SQL implementation we compare {{Row}}s at different positions. E.g., for joining we test rows for equality or put them into state. A heap state backend requires proper hashCode() and equals() in order to work correctly. Thus, every type in the Table API needs to have these methods implemented.

We need to check via reflection whether all fields of a row implement methods that differ from {{Object.equals()}} and {{Object.hashCode()}}. This applies to rows coming from a TableSource as well as from a DataStream/DataSet.

Additionally, for array types, the {{Row}} class should use {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep variants.
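Both checks described above can be illustrated with standard Java reflection and {{java.util.Arrays}}. The following is a rough sketch, not the proposed Flink implementation; `EqualityChecks` and `NoEquals` are hypothetical names.

```java
import java.util.Arrays;

// Illustration only; not the validation code proposed for the Table API.
final class EqualityChecks {

    /** A type that inherits equals() and hashCode() from Object. */
    static final class NoEquals {
        final int value;

        NoEquals(int value) {
            this.value = value;
        }
    }

    /** Returns true if the type declares or inherits equals/hashCode from a class other than Object. */
    static boolean overridesEqualsAndHashCode(Class<?> type) {
        try {
            return type.getMethod("equals", Object.class).getDeclaringClass() != Object.class
                && type.getMethod("hashCode").getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            // Primitives and interfaces that do not redeclare the methods end up here.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(overridesEqualsAndHashCode(String.class));   // true
        System.out.println(overridesEqualsAndHashCode(NoEquals.class)); // false
        System.out.println(overridesEqualsAndHashCode(int[].class));    // false: arrays use identity

        // Nested arrays: the non-deep variant compares the inner arrays by identity.
        Object[] a = {new int[] {1, 2}};
        Object[] b = {new int[] {1, 2}};
        System.out.println(Arrays.equals(a, b));     // false
        System.out.println(Arrays.deepEquals(a, b)); // true
        System.out.println(Arrays.deepHashCode(a) == Arrays.deepHashCode(b)); // true
    }
}
```

Because array classes report `Object` as the declaring class of both methods, a reflection check like this would flag array fields, which is why they need the separate deep comparison.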
[jira] [Created] (FLINK-8129) Add a union type
Timo Walther created FLINK-8129:
---
Summary: Add a union type
Key: FLINK-8129
URL: https://issues.apache.org/jira/browse/FLINK-8129
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: Timo Walther

Many input formats like Avro or ORC support union types to work with different types in one field. It might make sense to support those use cases in the Table & SQL API as well.

A union type is similar to an {{Either}} type but takes an arbitrary number of possible types. It might consider {{null}} as a type.

It is up for discussion if we want to add this type to {{flink-core}}.
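As a starting point for that discussion, a union value could look roughly like the sketch below. Everything here is an assumption made for illustration: the class `UnionValue`, the use of `Void.class` to represent the {{null}} alternative, and the tag-based lookup are not taken from Flink, Avro, or ORC.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a union value with an arbitrary number of alternatives.
final class UnionValue {

    private final int tag;      // index of the alternative that matched
    private final Object value; // the wrapped value, possibly null

    private UnionValue(int tag, Object value) {
        this.tag = tag;
        this.value = value;
    }

    /**
     * Wraps a value if it is an instance of one of the alternatives.
     * Assumption of this sketch: Void.class stands for the null alternative.
     */
    static UnionValue of(Object value, Class<?>... alternatives) {
        List<Class<?>> alts = Arrays.asList(alternatives);
        for (int i = 0; i < alts.size(); i++) {
            Class<?> alt = alts.get(i);
            boolean matches = (value == null) ? alt == Void.class : alt.isInstance(value);
            if (matches) {
                return new UnionValue(i, value);
            }
        }
        throw new IllegalArgumentException("Value does not match any alternative: " + value);
    }

    int tag() {
        return tag;
    }

    Object value() {
        return value;
    }

    public static void main(String[] args) {
        System.out.println(of("Bob", Long.class, String.class, Void.class).tag()); // 1
        System.out.println(of(null, Long.class, String.class, Void.class).tag());  // 2
    }
}
```

An {{Either}} corresponds to the special case of exactly two alternatives without a null alternative.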
[jira] [Created] (FLINK-8122) Name all table sinks and sources
Timo Walther created FLINK-8122:
---
Summary: Name all table sinks and sources
Key: FLINK-8122
URL: https://issues.apache.org/jira/browse/FLINK-8122
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

Not all table sinks and sources have proper names. Therefore, they are displayed as "Unnamed" in the logs and Web UI (e.g. CsvTableSink). We should add names for all built-in connectors.

Having information about the table sink name (via {{INSERT INTO}}) would be even better.
[jira] [Created] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
Timo Walther created FLINK-8118:
---
Summary: Allow to specify the offsets of KafkaTableSources
Key: FLINK-8118
URL: https://issues.apache.org/jira/browse/FLINK-8118
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: Timo Walther

Right now the Kafka TableSources can only read from the current group offset. We should expose the possibilities of the Kafka Consumer:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
[jira] [Created] (FLINK-8069) Support empty watermark strategy for TableSources
Timo Walther created FLINK-8069:
---
Summary: Support empty watermark strategy for TableSources
Key: FLINK-8069
URL: https://issues.apache.org/jira/browse/FLINK-8069
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

In case the underlying data stream source emits watermarks, it should be possible to define an empty watermark strategy for rowtime attributes in the {{RowtimeAttributeDescriptor}}.
[jira] [Created] (FLINK-7859) Allow field annotations to define TypeInformation
Timo Walther created FLINK-7859:
---
Summary: Allow field annotations to define TypeInformation
Key: FLINK-7859
URL: https://issues.apache.org/jira/browse/FLINK-7859
Project: Flink
Issue Type: Improvement
Components: Type Serialization System
Reporter: Timo Walther
Assignee: Timo Walther

Right now it is impossible to define custom TypeInformation for specific fields. For example, we cannot provide information about a {{Row}} type that is a field of a Scala case class. An option would be to allow fields to be annotated with {{@TypeInfo}} such that a type information can be provided.

{code}
case class Test(a: Int, b: String, @TypeInfo(classOf[MyRowFactory]) c: Row)
{code}
[jira] [Created] (FLINK-7848) Allow time attributes for atomic types
Timo Walther created FLINK-7848:
---
Summary: Allow time attributes for atomic types
Key: FLINK-7848
URL: https://issues.apache.org/jira/browse/FLINK-7848
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

It is not possible to combine processing time with atomic types, as in:

{code}
tEnv
  .fromDataStream(env.fromElements(1, 2, 3), "test, p.proctime")
  .window(Tumble.over("4.seconds").on("p").as("w"))
{code}
[jira] [Created] (FLINK-7698) Join with null literals leads to NPE
Timo Walther created FLINK-7698:
---
Summary: Join with null literals leads to NPE
Key: FLINK-7698
URL: https://issues.apache.org/jira/browse/FLINK-7698
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Timo Walther

The following query fails:

{code}
@Test
def testProcessTimeInnerJoin(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  env.setStateBackend(getStateBackend)
  StreamITCase.clear
  env.setParallelism(1)

  val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and t1.nullField = t2.nullField and " +
    "t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second"

  val data1 = new mutable.MutableList[(Int, Long, String)]
  data1.+=((1, 1L, "Hi1"))
  data1.+=((1, 2L, "Hi2"))
  data1.+=((1, 5L, "Hi3"))
  data1.+=((2, 7L, "Hi5"))
  data1.+=((1, 9L, "Hi6"))
  data1.+=((1, 8L, "Hi8"))
  data1.+=((1, 8L, "Hi8"))

  val data2 = new mutable.MutableList[(Int, Long, String)]
  data2.+=((1, 1L, "HiHi"))
  data2.+=((2, 2L, "HeHe"))

  val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
    .select('a, 'b, 'c, 'proctime, Null(Types.LONG) as 'nullField)
  val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
    .select('a, 'b, 'c, 'proctime, 12L as 'nullField)

  tEnv.registerTable("T1", t1)
  tEnv.registerTable("T2", t2)

  val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
  result.addSink(new StreamITCase.StringSink[Row])
  env.execute()
}
{code}

It leads to:

{code}
java.lang.NullPointerException
    at org.apache.calcite.rex.RexUtil.gatherConstraint(RexUtil.java:437)
    at org.apache.calcite.rex.RexUtil.gatherConstraints(RexUtil.java:399)
    at org.apache.calcite.rex.RexUtil.predicateConstants(RexUtil.java:336)
    at org.apache.calcite.plan.RelOptPredicateList.of(RelOptPredicateList.java:144)
    at org.apache.calcite.rel.metadata.RelMdPredicates$JoinConditionBasedPredicateInference.inferPredicates(RelMdPredicates.java:654)
    at org.apache.calcite.rel.metadata.RelMdPredicates.getPredicates(RelMdPredicates.java:326)
    at GeneratedMetadataHandler_Predicates.getPredicates_$(Unknown Source)
    at GeneratedMetadataHandler_Predicates.getPredicates(Unknown Source)
    at GeneratedMetadataHandler_Predicates.getPredicates_$(Unknown Source)
    at GeneratedMetadataHandler_Predicates.getPredicates(Unknown Source)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:803)
    at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:264)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:317)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:506)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:385)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:251)
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:125)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:197)
    at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:257)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:663)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:728)
    at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
    at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
    at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
    at org.apache.flink.table.runtime.stream.sql.JoinITCase.testProcessTimeInnerJoin(JoinITCase.scala:67)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at
[jira] [Created] (FLINK-7691) Remove ClassTag in Scala DataSet API
Timo Walther created FLINK-7691:
---
Summary: Remove ClassTag in Scala DataSet API
Key: FLINK-7691
URL: https://issues.apache.org/jira/browse/FLINK-7691
Project: Flink
Issue Type: Improvement
Components: DataSet API
Reporter: Timo Walther

In the DataStream API a {{ClassTag}} is not required, which allows passing {{TypeInformation}} manually if required. In the DataSet API most methods look like:

{code}
// DataSet API
def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T]

// DataStream API
def fromElements[T: TypeInformation](data: T*): DataStream[T]
{code}

I would propose to remove the ClassTag, if possible. This would make it easier, e.g., to supply TypeInformation for the {{Row}} type. Or is there an easier way in Scala that I don't know?
[jira] [Created] (FLINK-7681) Support arbitrary TypeInformation in Table API
Timo Walther created FLINK-7681:
---
Summary: Support arbitrary TypeInformation in Table API
Key: FLINK-7681
URL: https://issues.apache.org/jira/browse/FLINK-7681
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

Right now, we do support most Java basic types as well as composite types and generic types. However, we should also support any custom {{TypeInformation}} (such as {{BasicTypeInfo.DATE}} or {{BasicTypeInfo.VOID}}). They should be handled as a black box similar to generic types.
[jira] [Created] (FLINK-7594) Add a SQL CLI client
Timo Walther created FLINK-7594:
---
Summary: Add a SQL CLI client
Key: FLINK-7594
URL: https://issues.apache.org/jira/browse/FLINK-7594
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther

At the moment, a user can only specify queries within a Java/Scala program, which is nice for integrating table programs or parts of them with the DataSet or DataStream API. With more connectors coming up, it is time to also provide a programming-free SQL client. The SQL client should consist of a CLI interface and maybe also a REST API. The concrete design is still up for discussion.
[jira] [Created] (FLINK-7554) Add a testing RuntimeContext to test utilities
Timo Walther created FLINK-7554:
---
Summary: Add a testing RuntimeContext to test utilities
Key: FLINK-7554
URL: https://issues.apache.org/jira/browse/FLINK-7554
Project: Flink
Issue Type: New Feature
Components: Tests
Reporter: Timo Walther

When unit testing user-defined functions it would be useful to have an official testing {{RuntimeContext}} that uses Java collections for storing state, metrics, etc.

After executing the business logic, the user could then verify how the state of the UDF changed or which metrics have been collected.

This issue includes documentation for the "Testing" section.
[jira] [Created] (FLINK-7452) Add helper methods for all built-in Flink types to Types
Timo Walther created FLINK-7452:
---
Summary: Add helper methods for all built-in Flink types to Types
Key: FLINK-7452
URL: https://issues.apache.org/jira/browse/FLINK-7452
Project: Flink
Issue Type: Improvement
Components: Type Serialization System
Reporter: Timo Walther
Assignee: Timo Walther

Sometimes it is very difficult to provide `TypeInformation` manually in case some extraction fails or is not available. {{TypeHint}}s should be the preferred way, but these methods can ensure correct types.

I propose to add all built-in Flink types to the {{Types}} class, such as:

{code}
Types.POJO(MyPojo.class)
Types.POJO(Map)
Types.GENERIC(Object.class)
Types.TUPLE(TypeInformation, ...)
Types.MAP(TypeInformation, TypeInformation)
{code}

The methods should validate that the returned type is exactly the requested type. Especially in the case of POJOs, they should help with creating {{PojoTypeInfo}}.

Once this is in place, we can deprecate the {{TypeInfoParser}}.
[jira] [Created] (FLINK-7426) Table API does not support null values in keys
Timo Walther created FLINK-7426:
---
Summary: Table API does not support null values in keys
Key: FLINK-7426
URL: https://issues.apache.org/jira/browse/FLINK-7426
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Timo Walther

The Table API uses {{keyBy}} internally; however, the generated {{KeySelector}} uses instances of {{Tuple}}. The {{TupleSerializer}} is not able to serialize null values. This causes issues during checkpointing or when using the RocksDB state backend. We need to replace all {{keyBy}} calls with a custom {{RowKeySelector}}.

{code}
class AggregateITCase extends StreamingWithStateTestBase {

  private val queryConfig = new StreamQueryConfig()
  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))

  @Test
  def testDistinct(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(getStateBackend)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear

    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
      .select('b, Null(Types.LONG)).distinct()

    val results = t.toRetractStream[Row](queryConfig)
    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
    env.execute()

    val expected = mutable.MutableList("1,null", "2,null", "3,null", "4,null", "5,null", "6,null")
    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
  }
}
{code}
[jira] [Created] (FLINK-7421) AvroRow(De)serializationSchema not serializable
Timo Walther created FLINK-7421:
---
Summary: AvroRow(De)serializationSchema not serializable
Key: FLINK-7421
URL: https://issues.apache.org/jira/browse/FLINK-7421
Project: Flink
Issue Type: Bug
Components: Streaming Connectors, Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther

Both {{AvroRowDeserializationSchema}} and {{AvroRowSerializationSchema}} contain fields that are not serializable. Those fields should be made transient and both schemas need to be tested in practice.
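The general pattern of marking a non-serializable helper as transient and rebuilding it lazily can be sketched as follows. This is plain Java serialization with made-up classes (`TransientFieldDemo`, `Schema`, `Decoder`); it does not reproduce the actual fields of the Avro schemas.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Illustration only: hypothetical classes, not the Avro (de)serialization schemas.
final class TransientFieldDemo {

    /** Stand-in for a helper object that does not implement Serializable. */
    static final class Decoder {
        String decode(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    static final class Schema implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String name;         // serializable configuration
        private transient Decoder decoder; // skipped during serialization, rebuilt on demand

        Schema(String name) {
            this.name = name;
        }

        String decode(byte[] bytes) {
            if (decoder == null) {
                decoder = new Decoder(); // lazy (re-)initialization, also after deserialization
            }
            return name + ":" + decoder.decode(bytes);
        }
    }

    /** Serializes and deserializes an object, as happens when a function is shipped to a cluster. */
    static <T extends Serializable> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Object is not serializable", e);
        }
    }

    public static void main(String[] args) {
        Schema schema = new Schema("user");
        schema.decode("a".getBytes(StandardCharsets.UTF_8)); // initializes the transient field
        Schema copy = roundTrip(schema);                      // would fail without 'transient'
        System.out.println(copy.decode("b".getBytes(StandardCharsets.UTF_8)));
    }
}
```

A round-trip test of this kind is one way to check serializability of a schema in a unit test, in addition to the practical testing the issue asks for.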
[jira] [Created] (FLINK-7403) Remove expandLocalRef() before code generation
Timo Walther created FLINK-7403:
---
Summary: Remove expandLocalRef() before code generation
Key: FLINK-7403
URL: https://issues.apache.org/jira/browse/FLINK-7403
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

Currently, we expand local references before code generation. This means that expressions that only need to be evaluated once might be evaluated multiple times. We should remove the expand step where possible.
[jira] [Created] (FLINK-7389) Remove Calcite PushProjector
Timo Walther created FLINK-7389:
---
Summary: Remove Calcite PushProjector
Key: FLINK-7389
URL: https://issues.apache.org/jira/browse/FLINK-7389
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

The PushProjector class is copied from Apache Calcite, except that it does not automatically name fields using the names of the operators, because the Table API rejects special characters like '-' in field names. We need to find a solution without copying Calcite classes.
[jira] [Created] (FLINK-7380) Limit usage of Row type
Timo Walther created FLINK-7380:
---
Summary: Limit usage of Row type
Key: FLINK-7380
URL: https://issues.apache.org/jira/browse/FLINK-7380
Project: Flink
Issue Type: Improvement
Components: DataSet API, DataStream API
Reporter: Timo Walther
Assignee: Timo Walther

The recently introduced {{Row}} type causes a lot of confusion for users. By default, rows are serialized using Kryo. We should not allow the use of {{GenericTypeInfo}} for rows. The TypeExtractor should throw an exception and encourage users to provide proper field types. The Row class should be final and only targeted for intended use cases.
[jira] [Created] (FLINK-7360) Support Scala map type
Timo Walther created FLINK-7360:
---
Summary: Support Scala map type
Key: FLINK-7360
URL: https://issues.apache.org/jira/browse/FLINK-7360
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: Timo Walther

Currently, Flink SQL supports only Java `java.util.Map`. Scala maps are treated as a black box with Flink `GenericTypeInfo`/SQL `ANY` data type. Therefore, you can forward these black boxes and use them within scalar functions, but accessing them with the `['key']` operator is not supported.

We should convert these special collections at the beginning in order to use them in a SQL statement.
[jira] [Created] (FLINK-7301) Rework state documentation
Timo Walther created FLINK-7301:
---
Summary: Rework state documentation
Key: FLINK-7301
URL: https://issues.apache.org/jira/browse/FLINK-7301
Project: Flink
Issue Type: Improvement
Components: Documentation
Reporter: Timo Walther
Assignee: Timo Walther

The documentation about state is spread across different pages, but this is not consistent and it is hard to find what you need. I propose:

- Mention State Backends and link to them in "Streaming/Working with State".
- Create category "State & Fault Tolerance" under "Streaming". Move "Working with State", "Checkpointing" and "Queryable State" into it.
- Move API related parts (90%) of "Deployment/State & Fault Tolerance/State Backends" to "Streaming/State & Fault Tolerance/State Backends".
- Move all tuning things from "Debugging/Large State" to "Deployment/State & Fault Tolerance/State Backends".
- Move "Streaming/Working with State/Custom Serialization for Managed State" to "Streaming/State & Fault Tolerance/Custom Serialization" (add a link from the previous position, also link from "Data Types & Serialization").
[jira] [Created] (FLINK-7272) Support SQL IN with more than 20 elements in streaming
Timo Walther created FLINK-7272:
---
Summary: Support SQL IN with more than 20 elements in streaming
Key: FLINK-7272
URL: https://issues.apache.org/jira/browse/FLINK-7272
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: Timo Walther

During the implementation of FLINK-4565 I noticed that an "IN" operation with more than 20 elements is converted into a {{LogicalValues}} + {{LogicalJoin}} operation. Since this is not possible in streaming, we should add a rule that merges both operators into either a Correlate or a Calc.
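The idea of evaluating a large IN list inside a single filter step, rather than joining against a table of literal values, can be sketched in plain Java. This is only a conceptual illustration with hypothetical names (`InPredicate`, `inList`); it is not a Calcite rule and does not show how Flink would generate the Calc.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Conceptual sketch only; names are hypothetical and no planner code is involved.
final class InPredicate {

    /** Builds the literal set once and returns a per-record membership test. */
    static Predicate<Integer> inList(Integer... literals) {
        Set<Integer> lookup = new HashSet<>(Arrays.asList(literals));
        return lookup::contains;
    }

    /** Applies the predicate record by record, as a filter inside a single operator would. */
    static List<Integer> filter(List<Integer> records, Predicate<Integer> predicate) {
        return records.stream().filter(predicate).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // 25 literals: more than the 20 mentioned in the issue.
        Integer[] literals = IntStream.rangeClosed(1, 25).boxed().toArray(Integer[]::new);
        List<Integer> records = Arrays.asList(0, 7, 25, 26, 13);
        System.out.println(filter(records, inList(literals))); // [7, 25, 13]
    }
}
```

Since the literal set is fixed at planning time, the membership test needs no second input, which is what makes a join unnecessary for this case.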
[jira] [Created] (FLINK-7271) ExpressionReducer does not optimize string-to-time conversion
Timo Walther created FLINK-7271:
---
Summary: ExpressionReducer does not optimize string-to-time conversion
Key: FLINK-7271
URL: https://issues.apache.org/jira/browse/FLINK-7271
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

Expressions like {{"1996-11-10".toDate}} or {{"1996-11-10 12:12:12".toTimestamp}} are not recognized by the ExpressionReducer and are evaluated at runtime instead of in the pre-flight phase. In order to optimize the runtime, we should allow constant expression reduction here.
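The effect of constant expression reduction can be illustrated with a small plain-Java sketch. It assumes dates are represented as epoch days and uses a call counter to make the difference visible; `ConstantReduction` and its methods are hypothetical and unrelated to the actual ExpressionReducer.

```java
import java.time.LocalDate;

// Illustration only; not the ExpressionReducer and not generated Flink code.
final class ConstantReduction {

    static int parseCalls = 0; // counts how often the string literal is converted

    /** Converts a date literal to epoch days, standing in for "1996-11-10".toDate. */
    static long toDate(String literal) {
        parseCalls++;
        return LocalDate.parse(literal).toEpochDay();
    }

    /** Without reduction: the literal is converted again for every record. */
    static long countOnOrAfterUnreduced(long[] epochDays, String literal) {
        long count = 0;
        for (long day : epochDays) {
            if (day >= toDate(literal)) {
                count++;
            }
        }
        return count;
    }

    /** With reduction: the literal is converted once before any record is processed. */
    static long countOnOrAfterReduced(long[] epochDays, String literal) {
        long constant = toDate(literal);
        long count = 0;
        for (long day : epochDays) {
            if (day >= constant) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long[] records = {
            LocalDate.of(1996, 11, 9).toEpochDay(),
            LocalDate.of(1996, 11, 10).toEpochDay(),
            LocalDate.of(1996, 11, 11).toEpochDay()
        };
        System.out.println(countOnOrAfterUnreduced(records, "1996-11-10") + " matches, " + parseCalls + " conversions");
        parseCalls = 0;
        System.out.println(countOnOrAfterReduced(records, "1996-11-10") + " matches, " + parseCalls + " conversions");
    }
}
```

Both variants return the same result; only the number of conversions differs, growing with the number of records in the unreduced case.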
[jira] [Created] (FLINK-7171) Remove identity project for time attributes
Timo Walther created FLINK-7171:
---
Summary: Remove identity project for time attributes
Key: FLINK-7171
URL: https://issues.apache.org/jira/browse/FLINK-7171
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

If only time attributes are projected away, the translated plan should not contain an additional Calc node.

Example:

{code}
streamUtil.addTable[(Int, String, Long)](
  "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
{code}

and

{code}
streamUtil.addTable[(Int, String, Long)](
  "MyTable", 'a, 'b, 'c)
{code}

lead to different logical plans even if these attributes are not accessed in {{"SELECT DISTINCT a, b, c FROM MyTable"}}. The first variant results in:

{code}
unaryNode(
  "DataStreamGroupAggregate",
  unaryNode(
    "DataStreamCalc",
    streamTableNode(0),
    term("select", "a, b, c")
  ),
  term("groupBy", "a, b, c"),
  term("select", "a, b, c")
)
{code}
[jira] [Created] (FLINK-7005) Optimization steps are missing for nested registered tables
Timo Walther created FLINK-7005:
---
Summary: Optimization steps are missing for nested registered tables
Key: FLINK-7005
URL: https://issues.apache.org/jira/browse/FLINK-7005
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther

Tables that are registered (implicitly or explicitly) do not pass the first three optimization steps:

- decorrelate
- convert time indicators
- normalize the logical plan

E.g. this has the wrong plan right now:

{code}
val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)

val table1 = tEnv.sql(s"""SELECT 1 + 1 FROM $table""") // not optimized
val table2 = tEnv.sql(s"""SELECT myrt FROM $table1""")

val results = table2.toAppendStream[Row]
{code}
[jira] [Created] (FLINK-6995) Add a warning to outdated documentation
Timo Walther created FLINK-6995:
---
Summary: Add a warning to outdated documentation
Key: FLINK-6995
URL: https://issues.apache.org/jira/browse/FLINK-6995
Project: Flink
Issue Type: Improvement
Components: Documentation
Reporter: Timo Walther

When I search for "flink yarn", the first result is an outdated 0.8 release documentation page. We should add a warning to outdated documentation pages.
[jira] [Created] (FLINK-6994) Wrong base url in master docs
Timo Walther created FLINK-6994:
---
Summary: Wrong base url in master docs
Key: FLINK-6994
URL: https://issues.apache.org/jira/browse/FLINK-6994
Project: Flink
Issue Type: Bug
Components: Documentation
Reporter: Timo Walther
Assignee: Timo Walther

The base URL of the master docs points to 1.3 instead of 1.4. At the moment, the menu items point to the latest stable release docs instead of the nightly master docs.
[jira] [Created] (FLINK-6987) Flink does not build if path contains spaces
Timo Walther created FLINK-6987: --- Summary: Flink does not build if path contains spaces Key: FLINK-6987 URL: https://issues.apache.org/jira/browse/FLINK-6987 Project: Flink Issue Type: Bug Components: Build System Reporter: Timo Walther The test {{TextInputFormatTest.testNestedFileRead}} fails if the path contains spaces. Reason: "Test erroneous" I was building Flink on MacOS 10.12.5 and the folder was called "flink-1.3.1 2". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
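The issue above does not name the root cause. One common reason for tests failing on paths with spaces (an assumption here, not confirmed for {{TextInputFormatTest}}) is that a file path is taken from {{URL#getPath()}}, which keeps the space percent-encoded as {{%20}}. A minimal sketch of the difference, using a hypothetical helper class {{PathWithSpaces}} and a made-up sample path:

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;

// Hypothetical helper, not part of Flink.
final class PathWithSpaces {

    // Builds a file URL for a path that contains a space (sample path only).
    static URL sampleUrl() {
        try {
            return new File("/tmp/flink-1.3.1 2/data.txt").toURI().toURL();
        } catch (MalformedURLException e) {
            throw new IllegalStateException(e);
        }
    }

    // Fragile: the returned path is still percent-encoded ("%20" for a space).
    static String encodedPath(URL url) {
        return url.getPath();
    }

    // More robust: going through URI and Paths decodes the percent-encoding.
    static String decodedPath(URL url) {
        try {
            return Paths.get(url.toURI()).toString();
        } catch (URISyntaxException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If a test opens the result of {{encodedPath}} as a file name, the file is not found as soon as the checkout directory contains a space.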
[jira] [Created] (FLINK-6881) Creating a table from a POJO and defining a time attribute fails
Timo Walther created FLINK-6881: --- Summary: Creating a table from a POJO and defining a time attribute fails Key: FLINK-6881 URL: https://issues.apache.org/jira/browse/FLINK-6881 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Creating a table from a DataStream of POJOs fails when the user tries to define a rowtime attribute. There are multiple reasons in {{ExpressionParser}} as well as {{StreamTableEnvironment#validateAndExtractTimeAttributes}}. See also: https://stackoverflow.com/questions/8022/apache-flink-1-3-table-api-rowtime-strange-behavior -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6870) Combined batch and stream TableSource can not produce same time attributes
Timo Walther created FLINK-6870: --- Summary: Combined batch and stream TableSource can not produce same time attributes Key: FLINK-6870 URL: https://issues.apache.org/jira/browse/FLINK-6870 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther If a class implements both {{BatchTableSource}} and {{StreamTableSource}}, it is not possible to declare a time attribute which is valid for both environments. For batch it should be a regular field, but not for streaming. The {{getReturnType}} method does not know the environment in which it is called. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6705) Check visibility of methods in Table API classes
Timo Walther created FLINK-6705: --- Summary: Check visibility of methods in Table API classes Key: FLINK-6705 URL: https://issues.apache.org/jira/browse/FLINK-6705 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther If a user is working with the Java version of the Table API, there are many methods that should not be exposed. We should try to improve this. e.g. TableEnvironment has visible methods like {{getFieldIndices}} or {{validateType}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6700) Remove "Download and Compile" from quickstart
Timo Walther created FLINK-6700: --- Summary: Remove "Download and Compile" from quickstart Key: FLINK-6700 URL: https://issues.apache.org/jira/browse/FLINK-6700 Project: Flink Issue Type: Sub-task Components: Documentation Reporter: Timo Walther A user does not have to clone and compile the entire Flink repository as one of the first steps in the quickstart. Downloading a release is sufficient. See here: https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6697) Add batch multi-window support
Timo Walther created FLINK-6697: --- Summary: Add batch multi-window support Key: FLINK-6697 URL: https://issues.apache.org/jira/browse/FLINK-6697 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther Multiple consecutive windows on batch are not tested yet and I think they are also not supported yet, because the syntax is not defined for batch yet. The following should be supported: {code} val t = table .window(Tumble over 2.millis on 'rowtime as 'w) .groupBy('w) .select('w.rowtime as 'rowtime, 'int.count as 'int) .window(Tumble over 4.millis on 'rowtime as 'w2) .groupBy('w2) .select('w2.rowtime, 'w2.end, 'int.count) {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6672) Support CAST(timestamp AS BIGINT)
Timo Walther created FLINK-6672: --- Summary: Support CAST(timestamp AS BIGINT) Key: FLINK-6672 URL: https://issues.apache.org/jira/browse/FLINK-6672 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Timo Walther It is not possible to cast a TIMESTAMP, TIME, or DATE to BIGINT, INT, or INT, respectively, in SQL. The Table API and the code generation support this, but the SQL validation seems to prohibit it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
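To make the requested casts concrete, here is a small sketch of what the numeric results could look like. It assumes (this is not stated in the issue) that a TIMESTAMP maps to milliseconds since the epoch, a TIME to milliseconds of the day, and a DATE to days since the epoch. The class {{TemporalToNumeric}} is hypothetical and uses only {{java.time}}:

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;

// Hypothetical illustration of the numeric values such casts might produce.
final class TemporalToNumeric {

    // TIMESTAMP -> BIGINT: milliseconds since 1970-01-01 00:00:00 (UTC assumed).
    static long timestampToBigint(LocalDateTime ts) {
        return ts.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // TIME -> INT: milliseconds since midnight.
    static int timeToInt(LocalTime time) {
        return Math.toIntExact(time.toNanoOfDay() / 1_000_000L);
    }

    // DATE -> INT: days since 1970-01-01.
    static int dateToInt(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }
}
```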
[jira] [Created] (FLINK-6655) Misleading error message when HistoryServer path is empty
Timo Walther created FLINK-6655: --- Summary: Misleading error message when HistoryServer path is empty Key: FLINK-6655 URL: https://issues.apache.org/jira/browse/FLINK-6655 Project: Flink Issue Type: Bug Components: History Server Reporter: Timo Walther Priority: Minor If the HistoryServer {{jobmanager.archive.fs.dir}} is set to e.g. {{file://}}, the following exception mentions checkpoints, which is misleading. {code} java.lang.IllegalArgumentException: Cannot use the root directory for checkpoints. at org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:358) at org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201) at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:467) at org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6602) Table source with defined time attributes allows empty string
Timo Walther created FLINK-6602: --- Summary: Table source with defined time attributes allows empty string Key: FLINK-6602 URL: https://issues.apache.org/jira/browse/FLINK-6602 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther {{DefinedRowtimeAttribute}} and {{DefinedProctimeAttribute}} are not checked for empty strings. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
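A minimal sketch of the kind of check that is missing. It assumes that {{null}} means "no time attribute defined" and should stay allowed; that convention, the class name {{TimeAttributeCheck}}, and the error message are illustrative assumptions, not Flink code:

```java
// Hypothetical validation helper for a time attribute name.
final class TimeAttributeCheck {

    // Returns the name unchanged if it is acceptable.
    // null is treated as "no attribute defined" (assumption); empty or blank names are rejected.
    static String validate(String attributeName) {
        if (attributeName != null && attributeName.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "The name of a time attribute must not be empty.");
        }
        return attributeName;
    }
}
```

Such a check could run once when the table source is registered, so that the user gets the error early instead of a confusing failure during planning.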
[jira] [Created] (FLINK-6601) Use time indicators in DataStreamLogicalWindowAggregateRule
Timo Walther created FLINK-6601: --- Summary: Use time indicators in DataStreamLogicalWindowAggregateRule Key: FLINK-6601 URL: https://issues.apache.org/jira/browse/FLINK-6601 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther The DataStreamLogicalWindowAggregateRule can use time indicators to not generate a dummy timestamp in the DataStreamCalc and make the plan representation nicer. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6591) Extend functionality of final ConversionMapper
Timo Walther created FLINK-6591: --- Summary: Extend functionality of final ConversionMapper Key: FLINK-6591 URL: https://issues.apache.org/jira/browse/FLINK-6591 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther The functionality of the ConversionMapper generated in {{TableEnvironment#generateRowConverterFunction}} is very limited right now: - It does not support conversion of nested Row types, e.g. a nested Avro-record can be read with the KafkaAvroTableSource into a nested row structure, but this structure can not be converted back into a Pojo or Avro structure. The code generator needs to be extended for this. - The Table API supports BasicTypeInfo (e.g. {{Integer[]}}) as an input field, but since it works with ObjectTypeInfo internally, it cannot output the array. I disabled the test {{TableEnvironmentITCase#testAsFromTupleToPojo}} for now. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6587) Java Table API cannot parse function names starting with keywords
Timo Walther created FLINK-6587: --- Summary: Java Table API cannot parse function names starting with keywords Key: FLINK-6587 URL: https://issues.apache.org/jira/browse/FLINK-6587 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther The ExpressionParser of the Java Table API has problems with functions that start with a reserved keyword. e.g. a function must not be called {{summing}} because {{sum}} is reserved. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
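The following sketch shows the general failure mode with plain regular expressions. It is only an analogy: the actual {{ExpressionParser}} is not regex-based in this form, and the class {{KeywordMatch}} is hypothetical. A keyword rule without a word boundary also matches the prefix of a longer identifier, while a negative lookahead prevents that:

```java
import java.util.regex.Pattern;

// Hypothetical illustration of keyword matching with and without a word boundary.
final class KeywordMatch {

    // Naive rule: "sum" matches as soon as the input starts with these letters.
    private static final Pattern NAIVE = Pattern.compile("sum");

    // Stricter rule: "sum" must not be followed by another identifier character.
    private static final Pattern WHOLE_WORD = Pattern.compile("sum(?![A-Za-z0-9_])");

    static boolean naiveIsKeyword(String input) {
        return NAIVE.matcher(input).lookingAt();
    }

    static boolean wholeWordIsKeyword(String input) {
        return WHOLE_WORD.matcher(input).lookingAt();
    }
}
```

With the naive rule, {{summing(a)}} is consumed as the keyword {{sum}} followed by leftover input; with the stricter rule it is left for the identifier rule.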
[jira] [Created] (FLINK-6585) Table examples are not runnable in IDE
Timo Walther created FLINK-6585: --- Summary: Table examples are not runnable in IDE Key: FLINK-6585 URL: https://issues.apache.org/jira/browse/FLINK-6585 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Running Table API examples in {{flink-examples-table}} fails with: {code} Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.api.TableEnvironment {code} Seems to be a Maven issue. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6584) Support multiple consecutive windows in SQL
Timo Walther created FLINK-6584: --- Summary: Support multiple consecutive windows in SQL Key: FLINK-6584 URL: https://issues.apache.org/jira/browse/FLINK-6584 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Timo Walther Right now, the Table API supports multiple consecutive windows as follows: {code} val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) val t = table .window(Tumble over 2.millis on 'rowtime as 'w) .groupBy('w) .select('w.rowtime as 'rowtime, 'int.count as 'int) .window(Tumble over 4.millis on 'rowtime as 'w2) .groupBy('w2) .select('w2.rowtime, 'w2.end, 'int.count) {code} Similar behavior should be supported by the SQL API as well. We need to introduce a new auxiliary group function, but this should happen in sync with Apache Calcite. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
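To illustrate what two consecutive tumbling windows compute, here is a plain Java sketch without any Flink dependency. It assumes that the rowtime forwarded by a window is its end timestamp minus one millisecond, and that the second count counts the rows emitted by the first window; both points are assumptions for illustration, and the class {{ConsecutiveWindows}} is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of two consecutive tumbling windows (2 ms, then 4 ms).
final class ConsecutiveWindows {

    // Start of the tumbling window of the given size that contains ts.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Counts rows per tumbling window; the key is the window start.
    static Map<Long, Long> countPerWindow(Iterable<Long> rowtimes, long size) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : rowtimes) {
            counts.merge(windowStart(ts, size), 1L, Long::sum);
        }
        return counts;
    }

    // First a 2 ms window, then a 4 ms window on the forwarded rowtime.
    static Map<Long, Long> twoStage(List<Long> rowtimes) {
        Map<Long, Long> first = countPerWindow(rowtimes, 2);
        List<Long> forwarded = new ArrayList<>();
        for (long start : first.keySet()) {
            forwarded.add(start + 2 - 1); // assumed rowtime of a window: end - 1
        }
        return countPerWindow(forwarded, 4);
    }
}
```

For the rowtimes 1, 2, 3, 4, 7, 8, 16 the first stage produces six 2 ms windows, and the second stage groups them into 4 ms windows starting at 0, 4, 8, and 16 with 2, 2, 1, and 1 rows.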
[jira] [Created] (FLINK-6579) Add proper support for BasicArrayTypeInfo
Timo Walther created FLINK-6579: --- Summary: Add proper support for BasicArrayTypeInfo Key: FLINK-6579 URL: https://issues.apache.org/jira/browse/FLINK-6579 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther FLINK-6033 added only partial support for arrays of Java wrapper classes (e.g. Integer[]). In most cases operations fail. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6547) Improve toString methods of Table API expressions
Timo Walther created FLINK-6547: --- Summary: Improve toString methods of Table API expressions Key: FLINK-6547 URL: https://issues.apache.org/jira/browse/FLINK-6547 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther Right now, we logically test windows like this: {code} term( "window", TumblingGroupWindow( WindowReference("w1"), 'proctime, 50.milli)), {code} But this does not test if the proctime attribute is resolved nor checks the time indicator type. If we still rely on string comparison for tests, we need to make sure that all expressions have a rich {{toString}} function with sufficient information. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6543) Deprecate toDataStream
Timo Walther created FLINK-6543: --- Summary: Deprecate toDataStream Key: FLINK-6543 URL: https://issues.apache.org/jira/browse/FLINK-6543 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther With retraction support, we should deprecate {{toDataStream}} and introduce a new {{toAppendStream}} to clearly differentiate between retraction and non-retraction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
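The difference between the two kinds of output can be sketched as follows. This is not Flink code; it assumes that a retract stream encodes every change as a flag plus a record, where {{true}} adds a result and {{false}} withdraws a previously emitted one. The class {{RetractCount}} and the string encoding are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a running count per key that emits retract messages.
final class RetractCount {

    // Emits "true,key,count" for a new result and "false,key,count"
    // to withdraw the previous result of the same key.
    static List<String> toRetractMessages(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            Long old = counts.get(key);
            if (old != null) {
                out.add("false," + key + "," + old); // retract the outdated count
            }
            long updated = (old == null) ? 1L : old + 1L;
            counts.put(key, updated);
            out.add("true," + key + "," + updated); // add the new count
        }
        return out;
    }
}
```

An append-only conversion can only be correct for queries that never need the {{false}} messages, which is why a clearly named method for that case helps.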
[jira] [Created] (FLINK-6542) Non-keyed, non-windowed aggregation fails
Timo Walther created FLINK-6542: --- Summary: Non-keyed, non-windowed aggregation fails Key: FLINK-6542 URL: https://issues.apache.org/jira/browse/FLINK-6542 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther The following query produces an empty result: {code} val data = List( (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"), (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"), (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"), (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"), (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"), (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"), (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world")) @Test def testMin(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() val stream = env .fromCollection(data) .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) tEnv.registerTable("MyTable", table) val t = tEnv.sql("SELECT MIN(`int`) FROM MyTable") t.toRetractStream[Row].print() } {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6517) Support multiple consecutive windows
Timo Walther created FLINK-6517: --- Summary: Support multiple consecutive windows Key: FLINK-6517 URL: https://issues.apache.org/jira/browse/FLINK-6517 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther FLINK-5884 changed how windows can be defined; however, it is not possible to define multiple consecutive windows right now. It should be possible to refine the end property of a window as a new time attribute. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6483) Support time materialization
Timo Walther created FLINK-6483: --- Summary: Support time materialization Key: FLINK-6483 URL: https://issues.apache.org/jira/browse/FLINK-6483 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther FLINK-5884 added support for time indicators. However, there are still some features missing i.e. materialization of metadata timestamp. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6430) Remove Calcite classes for time resolution of auxiliary group functions
Timo Walther created FLINK-6430: --- Summary: Remove Calcite classes for time resolution of auxiliary group functions Key: FLINK-6430 URL: https://issues.apache.org/jira/browse/FLINK-6430 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Timo Walther TUMBLE/HOP/SESSION_START/END did not resolve time field correctly. FLINK-6409 copied some classes from Calcite that are not necessary in Calcite 1.13 anymore. We can remove them again. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6429) Bump up Calcite version to 1.13
Timo Walther created FLINK-6429: --- Summary: Bump up Calcite version to 1.13 Key: FLINK-6429 URL: https://issues.apache.org/jira/browse/FLINK-6429 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther This is an umbrella issue for all tasks that need to be done once Apache Calcite 1.13 is released. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6409) TUMBLE/HOP/SESSION_START/END do not resolve time field correctly
Timo Walther created FLINK-6409: --- Summary: TUMBLE/HOP/SESSION_START/END do not resolve time field correctly Key: FLINK-6409 URL: https://issues.apache.org/jira/browse/FLINK-6409 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther Calcite has a bug and cannot resolve the time fields of auxiliary group functions correctly. A discussion can be found in CALCITE-1761. Right now this issue only affects our batch SQL API, but it is a blocker for FLINK-5884. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6214) WindowAssigners do not allow negative offsets
Timo Walther created FLINK-6214: --- Summary: WindowAssigners do not allow negative offsets Key: FLINK-6214 URL: https://issues.apache.org/jira/browse/FLINK-6214 Project: Flink Issue Type: Bug Components: Streaming Reporter: Timo Walther Both the website and the JavaDoc promote ".window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) For example, in China you would have to specify an offset of Time.hours(-8)". However, neither the sliding nor the tumbling event time assigner allows the offset to be negative. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
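The arithmetic itself has no problem with negative offsets, as the following sketch shows. It is not the assigner's actual implementation; the class {{WindowOffset}} is hypothetical. It uses {{Math.floorMod}} so that the remainder is never negative, and it shows that an offset of -8 hours on a daily window is equivalent to an offset of +16 hours:

```java
// Hypothetical window-start arithmetic that tolerates negative offsets.
final class WindowOffset {

    // Start of the window of the given size, shifted by offset, that contains timestamp.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // Maps any offset (also a negative one) into the range [0, size).
    static long normalizeOffset(long offset, long size) {
        return Math.floorMod(offset, size);
    }
}
```

One possible way to support the documented usage would therefore be to accept offsets with an absolute value smaller than the window size and normalize them as above; whether that is the preferred fix is left open here.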
[jira] [Created] (FLINK-6168) Make flink-core independent of Avro
Timo Walther created FLINK-6168: --- Summary: Make flink-core independent of Avro Key: FLINK-6168 URL: https://issues.apache.org/jira/browse/FLINK-6168 Project: Flink Issue Type: Sub-task Components: Core Reporter: Timo Walther Right now, flink-core has Avro dependencies. We should move AvroTypeInfo to flink-avro and make the TypeExtractor Avro independent (e.g. reflection-based similar to Hadoop Writables or with another approach). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
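A reflection-based approach could look roughly like the sketch below: the optional type is referenced only by its name, so the calling module compiles and runs without the library on the classpath. The class {{OptionalDependency}} and its methods are hypothetical and not taken from the TypeExtractor:

```java
// Hypothetical helper for detecting types of an optional dependency by name.
final class OptionalDependency {

    // True if the class can be loaded; the class is not initialized.
    static boolean isClassPresent(String className) {
        try {
            Class.forName(className, false, OptionalDependency.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // True if candidate is a subtype of the named type; false if that type is absent.
    static boolean isSubtypeOf(Class<?> candidate, String superTypeName) {
        try {
            Class<?> superType =
                Class.forName(superTypeName, false, OptionalDependency.class.getClassLoader());
            return superType.isAssignableFrom(candidate);
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }
}
```

A type extractor could call something like {{isSubtypeOf(clazz, "org.apache.avro.specific.SpecificRecordBase")}} and only then hand over to a factory that lives in the Avro module.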
[jira] [Created] (FLINK-5994) Add Janino to flink-table JAR file
Timo Walther created FLINK-5994: --- Summary: Add Janino to flink-table JAR file Key: FLINK-5994 URL: https://issues.apache.org/jira/browse/FLINK-5994 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther It seems that Janino is not part of the flink-table JAR file although it is a dependency in pom.xml. Users adding flink-table to Flink's lib folder because of FLINK-5227 cannot run table programs due to the missing Janino dependency. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5986) Add forwarded field annotations to batch (window) aggregate operators
Timo Walther created FLINK-5986: --- Summary: Add forwarded field annotations to batch (window) aggregate operators Key: FLINK-5986 URL: https://issues.apache.org/jira/browse/FLINK-5986 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther We should add annotations to all (window) aggregate operators. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5983) Replace for/foreach/map in aggregates by while loops
Timo Walther created FLINK-5983: --- Summary: Replace for/foreach/map in aggregates by while loops Key: FLINK-5983 URL: https://issues.apache.org/jira/browse/FLINK-5983 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther Right now there is a mixture of different kinds of loops within aggregate functions. Although performance is not the main goal at the moment, we should focus on performant execution, especially in these runtime functions (e.g. {{DataSetTumbleCountWindowAggReduceGroupFunction}}). We should replace loops, maps etc. by primitive while loops. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5884) Integrate time indicators for Table API & SQL
Timo Walther created FLINK-5884: --- Summary: Integrate time indicators for Table API & SQL Key: FLINK-5884 URL: https://issues.apache.org/jira/browse/FLINK-5884 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther We already discussed the need for a proper integration of time indicators (event-time or processing-time) for both the Table API & SQL on the ML: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-SQL-indicators-for-event-and-processing-time-tp15927.html This issue will track the progress. I will work on a design document how we can solve this issue. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5760) "Introduction to Apache Flink" links to 1.1 release
Timo Walther created FLINK-5760: --- Summary: "Introduction to Apache Flink" links to 1.1 release Key: FLINK-5760 URL: https://issues.apache.org/jira/browse/FLINK-5760 Project: Flink Issue Type: Bug Components: Project Website Reporter: Timo Walther Multiple links in "Introduction to Apache Flink" point to the 1.1 release. They should always point to the current release. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5735) Non-overlapping sliding window is not deterministic
Timo Walther created FLINK-5735: --- Summary: Non-overlapping sliding window is not deterministic Key: FLINK-5735 URL: https://issues.apache.org/jira/browse/FLINK-5735 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther I don't know if this is a problem of the Table API or the underlying API. We have to investigate this as part of the issue. The following code leads to different results from time to time. Sometimes the count of "Hello" is 1 sometimes 2. {code} val data = List( (1L, 1, "Hi"), (2L, 2, "Hallo"), (3L, 2, "Hello"), (6L, 3, "Hello"), (4L, 5, "Hello"), (16L, 4, "Hello world"), (8L, 3, "Hello world")) @Test def testEventTimeSlidingWindowNonOverlapping(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() val stream = env .fromCollection(data) .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) val table = stream.toTable(tEnv, 'long, 'int, 'string) val windowedTable = table .window(Slide over 5.milli every 10.milli on 'rowtime as 'w) .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) val results = windowedTable.toDataStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() val expected = Seq( "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005", "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005", "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] { override def checkAndGetNextWatermark( lastElement: (Long, Int, String), extractedTimestamp: Long) : Watermark = { new Watermark(extractedTimestamp) } override def extractTimestamp( element: (Long, Int, String), previousElementTimestamp: Long): Long = { element._1 } } {code} -- This message 
was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5714) Use a builder pattern for creating CsvTableSource
Timo Walther created FLINK-5714: --- Summary: Use a builder pattern for creating CsvTableSource Key: FLINK-5714 URL: https://issues.apache.org/jira/browse/FLINK-5714 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther Right now, the constructor of the CsvTableSource can have up to 9 parameters. In Scala this might not be a problem because of default values, but Java doesn't have this functionality. I propose to have a builder pattern here: {code} CsvTableSource .builder() .field("myfield", Types.STRING) .field("myfield2", Types.INT) .quoteCharacter(';') .build() {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
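How such a builder could be structured is sketched below with a stand-alone class. {{CsvSourceConfig}}, its fields, the {{path}} method, and the defaults are hypothetical and only mirror the shape of the proposal; field types are plain strings here instead of Flink type information:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a CSV source configuration.
final class CsvSourceConfig {
    final String path;
    final Map<String, String> fields; // field name -> type name, in declaration order
    final String fieldDelimiter;
    final Character quoteCharacter;   // null means "no quoting"

    private CsvSourceConfig(Builder b) {
        this.path = b.path;
        this.fields = Collections.unmodifiableMap(new LinkedHashMap<>(b.fields));
        this.fieldDelimiter = b.fieldDelimiter;
        this.quoteCharacter = b.quoteCharacter;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String path;
        private final Map<String, String> fields = new LinkedHashMap<>();
        private String fieldDelimiter = ","; // assumed default
        private Character quoteCharacter;

        Builder path(String path) { this.path = path; return this; }

        Builder field(String name, String type) { fields.put(name, type); return this; }

        Builder fieldDelimiter(String delimiter) { this.fieldDelimiter = delimiter; return this; }

        Builder quoteCharacter(char quote) { this.quoteCharacter = quote; return this; }

        // Validates required settings once, instead of in a long constructor.
        CsvSourceConfig build() {
            if (path == null) {
                throw new IllegalStateException("A path must be set.");
            }
            if (fields.isEmpty()) {
                throw new IllegalStateException("At least one field must be defined.");
            }
            return new CsvSourceConfig(this);
        }
    }
}
```

Optional settings keep their defaults unless the caller overrides them, which replaces the many constructor overloads that Java would otherwise need.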
[jira] [Created] (FLINK-5678) User-defined TableFunctions do not support all types of parameters
Timo Walther created FLINK-5678: --- Summary: User-defined TableFunctions do not support all types of parameters Key: FLINK-5678 URL: https://issues.apache.org/jira/browse/FLINK-5678 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther It seems that TableFunctions do not support all types of parameters. E.g. {code} XXX.select("1000L AS time").join("mytf(time)"); public static class MyTableFunction extends TableFunction { public void eval(Long ts) { Row r = new Row(2); r.setField(0, ts); collect(r); } } {code} leads to {code} Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: No matching signature found. at org.apache.flink.table.codegen.calls.TableFunctionCallGen$$anonfun$1.apply(TableFunctionCallGen.scala:47) at org.apache.flink.table.codegen.calls.TableFunctionCallGen$$anonfun$1.apply(TableFunctionCallGen.scala:47) at scala.Option.getOrElse(Option.scala:120) at org.apache.flink.table.codegen.calls.TableFunctionCallGen.generate(TableFunctionCallGen.scala:47) at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1011) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5677) Validation layer should check whether field can be used as grouping key
Timo Walther created FLINK-5677: --- Summary: Validation layer should check whether field can be used as grouping key Key: FLINK-5677 URL: https://issues.apache.org/jira/browse/FLINK-5677 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther When grouping on a GenericType (e.g. {{Object}}), the validation layer does not check the type. Instead, the {{groupBy/keyBy}} of the DataSet/DataStream API throws the exception. The validation layer should check this beforehand. {code} public static class OuterPojo { public Object object = new Object(); } tEnv.sql("SELECT testing.object FROM testing GROUP BY testing.object") {code} leads to {code} Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType) cannot be used as key. at org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:270) at org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:223) at org.apache.flink.api.java.DataSet.groupBy(DataSet.java:692) at org.apache.flink.table.plan.nodes.dataset.DataSetAggregate.translateToPlan(DataSetAggregate.scala:134) at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:305) at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:289) at org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5676) Grouping on nested fields does not work
Timo Walther created FLINK-5676: --- Summary: Grouping on nested fields does not work Key: FLINK-5676 URL: https://issues.apache.org/jira/browse/FLINK-5676 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther {code} tEnv .fromDataSet(pojoWithinnerPojo) .groupBy("innerPojo.get('line')") .select("innerPojo.get('line')") {code} fails with {code} ValidationException: Cannot resolve [innerPojo] given input ['innerPojo.get(line)]. {code} I don't know if we want to support that but the exception should be more helpful anyway. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5674) Stdout not present in logs files
Timo Walther created FLINK-5674: --- Summary: Stdout not present in logs files Key: FLINK-5674 URL: https://issues.apache.org/jira/browse/FLINK-5674 Project: Flink Issue Type: Bug Components: Core Reporter: Timo Walther I don't know if I did something wrong. I started Flink 1.2 RC2 with {{start-cluster.sh}} and started another {{taskmanager.sh start}}. I'm printing lines with {{dataset.print()}} and {{System.out.println()}}, but both outputs do not occur in the log files/out files or web interface. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5673) NullPointerException in RemoteInputChannel.requestSubpartition
Timo Walther created FLINK-5673: --- Summary: NullPointerException in RemoteInputChannel.requestSubpartition Key: FLINK-5673 URL: https://issues.apache.org/jira/browse/FLINK-5673 Project: Flink Issue Type: Bug Components: Core Reporter: Timo Walther It might be related to FLINK-5672 but I'm opening an additional issue, because it is a different exception after the second execution. The setup is identical to FLINK-5672. But if I run the job a second time I got the following exception: {code} java.lang.NullPointerException at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:419) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:441) at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86) at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42) at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5672) Job fails with java.lang.IllegalArgumentException: port out of range:-1
Timo Walther created FLINK-5672: --- Summary: Job fails with java.lang.IllegalArgumentException: port out of range:-1 Key: FLINK-5672 URL: https://issues.apache.org/jira/browse/FLINK-5672 Project: Flink Issue Type: Bug Components: Core Reporter: Timo Walther I started the JobManager with {{start-local.sh}} and started another TaskManager with {{taskmanager.sh start}}. My job is a Table API job with a {{orderBy}} (range partitioning with parallelism 2). The job fails with the following exception: {code} java.lang.IllegalArgumentException: port out of range:-1 at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143) at java.net.InetSocketAddress.(InetSocketAddress.java:188) at org.apache.flink.runtime.io.network.ConnectionID.(ConnectionID.java:47) at org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor.fromEdges(InputChannelDeploymentDescriptor.java:124) at org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:627) at org.apache.flink.runtime.executiongraph.Execution.deployToSlot(Execution.java:358) at org.apache.flink.runtime.executiongraph.Execution$1.apply(Execution.java:284) at org.apache.flink.runtime.executiongraph.Execution$1.apply(Execution.java:279) at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:259) at akka.dispatch.OnComplete.internal(Future.scala:248) at akka.dispatch.OnComplete.internal(Future.scala:245) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at org.apache.flink.runtime.concurrent.Executors$DirectExecutor.execute(Executors.java:56) at scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:122) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$KeptPromise.onComplete(Promise.scala:333) at 
org.apache.flink.runtime.concurrent.impl.FlinkFuture.handleAsync(FlinkFuture.java:256) at org.apache.flink.runtime.concurrent.impl.FlinkFuture.handle(FlinkFuture.java:270) at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:279) at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:479) at org.apache.flink.runtime.executiongraph.Execution$5.call(Execution.java:525) at org.apache.flink.runtime.executiongraph.Execution$5.call(Execution.java:521) at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:95) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5662) Alias in front of output fails
Timo Walther created FLINK-5662: --- Summary: Alias in front of output fails Key: FLINK-5662 URL: https://issues.apache.org/jira/browse/FLINK-5662 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Neither of the following snippets works: {code} public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet text = ... BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); // Table t = tEnv.fromDataSet(text, "text").select("text AS line"); // Table t = tEnv.fromDataSet(text, "text").as("line"); Table t = tEnv.fromDataSet(text, "text").select("text AS line").select("line AS line"); tEnv.toDataSet(t, MyPojo.class).print(); } public static class MyPojo { public String line; } {code} {code} Exception in thread "main" org.apache.flink.table.api.TableException: POJO does not define field name: text at org.apache.flink.table.typeutils.TypeConverter$$anonfun$2.apply(TypeConverter.scala:85) at org.apache.flink.table.typeutils.TypeConverter$$anonfun$2.apply(TypeConverter.scala:81) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.flink.table.typeutils.TypeConverter$.determineReturnType(TypeConverter.scala:81) at org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToExpectedType(BatchScan.scala:69) at org.apache.flink.table.plan.nodes.dataset.DataSetScan.translateToPlan(DataSetScan.scala:61) at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:305) at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:289) at org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146) at groupId.WordCount.main(WordCount.java:67) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5589) Tumbling group windows on batch tables do not consider object reuse
Timo Walther created FLINK-5589: --- Summary: Tumbling group windows on batch tables do not consider object reuse Key: FLINK-5589 URL: https://issues.apache.org/jira/browse/FLINK-5589 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther The tumbling group windows on batch tables might not work properly when object reuse mode is enabled. See: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#object-reuse-enabled "Do not remember input objects received from an Iterable." -- This message was sent by Atlassian JIRA (v6.3.4#6332)
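To illustrate the hazard that FLINK-5589 refers to, here is a minimal, self-contained Java sketch. It does not use any Flink classes: {{Rec}}, {{reusingIterable}}, {{bufferWithoutCopy}} and {{bufferWithCopy}} are hypothetical names made up for this example, and the iterable merely imitates what a runtime with object reuse enabled might do, namely hand out the same mutable instance for every element. Under that assumption, an operator that remembers references from the iterable ends up seeing only the last value, whereas copying before buffering keeps the values intact.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ObjectReuseSketch {

    /** Mutable record standing in for a row (hypothetical, not a Flink class). */
    public static final class Rec {
        long value;

        Rec copy() {
            Rec r = new Rec();
            r.value = value;
            return r;
        }
    }

    /** Iterable that returns the SAME Rec instance for every element, imitating object reuse. */
    public static Iterable<Rec> reusingIterable(long... values) {
        return () -> new Iterator<Rec>() {
            private final Rec reused = new Rec();
            private int pos = 0;

            @Override
            public boolean hasNext() {
                return pos < values.length;
            }

            @Override
            public Rec next() {
                if (pos >= values.length) {
                    throw new NoSuchElementException();
                }
                reused.value = values[pos++]; // overwrite the shared instance
                return reused;
            }
        };
    }

    /** Problematic: remembers references to the reused object. */
    public static List<Long> bufferWithoutCopy(Iterable<Rec> input) {
        List<Rec> buffer = new ArrayList<>();
        for (Rec r : input) {
            buffer.add(r);
        }
        List<Long> out = new ArrayList<>();
        for (Rec r : buffer) {
            out.add(r.value);
        }
        return out;
    }

    /** Safe: copies each element before remembering it. */
    public static List<Long> bufferWithCopy(Iterable<Rec> input) {
        List<Rec> buffer = new ArrayList<>();
        for (Rec r : input) {
            buffer.add(r.copy());
        }
        List<Long> out = new ArrayList<>();
        for (Rec r : buffer) {
            out.add(r.value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(bufferWithoutCopy(reusingIterable(1, 2, 3))); // [3, 3, 3]
        System.out.println(bufferWithCopy(reusingIterable(1, 2, 3)));    // [1, 2, 3]
    }
}
```

Whether the actual window operators buffer input in this way is not stated in the issue; the sketch only shows why such buffering would be unsafe.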
[jira] [Created] (FLINK-5586) Extend TableProgramsTestBase for object reuse modes
Timo Walther created FLINK-5586: --- Summary: Extend TableProgramsTestBase for object reuse modes Key: FLINK-5586 URL: https://issues.apache.org/jira/browse/FLINK-5586 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther We should also test if all runtime operators of the Table API work correctly if object reuse mode is set to true. This should be done for all cluster-based ITCases, not the collection-based ones. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5514) Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS
Timo Walther created FLINK-5514: --- Summary: Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS Key: FLINK-5514 URL: https://issues.apache.org/jira/browse/FLINK-5514 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther Initial support for GROUPING SETS was added in FLINK-5303. However, the current runtime implementation is not very efficient because it only translates logical operators directly into physical operators, i.e., grouping sets are currently translated into multiple groupings that are unioned together. A rough design document for this has been created in FLINK-2980. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
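The "multiple groupings that are unioned together" strategy mentioned in FLINK-5514 can be sketched in plain Java as follows. This is only an illustration of the idea, not Flink's implementation: the class {{GroupingSetsSketch}}, the method {{sumByGroupingSets}} and the string-based result format are assumptions made for this example. Each grouping set triggers its own full pass over the input, which is the inefficiency the issue points at.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupingSetsSketch {

    /**
     * Computes SUM(amount) once per grouping set and concatenates ("unions") the results.
     * keys.get(i) holds the grouping columns of row i, amounts.get(i) its amount.
     * Columns that are not part of the current grouping set are reported as null.
     */
    public static List<String> sumByGroupingSets(
            List<String[]> keys, List<Long> amounts, int[][] groupingSets) {
        List<String> result = new ArrayList<>();
        for (int[] set : groupingSets) { // one full pass over the input per grouping set
            Map<List<String>, Long> sums = new LinkedHashMap<>();
            for (int row = 0; row < keys.size(); row++) {
                String[] key = new String[keys.get(row).length]; // non-grouped columns stay null
                for (int col : set) {
                    key[col] = keys.get(row)[col];
                }
                sums.merge(Arrays.asList(key), amounts.get(row), Long::sum);
            }
            for (Map.Entry<List<String>, Long> e : sums.entrySet()) {
                result.add(e.getKey() + " -> " + e.getValue()); // union of the partial results
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> keys = Arrays.asList(
                new String[] {"a", "x"}, new String[] {"a", "y"}, new String[] {"b", "x"});
        List<Long> amounts = Arrays.asList(1L, 2L, 3L);
        // Grouping sets of ROLLUP(col0, col1): (col0, col1), (col0), ()
        int[][] rollup = {{0, 1}, {0}, {}};
        for (String line : sumByGroupingSets(keys, amounts, rollup)) {
            System.out.println(line);
        }
    }
}
```

A more efficient physical execution would presumably avoid re-reading the input for every grouping set; how to do that is what the issue leaves open for design.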
[jira] [Created] (FLINK-5447) Sync documentation of built-in functions for Table API with SQL
Timo Walther created FLINK-5447: --- Summary: Sync documentation of built-in functions for Table API with SQL Key: FLINK-5447 URL: https://issues.apache.org/jira/browse/FLINK-5447 Project: Flink Issue Type: Improvement Components: Documentation, Table API & SQL Reporter: Timo Walther Assignee: Timo Walther Priority: Minor I will split up the documentation for the built-in functions similar to the SQL structure. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5441) Directly allow SQL queries on a Table
Timo Walther created FLINK-5441: --- Summary: Directly allow SQL queries on a Table Key: FLINK-5441 URL: https://issues.apache.org/jira/browse/FLINK-5441 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Timo Walther Right now a user has to register a table before it can be used in SQL queries. In order to allow more fluent programming we propose calling SQL directly on a table. An underscore can be used to reference the current table: {code} myTable.sql("SELECT a, b, c FROM _ WHERE d = 12") {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5429) Code generate types between operators in Table API
Timo Walther created FLINK-5429: --- Summary: Code generate types between operators in Table API Key: FLINK-5429 URL: https://issues.apache.org/jira/browse/FLINK-5429 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Timo Walther Currently, the Table API uses the generic Row type for shipping records between operators in the underlying DataSet and DataStream APIs. For efficiency reasons, we should code-generate those records. The final design is up for discussion, but here are some ideas: A row like {{(a: INT NULL, b: INT NOT NULL, c: STRING)}} could look like {code} final class GeneratedRow$123 { public boolean a_isNull; public int a; public int b; public String c; } {code} Types could be generated using Janino in the pre-flight phase. The generated types should use primitive types wherever possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5418) Estimated row size does not support nested types
Timo Walther created FLINK-5418: --- Summary: Estimated row size does not support nested types Key: FLINK-5418 URL: https://issues.apache.org/jira/browse/FLINK-5418 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther Operations that use {{org.apache.flink.table.plan.nodes.FlinkRel#estimateRowSize}} do not support nested types yet and fail with: {code} java.lang.AssertionError: Internal error: Error occurred while applying rule DataSetMinusRule at org.apache.calcite.util.Util.newInternal(Util.java:792) at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148) at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225) at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:117) at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:213) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:819) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334) at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:256) at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:288) at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140) at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40) at org.apache.flink.table.api.scala.batch.table.SetOperatorsITCase.testMinus(SetOperatorsITCase.scala:175) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runners.Suite.runChild(Suite.java:128) at org.junit.runners.Suite.runChild(Suite.java:27) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:253) 
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) Caused by: org.apache.flink.table.api.TableException: Unsupported data
[jira] [Created] (FLINK-5414) Bump up Calcite version to 1.11
Timo Walther created FLINK-5414: --- Summary: Bump up Calcite version to 1.11 Key: FLINK-5414 URL: https://issues.apache.org/jira/browse/FLINK-5414 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther The upcoming Calcite release 1.11 has a lot of stability fixes and new features. We should update it for the Table API. E.g. we can hopefully merge FLINK-4864 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5413) Convert TableEnvironmentITCases to unit tests
Timo Walther created FLINK-5413: --- Summary: Convert TableEnvironmentITCases to unit tests Key: FLINK-5413 URL: https://issues.apache.org/jira/browse/FLINK-5413 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Timo Walther The following IT cases could be converted into unit tests: - {{org.apache.flink.table.api.scala.batch.TableEnvironmentITCase}} - {{org.apache.flink.table.api.java.batch.TableEnvironmentITCase}} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5354) Split up Table API documentation into multiple pages
Timo Walther created FLINK-5354: --- Summary: Split up Table API documentation into multiple pages Key: FLINK-5354 URL: https://issues.apache.org/jira/browse/FLINK-5354 Project: Flink Issue Type: Improvement Components: Documentation, Table API & SQL Reporter: Timo Walther The Table API documentation page is quite large at the moment. We should split it up into multiple pages. Here is my suggestion: - Overview (Datatypes, Config, Registering Tables, Examples) - TableSources and Sinks - Table API - SQL - Functions -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5333) Keep Windows scripts in sync with shell scripts
Timo Walther created FLINK-5333: --- Summary: Keep Windows scripts in sync with shell scripts Key: FLINK-5333 URL: https://issues.apache.org/jira/browse/FLINK-5333 Project: Flink Issue Type: Improvement Components: Startup Shell Scripts Reporter: Timo Walther A lot of scripts are missing on Windows. E.g., there is a {{start-local.bat}} but no {{stop-local.bat}}. It is also not possible to start additional task managers. The Windows scripts should be updated to be in sync with the shell scripts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5289) NPE when using value state on non-keyed stream
Timo Walther created FLINK-5289: --- Summary: NPE when using value state on non-keyed stream Key: FLINK-5289 URL: https://issues.apache.org/jira/browse/FLINK-5289 Project: Flink Issue Type: Bug Components: Streaming Reporter: Timo Walther Using a {{ValueStateDescriptor}} and {{getRuntimeContext().getState(descriptor)}} on a non-keyed stream leads to a {{NullPointerException}}, which is not very helpful for users: {code} java.lang.NullPointerException at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5177) Improve nullability handling
Timo Walther created FLINK-5177: --- Summary: Improve nullability handling Key: FLINK-5177 URL: https://issues.apache.org/jira/browse/FLINK-5177 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther Currently, all fields of the Table API are marked as nullable by default. A lot of null checking could be avoided if we handled nullability properly. Fields of tuples and primitive fields of POJOs cannot be null, and neither can elements of primitive arrays. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
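As a rough illustration of the observation in FLINK-5177 about primitive types, the following Java sketch derives a nullability flag for POJO fields and array elements by reflection. It is not how the Table API determines nullability; {{NullabilitySketch}}, {{fieldNullability}}, {{elementsNullable}} and the example POJO {{Order}} are hypothetical names introduced here. It relies only on the Java language fact that values of primitive types can never be null.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

public class NullabilitySketch {

    /** Example POJO (hypothetical). */
    public static class Order {
        public int id;           // primitive: can never be null
        public Integer quantity; // boxed: may be null
        public String customer;  // reference type: may be null
        public long[] items;     // the array reference may be null, its elements cannot
    }

    /** Maps each public, non-static field name to true if the field may hold null. */
    public static Map<String, Boolean> fieldNullability(Class<?> pojo) {
        Map<String, Boolean> nullable = new TreeMap<>(); // sorted for deterministic output
        for (Field f : pojo.getFields()) {
            if (Modifier.isStatic(f.getModifiers())) {
                continue;
            }
            nullable.put(f.getName(), !f.getType().isPrimitive());
        }
        return nullable;
    }

    /** Returns true if elements of the given array type may be null. */
    public static boolean elementsNullable(Class<?> arrayType) {
        if (!arrayType.isArray()) {
            throw new IllegalArgumentException("Not an array type: " + arrayType);
        }
        return !arrayType.getComponentType().isPrimitive();
    }

    public static void main(String[] args) {
        System.out.println(fieldNullability(Order.class));
        System.out.println(elementsNullable(long[].class));
        System.out.println(elementsNullable(String[].class));
    }
}
```

With such flags available at planning time, generated code could skip null checks for fields and elements that are known to be non-null.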
[jira] [Created] (FLINK-5144) Error while applying rule AggregateJoinTransposeRule
Timo Walther created FLINK-5144: --- Summary: Error while applying rule AggregateJoinTransposeRule Key: FLINK-5144 URL: https://issues.apache.org/jira/browse/FLINK-5144 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther AggregateJoinTransposeRule seems to cause errors. We have to investigate whether this is a Flink or a Calcite error. Here is a simplified example: {code} select sum(l_extendedprice) from lineitem, part where p_partkey = l_partkey and l_quantity < ( select avg(l_quantity) from lineitem where l_partkey = p_partkey ) {code} Exception: {code} Exception in thread "main" java.lang.AssertionError: Internal error: Error occurred while applying rule AggregateJoinTransposeRule at org.apache.calcite.util.Util.newInternal(Util.java:792) at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148) at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225) at org.apache.calcite.rel.rules.AggregateJoinTransposeRule.onMatch(AggregateJoinTransposeRule.java:342) at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:213) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:819) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334) at org.apache.flink.api.table.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:251) at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:286) at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) at org.apache.flink.api.scala.table.package$.table2RowDataSet(package.scala:77) at org.apache.flink.api.scala.sql.tpch.TPCHQueries$.runQ17(TPCHQueries.scala:826) at org.apache.flink.api.scala.sql.tpch.TPCHQueries$.main(TPCHQueries.scala:57) at org.apache.flink.api.scala.sql.tpch.TPCHQueries.main(TPCHQueries.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) Caused by: java.lang.AssertionError: Type mismatch: rowtype of new rel: RecordType(BIGINT l_partkey, BIGINT p_partkey) NOT NULL rowtype of set: RecordType(BIGINT p_partkey) NOT NULL at org.apache.calcite.util.Litmus$1.fail(Litmus.java:31) at org.apache.calcite.plan.RelOptUtil.equal(RelOptUtil.java:1838) at org.apache.calcite.plan.volcano.RelSubset.add(RelSubset.java:273) at org.apache.calcite.plan.volcano.RelSet.add(RelSet.java:148) at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1820) at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1766) at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1032) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1052) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1942) at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:136) ... 17 more {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5124) Support more temporal arithmetic
Timo Walther created FLINK-5124: --- Summary: Support more temporal arithmetic Key: FLINK-5124 URL: https://issues.apache.org/jira/browse/FLINK-5124 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther Multiple TPC-H queries fail because of missing temporal arithmetic support. Since CALCITE-308 has been fixed, we can add additional operations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5078) Introduce annotations for classes copied from Calcite
Timo Walther created FLINK-5078: --- Summary: Introduce annotations for classes copied from Calcite Key: FLINK-5078 URL: https://issues.apache.org/jira/browse/FLINK-5078 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther We have already copied several classes from Calcite because of missing features or bugs. In order to track those classes, update them when bumping up the version, or check whether they have become obsolete, it might be useful to introduce a special annotation to mark those classes, maybe together with a standardized comment format that indicates which lines have been modified. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4832) Count/Sum 0 elements
Timo Walther created FLINK-4832: --- Summary: Count/Sum 0 elements Key: FLINK-4832 URL: https://issues.apache.org/jira/browse/FLINK-4832 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther Currently, the Table API is unable to count or sum up 0 elements. We should improve DataSet aggregations for this, maybe by unioning the original DataSet with a dummy record or by using a MapPartition function. Coming up with a good design for this is also part of this issue. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
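The "dummy record" idea from FLINK-4832 can be sketched with plain Java streams. This is only an analogy and not DataSet code: {{EmptyAggregateSketch}}, {{countByReduce}} and {{countWithDummy}} are hypothetical names, and the stream {{reduce}} merely stands in for a reduce-style aggregation that emits nothing when it receives no input. Unioning the partial counts with a single neutral element (0) guarantees that one result is produced even for empty input.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

public class EmptyAggregateSketch {

    /** Reduce-style count: yields no result at all for empty input. */
    public static Optional<Long> countByReduce(List<String> input) {
        return input.stream().map(x -> 1L).reduce(Long::sum);
    }

    /** Same count after unioning the partial counts with one dummy count of 0. */
    public static long countWithDummy(List<String> input) {
        return Stream.concat(input.stream().map(x -> 1L), Stream.of(0L))
                .reduce(Long::sum)
                .get(); // safe: the dummy element guarantees at least one value
    }

    public static void main(String[] args) {
        List<String> empty = Collections.emptyList();
        System.out.println(countByReduce(empty).isPresent());      // false
        System.out.println(countWithDummy(empty));                  // 0
        System.out.println(countWithDummy(Arrays.asList("a", "b"))); // 2
    }
}
```

For a sum, the dummy element would need a different treatment if the result for empty input should be NULL rather than 0, as in standard SQL; that design question is part of the issue.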
[jira] [Created] (FLINK-4801) Input type inference is faulty with custom Tuples and RichFunctions
Timo Walther created FLINK-4801: --- Summary: Input type inference is faulty with custom Tuples and RichFunctions Key: FLINK-4801 URL: https://issues.apache.org/jira/browse/FLINK-4801 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther This issue has been discussed on the ML: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Type-problem-in-RichFlatMapFunction-when-using-GenericArray-type-td13929.html This returns the wrong type: {code} public static class Foo extends Tuple2{ public Foo() { } public Foo(K[] value0, K value1) { super(value0, value1); } } DataSource fooDataSource = env.fromElements(foo); DataSet ds = fooDataSource.join(fooDataSource) .where(field).equalTo(field) .with(new RichFlatJoinFunction () { @Override public void join(Foo first, Foo second, Collector out) throws Exception { out.collect(first); } }); {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4791) Fix issues caused by expression reduction
Timo Walther created FLINK-4791: --- Summary: Fix issues caused by expression reduction Key: FLINK-4791 URL: https://issues.apache.org/jira/browse/FLINK-4791 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther While updating {{ExpressionTestBase}} for FLINK-4294, I noticed that many expressions fail when applying the {{ReduceExpressionRule}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4762) Use plural in time interval units
Timo Walther created FLINK-4762: --- Summary: Use plural in time interval units Key: FLINK-4762 URL: https://issues.apache.org/jira/browse/FLINK-4762 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther During the creation of FLIP-11 we decided to rename the time interval units from {{minute}} to {{minutes}} and so on in the Java and Scala Table API. {{12.minutes + 2.hours}} reads better. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4745) Convert KafkaTableSource test to unit tests
Timo Walther created FLINK-4745: --- Summary: Convert KafkaTableSource test to unit tests Key: FLINK-4745 URL: https://issues.apache.org/jira/browse/FLINK-4745 Project: Flink Issue Type: Improvement Components: Tests Reporter: Timo Walther Assignee: Timo Walther Kafka tests are extremely heavy, and Table Sources and Sinks are only thin wrappers on top of the Kafka Sources / Sinks. Testing them should not require bringing up Kafka clusters. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4704) Move Table API to org.apache.flink.table
Timo Walther created FLINK-4704: --- Summary: Move Table API to org.apache.flink.table Key: FLINK-4704 URL: https://issues.apache.org/jira/browse/FLINK-4704 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther This would be a large change. But maybe now is still a good time to do it. Otherwise we will never fix this. Actually, the Table API is in the wrong package. At the moment it is in {{org.apache.flink.api.table}} and the actual Scala/Java APIs are in {{org.apache.flink.api.java/scala.table}}. All other APIs such as Python, Gelly, Flink ML do not use the {{org.apache.flink.api}} namespace. I suggest the following packages: {code} org.apache.flink.table org.apache.flink.table.api.java org.apache.flink.table.api.scala {code} What do you think? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4699) Convert Kafka TableSource/TableSink tests to unit tests
Timo Walther created FLINK-4699: --- Summary: Convert Kafka TableSource/TableSink tests to unit tests Key: FLINK-4699 URL: https://issues.apache.org/jira/browse/FLINK-4699 Project: Flink Issue Type: Test Components: Table API & SQL Reporter: Timo Walther The Kafka tests are extremely heavy, and the Table Sources and Sinks are only thin wrappers on top of the Kafka Sources / Sinks. Testing them should not require bringing up Kafka clusters. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4693) Add session group-windows for batch tables
Timo Walther created FLINK-4693: --- Summary: Add session group-windows for batch tables Key: FLINK-4693 URL: https://issues.apache.org/jira/browse/FLINK-4693 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Timo Walther Add Session group-windows for batch tables as described in [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4692) Add tumbling and sliding group-windows for batch tables
Timo Walther created FLINK-4692: --- Summary: Add tumbling and sliding group-windows for batch tables Key: FLINK-4692 URL: https://issues.apache.org/jira/browse/FLINK-4692 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Timo Walther Add Tumble and Slide group-windows for batch tables as described in [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4691) Add group-windows for streaming tables
Timo Walther created FLINK-4691: --- Summary: Add group-windows for streaming tables Key: FLINK-4691 URL: https://issues.apache.org/jira/browse/FLINK-4691 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther Add Tumble, Slide, Session group-windows for streaming tables as described in [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. Implementation of group-windows on streaming tables. This includes implementing the API of group-windows, the logical validation for group-windows, and the definition of the “rowtime” and “systemtime” keywords. Group-windows on batch tables won’t be initially supported and will throw an exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4686) Add possibility to get column names
Timo Walther created FLINK-4686: --- Summary: Add possibility to get column names Key: FLINK-4686 URL: https://issues.apache.org/jira/browse/FLINK-4686 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Timo Walther For debugging and maybe for visualization in the future (e.g. in a shell) it would be good to have the possibility to get the names of {{Table}} columns. At the moment the user has no idea how the table columns are named, for example if they need to be matched with POJO fields. My suggestion: {code} Schema schema = table.schema(); TypeInformation typeByIndex = schema.getType(1); TypeInformation typeByName = schema.getType("col"); String name = schema.getColumnName(1); String[] names = schema.getColumnNames(); {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
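To make the accessors suggested in FLINK-4686 more concrete, here is a minimal stand-alone Java sketch of what such a schema holder might look like. It is not an existing Flink class: {{SchemaSketch}} is a hypothetical name, plain {{Class<?>}} objects are used in place of {{TypeInformation}} so that the example runs without Flink on the classpath, and zero-based column indices are an assumption, since the issue does not say how columns are numbered. Only the method names {{getType}}, {{getColumnName}} and {{getColumnNames}} are taken from the issue text.

```java
import java.util.Arrays;

public final class SchemaSketch {

    private final String[] columnNames;
    private final Class<?>[] columnTypes;

    public SchemaSketch(String[] columnNames, Class<?>[] columnTypes) {
        if (columnNames.length != columnTypes.length) {
            throw new IllegalArgumentException("Number of names and types must match");
        }
        // Defensive copies keep the schema immutable.
        this.columnNames = columnNames.clone();
        this.columnTypes = columnTypes.clone();
    }

    /** Returns the type of the column at the given zero-based index. */
    public Class<?> getType(int index) {
        return columnTypes[index];
    }

    /** Returns the type of the column with the given name. */
    public Class<?> getType(String name) {
        int index = Arrays.asList(columnNames).indexOf(name);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown column: " + name);
        }
        return columnTypes[index];
    }

    /** Returns the name of the column at the given zero-based index. */
    public String getColumnName(int index) {
        return columnNames[index];
    }

    /** Returns a copy of all column names in declaration order. */
    public String[] getColumnNames() {
        return columnNames.clone();
    }

    public static void main(String[] args) {
        SchemaSketch schema = new SchemaSketch(
                new String[] {"line", "count"},
                new Class<?>[] {String.class, Long.class});
        System.out.println(Arrays.toString(schema.getColumnNames())); // [line, count]
        System.out.println(schema.getType("count").getSimpleName());  // Long
    }
}
```

With such an object, a user could print the column names of a table before converting it to a POJO and check that they match the POJO's field names.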
[jira] [Created] (FLINK-4671) Table API can not be built
Timo Walther created FLINK-4671: --- Summary: Table API can not be built Key: FLINK-4671 URL: https://issues.apache.org/jira/browse/FLINK-4671 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Running {{mvn clean verify}} in {{flink-table}} results in a build failure. {code} [ERROR] Failed to execute goal on project flink-table_2.10: Could not resolve dependencies for project org.apache.flink:flink-table_2.10:jar:1.2-SNAPSHOT: Failure to find org.apache.directory.jdbm:apacheds-jdbm1:bundle:2.0.0-M2 in https://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced -> [Help 1] {code} However, the master can be built successfully. -- This message was sent by Atlassian JIRA (v6.3.4#6332)