[jira] [Assigned] (FLINK-10929) Add support for Apache Arrow
[ https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

vinoyang reassigned FLINK-10929:
--------------------------------

    Assignee:  (was: vinoyang)

> Add support for Apache Arrow
> ----------------------------
>
>             Key: FLINK-10929
>             URL: https://issues.apache.org/jira/browse/FLINK-10929
>         Project: Flink
>      Issue Type: Wish
>        Reporter: Pedro Cardoso Silva
>        Priority: Minor
>
> Investigate the possibility of adding support for Apache Arrow as a
> standardized, columnar in-memory format for data.
> Given the activity that https://github.com/apache/arrow is currently
> getting, and its stated objective of providing a zero-copy, standardized
> data format across platforms, I think it makes sense for Flink to look
> into supporting it.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9461) Disentangle flink-connector-kafka from flink-table and flink-json
[ https://issues.apache.org/jira/browse/FLINK-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711081#comment-16711081 ]

vinoyang commented on FLINK-9461:
---------------------------------

[~twalthr] Some interfaces, such as StreamTableSinkFactory, are used by multiple connectors, so while migrating I probably won't delete them but will instead create a copy of the Java implementation in table-common. I have done [most of the work|https://github.com/yanghua/flink/commit/03e69684d7f584618fb5db04be13c63a403d6d12] already, but the transitive dependencies of some interfaces prevent a successful compile, so this needs a rethink. In addition, I suggest we create an umbrella issue for migrating all connectors and turn the current issue into a sub-task; the migration of the connectors and the transformation of the interfaces should be centralized. What do you think?

> Disentangle flink-connector-kafka from flink-table and flink-json
> -----------------------------------------------------------------
>
>                 Key: FLINK-9461
>                 URL: https://issues.apache.org/jira/browse/FLINK-9461
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Build System
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: vinoyang
>            Priority: Major
>             Fix For: 1.8.0
>
> Currently, the {{flink-connector-kafka}} module has a dependency on
> {{flink-table}} and {{flink-json}}. The reason seems to be that the module
> contains the {{KafkaJsonTableSource}} and {{KafkaJsonTableSink}}. Even though
> the {{flink-table}} and {{flink-json}} dependencies are marked as optional,
> {{flink-connector-kafka}} will still contain the table sources and sinks. I
> think this is not a clean design.
> I would propose to move the table sources and sinks into a dedicated module
> which depends on {{flink-connector-kafka}}. That way we would better separate
> dependencies and could remove {{flink-table}} and {{flink-json}} from
> {{flink-connector-kafka}}.
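The proposed split can be sketched as a dedicated Maven module whose POM depends on the plain connector while owning the table/json dependencies itself. This is only an illustration of the layout: the module name {{flink-connector-kafka-table}} and the version are hypothetical, not names chosen by the Flink project.

```xml
<!-- Hypothetical POM for a dedicated table-connector module.
     It would host KafkaJsonTableSource/KafkaJsonTableSink, so
     flink-connector-kafka itself no longer needs flink-table or flink-json. -->
<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-table</artifactId>
  <version>1.8-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${project.version}</version>
    </dependency>
    <!-- table/json live only here, next to the table sources and sinks -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${project.version}</version>
    </dependency>
  </dependencies>
</project>
```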
[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711073#comment-16711073 ]

ASF GitHub Bot commented on FLINK-11010:
----------------------------------------

lzqdename edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-444762927

Let me show how the wrong result is generated.

**Background**: processing time in a tumbling window, Flink 1.5.0.

The invoke stack is as follows:

```
[1]  org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747)
[2]  org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:53)
[3]  org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74)
[4]  org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72)
[5]  org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39)
[6]  org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46)
[7]  org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550)
[8]  org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505)
[9]  org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266)
[10] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281)
[11] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
[12] java.util.concurrent.FutureTask.run (FutureTask.java:266)
[13] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180)
[14] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293)
[15] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142)
[16] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
[17] java.lang.Thread.run (Thread.java:748)
```

We are now at frame [1], org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747), whose code is:

```java
public static Timestamp internalToTimestamp(long v) {
    return new Timestamp(v - LOCAL_TZ.getOffset(v));
}
```

Printing the window bounds gives:

```
windowStart: v = 154407483
windowEnd:   v = 1544074833000
```

After this, we come back to org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:51) and execute:

```scala
if (windowStartOffset.isDefined) {
  output.setField(
    lastFieldPos + windowStartOffset.get,
    SqlFunctions.internalToTimestamp(windowStart))
}
if (windowEndOffset.isDefined) {
  output.setField(
    lastFieldPos + windowEndOffset.get,
    SqlFunctions.internalToTimestamp(windowEnd))
}
```

Before execution, the output is:

```
output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,null,null,null"
```

and after execution it is:

```
output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,2018-12-06 05:40:30.0,2018-12-06 05:40:33.0,null"
```

So, do you think translating the long value 154407483 to 2018-12-06 05:40:30.0, and the long value 1544074833000 to 2018-12-06 05:40:33.0, is right? I am in China, so I think the timestamps should be 2018-12-06 13:40:30.0 and 2018-12-06 13:40:33.0.

Okay, let us continue. Now the data will be written to Kafka; before the write, it will be serialized. Let us see what happens! The call stack is as follows:

```
[1] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer._timestamp (DateSerializer.java:41)
[2] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:48)
[3] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:15)
[4] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue (DefaultSerializerProvider.java:130)
[5] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue (ObjectMapper.java:2,444)
[6] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.valueToTree (ObjectMapper.java:2,586)
[7] org.apache.flink.formats.json.JsonRowSerializationSchema ...
```
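The offset subtraction described in the comment can be reproduced outside Flink. Below is a minimal sketch assuming a UTC+8 default zone, as in the report; `OffsetDemo` and its method mirror Calcite's `internalToTimestamp` for illustration and are not Flink code.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class OffsetDemo {
    // Mirrors Calcite's SqlFunctions.internalToTimestamp: it subtracts the
    // default zone's offset from the epoch-millis value before wrapping it.
    static Timestamp internalToTimestamp(long v) {
        return new Timestamp(v - TimeZone.getDefault().getOffset(v));
    }

    public static void main(String[] args) {
        // UTC+8, as in the reporter's environment
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));

        long windowEnd = 1544074833000L; // 2018-12-06 13:40:33 local time (UTC+8)

        // The wrapped millis are shifted back 8 hours, so Timestamp.toString()
        // (which renders in the default zone) shows 05:40:33 instead of 13:40:33.
        System.out.println(internalToTimestamp(windowEnd)); // 2018-12-06 05:40:33.0
    }
}
```

This shows why the rendered window bounds lag local time by exactly the zone offset: `Timestamp.toString()` already formats in the default zone, so subtracting the offset first double-counts it.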
[jira] [Commented] (FLINK-8033) Build Flink with JDK 9
[ https://issues.apache.org/jira/browse/FLINK-8033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711064#comment-16711064 ]

Takanobu Asanuma commented on FLINK-8033:
-----------------------------------------

FYI, Hadoop has stopped fixing JDK 9/10 incompatibilities, since those releases are EOL, and is focusing on supporting JDK 8/11. Please see HADOOP-11123.

> Build Flink with JDK 9
> ----------------------
>
>                 Key: FLINK-8033
>                 URL: https://issues.apache.org/jira/browse/FLINK-8033
>             Project: Flink
>          Issue Type: Improvement
>          Components: Build System
>    Affects Versions: 1.4.0
>            Reporter: Hai Zhou
>            Assignee: Chesnay Schepler
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
> This is a JIRA to track all issues found while making Flink compatible with
> Java 9.
[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711063#comment-16711063 ]

ASF GitHub Bot commented on FLINK-11010:
----------------------------------------

lamber-ken commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-444773602

@lzqdename, thanks for your detailed description.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Flink SQL timestamp is inconsistent with currentProcessingTime()
> -----------------------------------------------------------------
>
>                 Key: FLINK-11010
>                 URL: https://issues.apache.org/jira/browse/FLINK-11010
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.6.2
>            Reporter: lamber-ken
>            Assignee: lamber-ken
>            Priority: Major
>              Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
> Processing time is implemented simply by invoking System.currentTimeMillis(),
> but the long value is then automatically wrapped into a Timestamp with the
> following statement:
> {{new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));}}
[jira] [Updated] (FLINK-11084) Incorrect output after two consecutive split and select
[ https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shimin Yang updated FLINK-11084:
--------------------------------

    Summary: Incorrect output after two consecutive split and select  (was: Incorrect output after two successive split and select)

> Incorrect output after two consecutive split and select
> -------------------------------------------------------
>
>                 Key: FLINK-11084
>                 URL: https://issues.apache.org/jira/browse/FLINK-11084
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Shimin Yang
>            Assignee: Shimin Yang
>            Priority: Critical
>             Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
> The second OutputSelector of two consecutive split and select calls does not
> actually depend on the first one; both selectors end up in the same array of
> OutputSelectors in DirectedOutput.
> For example:
> outputSelector1 => {"name1" or "name2"}
> outputSelector2 => {"name3" or "name4"}
> resultStream =
> dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")
> Expected result for input {StreamRecord1}:
> outputSelector1 => {"name1"}
> outputSelector2 => {"name3"}
> resultStream => {}
> Actual result:
> resultStream => {StreamRecord1}
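The reported behavior can be modeled with plain functions standing in for OutputSelectors. This is a simplified sketch, not Flink code: `SplitSelectDemo` and its selector functions are hypothetical stand-ins contrasting the expected (chained) semantics with the flattened behavior the issue describes.

```java
import java.util.Set;
import java.util.function.Function;

public class SplitSelectDemo {
    // Hypothetical stand-ins for Flink OutputSelectors: map a record to its tags.
    static final Function<String, Set<String>> SELECTOR_1 = r -> Set.of("name1");
    static final Function<String, Set<String>> SELECTOR_2 = r -> Set.of("name3");

    // Expected (chained) semantics: select("name2") filters first, so a record
    // tagged only "name1" should never reach the second split/select.
    static boolean expectedSelected(String record) {
        return SELECTOR_1.apply(record).contains("name2")
                && SELECTOR_2.apply(record).contains("name3");
    }

    // Behavior reported in FLINK-11084: both selectors sit in one flat array in
    // DirectedOutput, so the record is emitted whenever the last select() name
    // matches, regardless of the earlier select().
    static boolean actualSelected(String record) {
        return SELECTOR_2.apply(record).contains("name3");
    }

    public static void main(String[] args) {
        System.out.println("expected in resultStream: " + expectedSelected("StreamRecord1")); // false
        System.out.println("actual in resultStream:   " + actualSelected("StreamRecord1"));   // true
    }
}
```

Running this reproduces the mismatch from the issue description: the expected resultStream is empty, while the actual one contains StreamRecord1.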
[jira] [Commented] (FLINK-11084) Incorrect output after two successive split and select
[ https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711041#comment-16711041 ]

Shimin Yang commented on FLINK-11084:
-------------------------------------

I am currently creating an OutputSelectorWrapper that holds the current OutputSelector, all preceding parent OutputSelectors, and their selected names during StreamGraph generation. At runtime, every record then goes through all the OutputSelectors in the right order.

> Incorrect output after two successive split and select
> ------------------------------------------------------
>
>                 Key: FLINK-11084
>                 URL: https://issues.apache.org/jira/browse/FLINK-11084
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Shimin Yang
>            Assignee: Shimin Yang
>            Priority: Critical
>             Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
> The second OutputSelector of two successive split and select calls does not
> actually depend on the first one; both selectors end up in the same array of
> OutputSelectors in DirectedOutput.
> For example:
> outputSelector1 => {"name1" or "name2"}
> outputSelector2 => {"name3" or "name4"}
> resultStream =
> dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")
> Expected result for input {StreamRecord1}:
> outputSelector1 => {"name1"}
> outputSelector2 => {"name3"}
> resultStream => {}
> Actual result:
> resultStream => {StreamRecord1}
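A minimal sketch of the wrapper idea described in the comment, assuming selectors can be modeled as functions from a record to its tag set. The class name matches the comment, but the fields and methods here are illustrative, not the actual Flink implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Illustrative OutputSelectorWrapper: remembers every upstream
// (selector, selected names) pair in order, so a record only reaches the
// output if it passes each split/select stage in turn.
public class OutputSelectorWrapper<T> {
    private final List<Function<T, Set<String>>> selectors = new ArrayList<>();
    private final List<Set<String>> selectedNames = new ArrayList<>();

    // Register one split(...).select(...) stage, collected during StreamGraph generation.
    public void addStage(Function<T, Set<String>> selector, Set<String> names) {
        selectors.add(selector);
        selectedNames.add(names);
    }

    // Apply all stages in declaration order; fail fast at the first non-match.
    public boolean accepts(T record) {
        for (int i = 0; i < selectors.size(); i++) {
            if (Collections.disjoint(selectors.get(i).apply(record), selectedNames.get(i))) {
                return false; // filtered out here; later selectors never see the record
            }
        }
        return true;
    }
}
```

With the issue's example, stage one tags the record "name1" but selects "name2", so `accepts` returns false and the record never reaches the second selector, matching the expected empty resultStream.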
[jira] [Created] (FLINK-11084) Incorrect output after two successive split and select
Shimin Yang created FLINK-11084:
-----------------------------------

             Summary: Incorrect output after two successive split and select
                 Key: FLINK-11084
                 URL: https://issues.apache.org/jira/browse/FLINK-11084
             Project: Flink
          Issue Type: Bug
          Components: Streaming
            Reporter: Shimin Yang
            Assignee: Shimin Yang
             Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1

The second OutputSelector of two successive split and select calls does not actually depend on the first one; both selectors end up in the same array of OutputSelectors in DirectedOutput.

For example:
outputSelector1 => {"name1" or "name2"}
outputSelector2 => {"name3" or "name4"}
resultStream =
dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")

Expected result for input {StreamRecord1}:
outputSelector1 => {"name1"}
outputSelector2 => {"name3"}
resultStream => {}

Actual result:
resultStream => {StreamRecord1}
[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711033#comment-16711033 ] ASF GitHub Bot commented on FLINK-11010: lzqdename edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#issuecomment-444762927

Let me show how the wrong result is generated.

---

**Background**: processing time in a tumbling window, Flink 1.5.0.

The invoke stack is as follows:

```
[1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1747)
[2] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:53)
[3] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74)
[4] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72)
[5] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39)
[6] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46)
[7] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550)
[8] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505)
[9] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266)
[10] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281)
[11] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
[12] java.util.concurrent.FutureTask.run (FutureTask.java:266)
[13] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180)
[14] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293)
[15] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
[16] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
[17] java.lang.Thread.run (Thread.java:748)
```

We are now stopped in frame [1], `SqlFunctions.internalToTimestamp`, whose code is:

```java
public static Timestamp internalToTimestamp(long v) {
  return new Timestamp(v - LOCAL_TZ.getOffset(v));
}
```

Printing the values in the debugger gives windowStart: v = 1544074830000 and windowEnd: v = 1544074833000.

After this, we come back to frame [2], `TimeWindowPropertyCollector.collect` (TimeWindowPropertyCollector.scala:51), and execute:

```scala
if (windowStartOffset.isDefined) {
  output.setField(
    lastFieldPos + windowStartOffset.get,
    SqlFunctions.internalToTimestamp(windowStart))
}
if (windowEndOffset.isDefined) {
  output.setField(
    lastFieldPos + windowEndOffset.get,
    SqlFunctions.internalToTimestamp(windowEnd))
}
```

Before this executes, the output is:

`output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,null,null,null"`

After it executes, the output is:

`output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,2018-12-06 05:40:30.0,2018-12-06 05:40:33.0,null"`

So the long value 1544074830000 is translated to 2018-12-06 05:40:30.0, and 1544074833000 to 2018-12-06 05:40:33.0. Do you think that is right? I am in China (UTC+8), so I think the timestamps should be 2018-12-06 13:40:30.0 and 2018-12-06 13:40:33.0.

Okay, let us continue. The data will now be written to Kafka; before being written, it is serialized. Let us see what happens. The call stack is as follows:

```
[1] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer._timestamp (DateSerializer.java:41)
[2] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:48)
[3] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:15)
[4] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue (DefaultSerializerProvider.java:130)
[5] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue (ObjectMapper.java:2444)
[6] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.valueToTree (ObjectMapper.java:2586)
[7] org.apache.flink.formats.json.JsonRowSerializationSchema.convert (JsonRowSerializationSchema.java:189)
[8] org.apache.flink.formats.json.JsonRowSerializationSchema.convertRow (JsonRowSerializationSchema.java:128)
```
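The shift described above can be reproduced outside Flink. Below is a minimal, self-contained sketch (class and parameter names are mine, not Flink's or Calcite's) of what `internalToTimestamp` does when the JVM default time zone is UTC+8: the epoch millis are moved backwards by the zone offset, so a serializer that later renders them against UTC prints 05:40:33 instead of the local 13:40:33.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class InternalToTimestampDemo {

    // Mirrors Calcite's SqlFunctions.internalToTimestamp, but takes the
    // time zone explicitly instead of reading the static LOCAL_TZ field.
    public static Timestamp internalToTimestamp(long v, TimeZone tz) {
        return new Timestamp(v - tz.getOffset(v));
    }

    public static void main(String[] args) {
        long windowEnd = 1544074833000L; // 2018-12-06 05:40:33 UTC
        TimeZone cst = TimeZone.getTimeZone("Asia/Shanghai"); // UTC+8, no DST

        Timestamp shifted = internalToTimestamp(windowEnd, cst);

        // The stored epoch millis are now 8 hours (28,800,000 ms) earlier
        // than the real window end, which is why a UTC-based JSON
        // serializer later prints 05:40:33 rather than 13:40:33.
        System.out.println(windowEnd - shifted.getTime()); // 28800000
    }
}
```

The shift is invisible as long as the value is formatted back through the same local time zone, but any component that treats `Timestamp.getTime()` as true epoch millis (such as a JSON date serializer) exposes the discrepancy.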
[jira] [Commented] (FLINK-11083) CRowSerializerConfigSnapshot is not instantiable
[ https://issues.apache.org/jira/browse/FLINK-11083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711016#comment-16711016 ] boshu Zheng commented on FLINK-11083: - cc [~pnowojski] Could you please have a look? > CRowSerializerConfigSnapshot is not instantiable > > > Key: FLINK-11083 > URL: https://issues.apache.org/jira/browse/FLINK-11083 > Project: Flink > Issue Type: Bug > Components: Table API & SQL, Type Serialization System >Reporter: boshu Zheng >Assignee: boshu Zheng >Priority: Major > > An exception was encountered when restarting a job with savepoint in our > production env, > {code:java} > 2018-12-04 20:28:25,091 INFO 10595 org.apache.flink.runtime.taskmanager.Task > :917 - _OurCustomOperator_ -> select: () -> to: Tuple2 -> > Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING > to FAILED. > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.flink.util.FlinkException: Could not restore operator > state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) > from any of the 1 provided restore options. 
> at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140) > ... 5 more > Caused by: java.lang.RuntimeException: The class > 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot' > is not instantiable: The class has no (implicit) public nullary constructor, > i.e. a constructor without arguments. > at > org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412) > at > org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) > at > org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) > at > 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123) > ... 7 more > {code} > I add tests to CRowSerializerTest to make sure this is definitely a bug, > {code:java} > @Test > def testDefaultConstructo
[jira] [Created] (FLINK-11083) CRowSerializerConfigSnapshot is not instantiable
boshu Zheng created FLINK-11083: --- Summary: CRowSerializerConfigSnapshot is not instantiable Key: FLINK-11083 URL: https://issues.apache.org/jira/browse/FLINK-11083 Project: Flink Issue Type: Bug Components: Table API & SQL, Type Serialization System Reporter: boshu Zheng Assignee: boshu Zheng An exception was encountered when restarting a job with savepoint in our production env, {code:java} 2018-12-04 20:28:25,091 INFO 10595 org.apache.flink.runtime.taskmanager.Task :917 - _OurCustomOperator_ -> select: () -> to: Tuple2 -> Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING to FAILED. java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140) ... 
5 more Caused by: java.lang.RuntimeException: The class 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot' is not instantiable: The class has no (implicit) public nullary constructor, i.e. a constructor without arguments. at org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412) at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218) at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64) at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123) ... 7 more {code} I added tests to CRowSerializerTest to make sure this is definitely a bug: {code:java} @Test def testDefaultConstructor(): Unit = { new CRowSerializer.CRowSerializerConfigSnapshot() /// This would fail the test val serializerConfigSnapshotClass = Class.forName("org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot") InstantiationUtil.instantiate(serializerConfigSnapshotClass) } @Test def testStateRestore(): Unit = { class IKeyedPr
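The root cause is that the config snapshot class has no public nullary constructor, which Flink's reflective instantiation during restore requires (hence the "no (implicit) public nullary constructor" message in the stack trace). A minimal self-contained sketch of the failure mode; the `BrokenSnapshot`/`FixedSnapshot` classes below are illustrative stand-ins, not Flink classes:

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

// Mimics a config snapshot that only has a parameterized constructor,
// like CRowSerializerConfigSnapshot before the fix.
class BrokenSnapshot {
    BrokenSnapshot(int version) {}
}

// Fixed variant: a public nullary constructor makes reflective instantiation possible.
class FixedSnapshot {
    public FixedSnapshot() {}
    public FixedSnapshot(int version) {}
}

public class NullaryConstructorDemo {
    // Simplified version of the check that InstantiationUtil performs.
    static boolean isInstantiable(Class<?> clazz) {
        try {
            Constructor<?> ctor = clazz.getDeclaredConstructor(); // nullary constructor lookup
            return Modifier.isPublic(ctor.getModifiers());
        } catch (NoSuchMethodException e) {
            return false; // no nullary constructor at all
        }
    }

    public static void main(String[] args) {
        System.out.println(isInstantiable(BrokenSnapshot.class)); // prints false
        System.out.println(isInstantiable(FixedSnapshot.class));  // prints true
    }
}
```

The fix is accordingly to give the snapshot class a public no-argument constructor (the deserializer sets its fields afterwards via `read`).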
[jira] [Updated] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11048: --- Labels: pull-request-available (was: ) > Ability to programmatically execute streaming pipeline with savepoint restore > > --- > > Key: FLINK-11048 > URL: https://issues.apache.org/jira/browse/FLINK-11048 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.7.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Minor > Labels: pull-request-available > > RemoteStreamEnvironment.execute doesn't support restore from savepoint, > though the underlying ClusterClient does. Add an explicit "execute remotely" > that can be used by downstream projects. > [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E] > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711005#comment-16711005 ] ASF GitHub Bot commented on FLINK-11048: tweise opened a new pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249 ## What is the purpose of the change Adds the ability to programmatically execute a job with savepoint restore option. https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E The number of parameters to *executeRemotely* is a bit higher than what we might have expected. The change to ScalaShellRemoteStreamEnvironment needs a closer look. It needs to override the jar file list, but do we really need the *ProgramInvocationException* there? ## Brief change log - executeRemotely changed to static method that can be used with any StreamExecutionEnvironment ## Verifying this change No added test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Ability to programmatically execute streaming pipeline with savepoint restore > > --- > > Key: FLINK-11048 > URL: https://issues.apache.org/jira/browse/FLINK-11048 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.7.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Minor > Labels: pull-request-available > > RemoteStreamEnvironment.execute doesn't support restore from savepoint, > though the underlying ClusterClient does. Add an explicit "execute remotely" > that can be used by downstream projects. > [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E] > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
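The direction described in the PR, moving savepoint restore into an explicit static "execute remotely" entry point, can be sketched as follows. All class and method names here are illustrative stand-ins invented for this sketch; the real change touches RemoteStreamEnvironment and ClusterClient, which are not reproduced:

```java
import java.util.Collections;
import java.util.List;

// Stand-in for Flink's savepoint restore settings: restore becomes an
// explicit, optional parameter instead of hidden environment state.
final class RestoreSettings {
    final String savepointPath;

    private RestoreSettings(String savepointPath) { this.savepointPath = savepointPath; }

    static RestoreSettings forPath(String path) { return new RestoreSettings(path); }
    static RestoreSettings none() { return new RestoreSettings(null); }
}

public class RemoteExecutorSketch {
    // A static helper usable with any environment object: because the restore
    // settings are passed in, downstream projects can trigger a savepoint
    // restore programmatically without subclassing the environment.
    static String executeRemotely(Object environment, String host, int port,
                                  List<String> jarFiles, RestoreSettings restore) {
        // Real code would build a JobGraph and submit it via a ClusterClient;
        // here we only report what would be submitted.
        String target = "submit " + jarFiles.size() + " jar(s) to " + host + ":" + port;
        return restore.savepointPath == null
                ? target
                : target + ", restoring from " + restore.savepointPath;
    }

    public static void main(String[] args) {
        System.out.println(executeRemotely(null, "localhost", 8081,
                Collections.emptyList(), RestoreSettings.forPath("hdfs:///savepoints/sp-1")));
    }
}
```

This also illustrates why the parameter count grows: host, port, jars, and restore settings all become explicit arguments once they no longer live inside the environment.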
[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710996#comment-16710996 ] ASF GitHub Bot commented on FLINK-11010: lzqdename removed a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#issuecomment-444723623 In Flink, I think it is better to avoid the timezone in calculations, because the input is always UTC. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink SQL timestamp is inconsistent with currentProcessingTime() > > > Key: FLINK-11010 > URL: https://issues.apache.org/jira/browse/FLINK-11010 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.6.2 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > > Flink SQL timestamp is inconsistent with currentProcessingTime(). > > The ProcessingTime is implemented simply by invoking System.currentTimeMillis(), > but the long value will be automatically wrapped to a Timestamp with the > following statement: > `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
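The inconsistency quoted in the issue description is easy to reproduce outside Flink: the wrapping statement shifts the raw processing time by the JVM's default-timezone offset, so the wrapped Timestamp no longer equals the value returned by System.currentTimeMillis() (except when the default zone is UTC). A minimal sketch; the epoch value is arbitrary:

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class TimestampOffsetDemo {
    public static void main(String[] args) {
        long time = 1_544_000_000_000L; // an arbitrary processing-time instant in epoch millis
        int offset = TimeZone.getDefault().getOffset(time); // default-zone UTC offset at that instant

        // The statement quoted in the issue description:
        Timestamp wrapped = new Timestamp(time - offset);

        // The wrapped value differs from the raw processing time by exactly the
        // local UTC offset, which is the reported inconsistency (zero only in UTC).
        System.out.println(time - wrapped.getTime() == offset);
    }
}
```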
[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators
[ https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710998#comment-16710998 ] ASF GitHub Bot commented on FLINK-10543: sunjincheng121 commented on issue #6918: [FLINK-10543][table] Leverage efficient timer deletion in relational operators URL: https://github.com/apache/flink/pull/6918#issuecomment-444757050 Thanks for the quick update, @hequn8128! +1 to merge. Thanks, Jincheng This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Leverage efficient timer deletion in relational operators > - > > Key: FLINK-10543 > URL: https://issues.apache.org/jira/browse/FLINK-10543 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Minor > Labels: pull-request-available > > FLINK-9423 added support for efficient timer deletions. This feature has been > available since Flink 1.6 and should be used by the relational operators of > the SQL and Table API. > Currently, we use a few workarounds to handle situations where deleting timers > would be the better solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9555) Support table api in scala shell
[ https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-9555. -- Resolution: Fixed Fix Version/s: 1.7.1 1.8.0 Fixed in master: b3a378ad96088442825f1ad9f167d9571b5f4f78 Fixed in release-1.7: efc73a872ac52e314bb1a05b9c5ed045cde6df1f > Support table api in scala shell > > > Key: FLINK-9555 > URL: https://issues.apache.org/jira/browse/FLINK-9555 > Project: Flink > Issue Type: New Feature > Components: Scala Shell >Affects Versions: 1.5.0 >Reporter: Jeff Zhang >Assignee: shuiqiangchen >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0, 1.7.1 > > > It would be nice to have table api available in scala shell so that user can > experience table api in interactive way. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9555) Support table api in scala shell
[ https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710993#comment-16710993 ] ASF GitHub Bot commented on FLINK-9555: --- asfgit closed pull request #7121: [FLINK-9555]Support table api in scala shell URL: https://github.com/apache/flink/pull/7121 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml index 3c9a563b30f..e7d31355b06 100644 --- a/flink-scala-shell/pom.xml +++ b/flink-scala-shell/pom.xml @@ -78,6 +78,13 @@ under the License. ${scala.version} + + org.apache.flink + flink-table_${scala.binary.version} + ${project.version} + provided + + diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala index 4b6e886994a..c124d8ea8a3 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala @@ -23,6 +23,8 @@ import java.io.{BufferedReader, File, FileOutputStream} import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment, ScalaShellRemoteStreamEnvironment} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.configuration.Configuration +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment} import org.apache.flink.util.AbstractID import scala.tools.nsc.interpreter._ @@ -90,10 +92,17 @@ class FlinkILoop( } // local environment - val (scalaBenv: ExecutionEnvironment, scalaSenv: StreamExecutionEnvironment) = { + val ( +scalaBenv: 
ExecutionEnvironment, +scalaSenv: StreamExecutionEnvironment, +scalaBTEnv: BatchTableEnvironment, +scalaSTEnv: StreamTableEnvironment +) = { val scalaBenv = new ExecutionEnvironment(remoteBenv) val scalaSenv = new StreamExecutionEnvironment(remoteSenv) -(scalaBenv,scalaSenv) +val scalaBTEnv = TableEnvironment.getTableEnvironment(scalaBenv) +val scalaSTEnv = TableEnvironment.getTableEnvironment(scalaSenv) +(scalaBenv,scalaSenv,scalaBTEnv,scalaSTEnv) } /** @@ -139,7 +148,10 @@ class FlinkILoop( "org.apache.flink.api.scala._", "org.apache.flink.api.scala.utils._", "org.apache.flink.streaming.api.scala._", -"org.apache.flink.streaming.api.windowing.time._" +"org.apache.flink.streaming.api.windowing.time._", +"org.apache.flink.table.api._", +"org.apache.flink.table.api.scala._", +"org.apache.flink.types.Row" ) override def createInterpreter(): Unit = { @@ -152,6 +164,8 @@ class FlinkILoop( // set execution environment intp.bind("benv", this.scalaBenv) intp.bind("senv", this.scalaSenv) + intp.bind("btenv", this.scalaBTEnv) + intp.bind("stenv", this.scalaSTEnv) } } @@ -243,22 +257,29 @@ class FlinkILoop( F L I N K - S C A L A - S H E L L -NOTE: Use the prebound Execution Environments to implement batch or streaming programs. +NOTE: Use the prebound Execution Environments and Table Environment to implement batch or streaming programs. - Batch - Use the 'benv' variable + Batch - Use the 'benv' and 'btenv' variable * val dataSet = benv.readTextFile("/path/to/data") * dataSet.writeAsText("/path/to/output") * benv.execute("My batch program") +* +* val batchTable = btenv.fromDataSet(dataSet) +* btenv.registerTable("tableName", batchTable) +* val result = btenv.sqlQuery("SELECT * FROM tableName").collect +HINT: You can use print() on a DataSet to print the contents or collect() +a sql query result back to the shell. -HINT: You can use print() on a DataSet to print the contents to the shell. 
- - Streaming - Use the 'senv' variable + Streaming - Use the 'senv' and 'stenv' variable * val dataStream = senv.fromElements(1, 2, 3, 4) * dataStream.countWindowAll(2).sum(0).print() +* +* val streamTable = stenv.fromDataStream(dataStream, 'num) +* val resultTable = streamTable.select('num).where('num % 2 === 1 ) +* resultTable.toAppendStream[Row].print() * senv.execute("My streaming program") - HINT: You can only print a DataStream to the shell in local mode. """ // scalastyle:on diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/Sca
[GitHub] kisimple commented on issue #7192: [hotfix][test][streaming] Fix a test failure in Windows 7 environment.
kisimple commented on issue #7192: [hotfix][test][streaming] Fix a test failure in Windows 7 environment. URL: https://github.com/apache/flink/pull/7192#issuecomment-444750934 cc @kl0u This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kisimple commented on issue #7215: [hotfix][test][streaming] Fix invalid testNotSideOutputXXX in WindowOperatorTest
kisimple commented on issue #7215: [hotfix][test][streaming] Fix invalid testNotSideOutputXXX in WindowOperatorTest URL: https://github.com/apache/flink/pull/7215#issuecomment-444750664 cc @kl0u This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators
[ https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710971#comment-16710971 ] ASF GitHub Bot commented on FLINK-10543: hequn8128 commented on issue #6918: [FLINK-10543][table] Leverage efficient timer deletion in relational operators URL: https://github.com/apache/flink/pull/6918#issuecomment-444745171 @sunjincheng121 Thanks for your review. I have updated the PR according to your suggestions. Best, Hequn This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Leverage efficient timer deletion in relational operators > - > > Key: FLINK-10543 > URL: https://issues.apache.org/jira/browse/FLINK-10543 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Minor > Labels: pull-request-available > > FLINK-9423 added support for efficient timer deletions. This feature has been > available since Flink 1.6 and should be used by the relational operators of > the SQL and Table API. > Currently, we use a few workarounds to handle situations where deleting timers > would be the better solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710966#comment-16710966 ] zhijiang commented on FLINK-11082: -- [~pnowojski], what do you think of this issue? > Increase backlog only if it is available for consumption > > > Key: FLINK-11082 > URL: https://issues.apache.org/jira/browse/FLINK-11082 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.8.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > The backlog should indicate how many buffers are available in the subpartition > for downstream consumption. The availability is determined by two > factors: one is the {{BufferConsumer}} being finished, and the other is a flush being triggered. > In the current implementation, when a {{BufferConsumer}} is added to the > subpartition, the backlog is increased as a result, even though this > {{BufferConsumer}} is not yet available for network transport. > Furthermore, the backlog affects requesting floating buffers on the > downstream side. That means some floating buffers are fetched in advance but > not used for a long time, so the floating buffers are not used > efficiently. > We found this scenario to be especially pronounced for the rebalance selector on the upstream side, so > we want to increase the backlog only when a {{BufferConsumer}} is finished or a > flush is triggered. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
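The proposed accounting can be modeled with a toy subpartition. This is not Flink code; the names and structure are invented for illustration. The point is that the backlog counter moves out of the add path and into the finish/flush path, so it only counts buffers a downstream consumer could actually fetch:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the proposal: a buffer contributes to the backlog only once it
// is finished, or once a flush makes the in-progress buffer consumable.
public class ToySubpartition {
    static final class Buffer {
        boolean availableForConsumption;
    }

    private final Deque<Buffer> buffers = new ArrayDeque<>();
    private int backlog;

    public void add(Buffer buffer) {
        buffers.addLast(buffer);
        // The backlog is NOT incremented here: the buffer is still being
        // written and cannot be transported yet.
    }

    // Called when the current buffer is finished, or when a flush is triggered.
    public void markLastAvailable() {
        Buffer last = buffers.peekLast();
        if (last != null && !last.availableForConsumption) {
            last.availableForConsumption = true;
            backlog++;
        }
    }

    public int getBacklog() {
        return backlog;
    }

    public static void main(String[] args) {
        ToySubpartition p = new ToySubpartition();
        p.add(new Buffer());
        System.out.println(p.getBacklog()); // 0: added but not yet consumable
        p.markLastAvailable();
        System.out.println(p.getBacklog()); // 1: now counted toward the backlog
        p.markLastAvailable();
        System.out.println(p.getBacklog()); // still 1: idempotent for the same buffer
    }
}
```

With this accounting, the downstream side no longer requests floating buffers for data that cannot be transported yet, which addresses the inefficiency described above.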
[GitHub] maqingxiang opened a new pull request #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs
maqingxiang opened a new pull request #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs URL: https://github.com/apache/flink/pull/7248 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)
[ https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710963#comment-16710963 ] TisonKun commented on FLINK-10333: -- Hi [~StephanEwen] and [~till.rohrmann] After an offline discussion with [~xiaogang.shi], we see that ZK has a transactional mechanism with which we can ensure that only the leader writes to ZK. Given this knowledge and the inconsistencies [~till.rohrmann] noticed, before going into a reimplementation I did a survey of the usage of the ZK based stores in Flink. Ideally there is exactly one role that writes a specific znode. There are four types of znodes that Flink writes. Besides {{SubmittedJobGraphStore}} written by {{Dispatcher}}, {{CompletedCheckpointStore}} written by {{JobMaster}} and {{MesosWorkerStore}} written by {{MesosResourceManager}}, there is the {{RunningJobsRegistry}}, which also has a ZK based implementation. All of the first three write ZK under a heavy "store lock", but as [~xiaogang.shi] pointed out, this still lacks atomicity. With a solution based on ZK transactions (for example, the current {{Dispatcher}} leader performing {{setData}} with a {{check}} on the {{election-node-path}}) we can ensure atomicity while getting rid of the lock. For the last one, {{RunningJobsRegistry}}, the situation becomes a bit more complex. {{JobManagerRunner}} is responsible for {{setJobRunning}} and {{setJobFinished}}, and {{Dispatcher}} is responsible for {{clearJob}}. This goes against the ideal of one role per znode. I also notice that the gap between the semantics of {{DONE}} and those of clearing is ambiguous. {{JobSchedulingStatus}} becomes {{DONE}} only if an {{ArchivedExecutionGraph}} has been generated, so that we can prevent the same job from being processed by an approach other than checking an ephemeral {{DONE}} status. What if we replaced {{setJobFinished}} with clearing the {{RunningJobsRegistry}}? 
> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, > CompletedCheckpoints) > - > > Key: FLINK-10333 > URL: https://issues.apache.org/jira/browse/FLINK-10333 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.8.0 > > > While going over the ZooKeeper based stores > ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, > {{ZooKeeperCompletedCheckpointStore}}) and the underlying > {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were > introduced with past incremental changes. > * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} > or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization > problems will either lead to removing the Znode or not > * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of > exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case > of a failure) > * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be > better to move {{RetrievableStateStorageHelper}} out of it for a better > separation of concerns > * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even > if it is locked. This should not happen since it could leave another system > in an inconsistent state (imagine a changed {{JobGraph}} which restores from > an old checkpoint) > * Redundant but also somewhat inconsistent put logic in the different stores > * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} > which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}} > * Getting rid of the {{SubmittedJobGraphListener}} would be helpful > These problems made me think how reliable these components actually work. > Since these components are very important, I propose to refactor them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
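The "only the leader writes" guarantee discussed in the comment above rests on ZK multi-op transactions: a check on the election znode and the setData on the store znode either both succeed or both fail. A toy model of that semantics (no ZooKeeper involved; all names are invented for illustration, and `synchronized` stands in for the server-side atomicity a real ZK transaction provides):

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy model of a transactional "check(election-node-path) + setData(store-node)":
// the write is accepted only if the writer still holds leadership, and the
// check and the write happen as one atomic step.
public class ToyLeaderStore {
    private final AtomicReference<String> currentLeader;
    private String storedData = "";

    public ToyLeaderStore(String initialLeader) {
        this.currentLeader = new AtomicReference<>(initialLeader);
    }

    // synchronized stands in for ZooKeeper's server-side transaction atomicity.
    public synchronized boolean checkAndSetData(String writer, String data) {
        if (!writer.equals(currentLeader.get())) {
            return false; // check failed: the writer lost leadership, nothing is written
        }
        storedData = data;
        return true;
    }

    public void electNewLeader(String node) {
        currentLeader.set(node);
    }

    public synchronized String read() {
        return storedData;
    }

    public static void main(String[] args) {
        ToyLeaderStore store = new ToyLeaderStore("dispatcher-1");
        System.out.println(store.checkAndSetData("dispatcher-1", "jobgraph-v1")); // true
        store.electNewLeader("dispatcher-2");
        // The old leader's late write is rejected instead of corrupting the store.
        System.out.println(store.checkAndSetData("dispatcher-1", "jobgraph-v2")); // false
        System.out.println(store.read()); // jobgraph-v1
    }
}
```

This is why the per-store "store lock" becomes unnecessary: a stale leader's write fails the check atomically, rather than being fenced out by client-side locking.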
[jira] [Commented] (FLINK-7208) Refactor build-in agg(MaxWithRetractAccumulator and MinWithRetractAccumulator) using the DataView
[ https://issues.apache.org/jira/browse/FLINK-7208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710960#comment-16710960 ] ASF GitHub Bot commented on FLINK-7208: --- walterddr commented on a change in pull request #7201: [FLINK-7208] [table] Optimize Min/MaxWithRetractAggFunction with DataView URL: https://github.com/apache/flink/pull/7201#discussion_r239320583 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ## @@ -1437,36 +1437,30 @@ object AggregateUtil { case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT => aggregates(index) = new CollectAggFunction(FlinkTypeFactory.toTypeInfo(relDataType)) -accTypes(index) = aggregates(index).getAccumulatorType case udagg: AggSqlFunction => aggregates(index) = udagg.getFunction -accTypes(index) = udagg.accType case unSupported: SqlAggFunction => throw new TableException(s"Unsupported Function: '${unSupported.getName}'") } } + accTypes(index) = getAccumulatorTypeOfAggregateFunction(aggregates(index)) } val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) // create accumulator type information for every aggregate function aggregates.zipWithIndex.foreach { case (agg, index) => Review comment: +1 you are right. I missed the 4 lines above. 👍 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor build-in agg(MaxWithRetractAccumulator and > MinWithRetractAccumulator) using the DataView > - > > Key: FLINK-7208 > URL: https://issues.apache.org/jira/browse/FLINK-7208 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: kaibo.zhou >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > Refactor build-in agg(MaxWithRetractAccumulator and > MinWithRetractAccumulator) using the DataView. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
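The diff above replaces the per-case {{accTypes(index)}} assignments with a single {{getAccumulatorTypeOfAggregateFunction}} call after the match. A Java analogue of that refactoring pattern — resolve first, derive once — with hypothetical names:

```java
import java.util.function.Supplier;

// Hypothetical analogue of the AggregateUtil change: resolve the aggregate
// function inside the switch, then derive the accumulator type in one place
// afterwards instead of duplicating the assignment in every branch.
public class AggResolverSketch {
    interface AggFunction {
        String accumulatorType();
    }

    static String resolveAccType(String kind, Supplier<AggFunction> factory) {
        AggFunction fn;
        switch (kind) {
            case "collect":
            case "udagg":
                fn = factory.get();
                break;
            default:
                throw new IllegalArgumentException(
                    "Unsupported Function: '" + kind + "'");
        }
        // single point of truth, mirroring
        // getAccumulatorTypeOfAggregateFunction(aggregates(index))
        return fn.accumulatorType();
    }
}
```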
[jira] [Commented] (FLINK-7208) Refactor build-in agg(MaxWithRetractAccumulator and MinWithRetractAccumulator) using the DataView
[ https://issues.apache.org/jira/browse/FLINK-7208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710961#comment-16710961 ] ASF GitHub Bot commented on FLINK-7208: --- walterddr commented on a change in pull request #7201: [FLINK-7208] [table] Optimize Min/MaxWithRetractAggFunction with DataView URL: https://github.com/apache/flink/pull/7201#discussion_r239320467 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala ## @@ -203,13 +203,13 @@ class AggregateITCase extends StreamingWithStateTestBase { .groupBy('b) .select('a.count as 'cnt, 'b) .groupBy('cnt) - .select('cnt, 'b.count as 'freq) + .select('cnt, 'b.count as 'freq, 'b.min as 'min, 'b.max as 'max) val results = t.toRetractStream[Row](queryConfig) results.addSink(new RetractingSink) env.execute() -val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1") +val expected = List("1,1,1,1", "2,1,2,2", "3,1,3,3", "4,1,4,4", "5,1,5,5", "6,1,6,6") Review comment: Yeah. I am inclined to suggest we fix FLINK-11074 first before we check this in. What do you think @dianfu @twalthr. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor build-in agg(MaxWithRetractAccumulator and > MinWithRetractAccumulator) using the DataView > - > > Key: FLINK-7208 > URL: https://issues.apache.org/jira/browse/FLINK-7208 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: kaibo.zhou >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > Refactor build-in agg(MaxWithRetractAccumulator and > MinWithRetractAccumulator) using the DataView. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710946#comment-16710946 ] ASF GitHub Bot commented on FLINK-10974: dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239307495 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -999,6 +1001,131 @@ class Table( new OverWindowedTable(this, overWindows.toArray) } + /** +* Performs a flatMap operation with an user-defined table function. +* +* Scala Example: +* {{{ +* class MyFlatMapFunction extends TableFunction[Row] { +* def eval(str : String) { +* if (str.contains("#")) { +* val splits = str.split("#") +* collect(Row.of(splits(0), splits(1))) +* } +* } +* +* def getResultType(signature: Array[Class[_]]): TypeInformation[_] = +* Types.ROW(Types.STRING, Types.STRING) +* } +* +* val func = new MyFlatMapFunction() +* table.flatMap(func('c)).as('a, 'b) +* }}} +* +* Java Example: +* {{{ +* class MyFlatMapFunction extends TableFunction { +* public void eval(String str) { +* if (str.contains("#")) { +* String[] splits = str.split("#"); +* collect(Row.of(splits[0], splits[1])); +* } +* } +* +* public TypeInformation getResultType(Class[] signature) { +* return Types.ROW(Types.STRING, Types.STRING); +* } +* } +* +* TableFunction func = new MyFlatMapFunction(); +* tableEnv.registerFunction("func", func); +* table.flatMap("func(c)").as("a, b"); +* }}} +*/ + def flatMap(tableFunction: Expression): Table = { +unwrap(tableFunction, tableEnv) match { + case _: TableFunctionCall => + case _ => throw new ValidationException("Only TableFunction can be used in flatMap.") +} + +// rename output fields names to avoid ambiguous name +val originalCall = UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, tableFunction) +val originalOutputFieldNames = 
originalCall.output.map(_.name) +val usedFieldNames: mutable.HashSet[String] = mutable.HashSet() +logicalPlan.output.map(_.name).foreach(usedFieldNames.add) + +var i: Int = 0 +def findNewName(n: String): String = { + val newName = n + "_" + i + i += 1 + if (usedFieldNames.contains(newName)) { +findNewName(n) + } else { +usedFieldNames.add(newName) +newName + } +} + +val newOutputFieldNames = originalOutputFieldNames.map(n => + if (usedFieldNames.contains(n)) { +findNewName(n) + } else { +n + } +) Review comment: Good suggestion. Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
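The {{findNewName}} logic quoted in the diff above appends an incrementing {{_<counter>}} suffix until the generated field name no longer clashes with an existing one. A small Java sketch of that renaming scheme (names hypothetical, not the actual Flink code):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the flatMap field-renaming step: output fields of
// the table function that clash with existing field names get a "_<counter>"
// suffix; the counter keeps incrementing until a free name is found.
public class FieldRenamer {
    private final Set<String> usedFieldNames = new HashSet<>();
    private int counter = 0;

    public FieldRenamer(List<String> existingFields) {
        usedFieldNames.addAll(existingFields);
    }

    private String findNewName(String name) {
        String candidate = name + "_" + counter;
        counter++;
        if (usedFieldNames.contains(candidate)) {
            return findNewName(name); // retry with the next counter value
        }
        usedFieldNames.add(candidate);
        return candidate;
    }

    public List<String> rename(List<String> outputFields) {
        List<String> renamed = new ArrayList<>();
        for (String n : outputFields) {
            renamed.add(usedFieldNames.contains(n) ? findNewName(n) : n);
        }
        return renamed;
    }
}
```

Under this scheme, with existing fields {{a, b}}, table-function output fields {{b, c}} would come out as {{b_0, c}}: only the clashing name is rewritten.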
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710941#comment-16710941 ] ASF GitHub Bot commented on FLINK-10974: dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239308968 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/FlatMapValidationTest.scala ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.stream.table.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.expressions.utils.Func15 +import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg +import org.apache.flink.table.utils.{TableFunc0, TableTestBase} +import org.junit.Test + +class FlatMapValidationTest extends TableTestBase { + + @Test(expected = classOf[ValidationException]) + def testInvalidMapFunctionTypeAggregation(): Unit = { +val util = streamTestUtil() +util.addTable[(Int)]( + "MyTable", 'int) + .flatMap('int.sum) // do not support AggregateFunction as input Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10973) Add Map operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710955#comment-16710955 ] ASF GitHub Bot commented on FLINK-10973: dianfu commented on issue #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#issuecomment-444737589 Hi @twalthr, any comments? That would be great if you can give some feedback on this ticket. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Map operator to Table API > - > > Key: FLINK-10973 > URL: https://issues.apache.org/jira/browse/FLINK-10973 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add Map operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr] > The usage: > {code:java} > val res = tab >.map(fun: ScalarFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710947#comment-16710947 ] ASF GitHub Bot commented on FLINK-10974: dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239302052 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -79,7 +80,8 @@ class Table( * @param udtfCall A String expression of the TableFunction call. */ def this(tableEnv: TableEnvironment, udtfCall: String) { -this(tableEnv, UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, udtfCall)) +this(tableEnv, UserDefinedFunctionUtils.createLogicalFunctionCall( + tableEnv, ExpressionParser.parseExpression(udtfCall))) Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710950#comment-16710950 ] ASF GitHub Bot commented on FLINK-10974: dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239301814 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -21,7 +21,7 @@ import org.apache.calcite.rel.RelNode import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} -import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, ResolvedFieldReference, UnresolvedAlias, UnresolvedFieldReference, WindowProperty} +import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, ResolvedFieldReference, TableFunctionCall, UnresolvedAlias, UnresolvedFieldReference, WindowProperty} Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. 
> The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710940#comment-16710940 ] ASF GitHub Bot commented on FLINK-10974: dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239308474 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ## @@ -320,6 +320,18 @@ case class TableFunctionCall( override private[flink] def children: Seq[Expression] = parameters + def as(fields: Symbol*): TableFunctionCall = { +this.aliases = Some(fields.map(_.name)) +this + } + + def as(fields: String): TableFunctionCall = { +val fieldExprs = ExpressionParser Review comment: Make sense. Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710943#comment-16710943 ] ASF GitHub Bot commented on FLINK-10974: dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239308521 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ## @@ -795,7 +795,7 @@ object UserDefinedFunctionUtils { "define table function followed by some Alias.") } -val functionCall: LogicalTableFunctionCall = unwrap(ExpressionParser.parseExpression(udtf)) +val functionCall: LogicalTableFunctionCall = unwrap(udtfExpr) Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710949#comment-16710949 ] ASF GitHub Bot commented on FLINK-10974: dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239308496 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ## @@ -328,7 +340,9 @@ case class TableFunctionCall( * @return this table function call */ private[flink] def as(aliasList: Option[Seq[String]]): TableFunctionCall = { -this.aliases = aliasList +if (aliasList.isDefined) { Review comment: Agree. Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710944#comment-16710944 ] ASF GitHub Bot commented on FLINK-10974: dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239302508 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -999,6 +1001,131 @@ class Table( new OverWindowedTable(this, overWindows.toArray) } + /** +* Performs a flatMap operation with an user-defined table function. +* +* Scala Example: +* {{{ +* class MyFlatMapFunction extends TableFunction[Row] { +* def eval(str : String) { +* if (str.contains("#")) { +* val splits = str.split("#") +* collect(Row.of(splits(0), splits(1))) +* } +* } +* +* def getResultType(signature: Array[Class[_]]): TypeInformation[_] = +* Types.ROW(Types.STRING, Types.STRING) +* } +* +* val func = new MyFlatMapFunction() +* table.flatMap(func('c)).as('a, 'b) +* }}} +* +* Java Example: +* {{{ +* class MyFlatMapFunction extends TableFunction { +* public void eval(String str) { +* if (str.contains("#")) { +* String[] splits = str.split("#"); +* collect(Row.of(splits[0], splits[1])); +* } +* } +* +* public TypeInformation getResultType(Class[] signature) { +* return Types.ROW(Types.STRING, Types.STRING); +* } +* } +* +* TableFunction func = new MyFlatMapFunction(); +* tableEnv.registerFunction("func", func); +* table.flatMap("func(c)").as("a, b"); +* }}} +*/ + def flatMap(tableFunction: Expression): Table = { +unwrap(tableFunction, tableEnv) match { + case _: TableFunctionCall => + case _ => throw new ValidationException("Only TableFunction can be used in flatMap.") +} + +// rename output fields names to avoid ambiguous name +val originalCall = UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, tableFunction) +val originalOutputFieldNames = 
originalCall.output.map(_.name) +val usedFieldNames: mutable.HashSet[String] = mutable.HashSet() +logicalPlan.output.map(_.name).foreach(usedFieldNames.add) Review comment: Good suggestion, done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710945#comment-16710945 ] ASF GitHub Bot commented on FLINK-10974: dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239307765 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ## @@ -320,6 +320,18 @@ case class TableFunctionCall( override private[flink] def children: Seq[Expression] = parameters + def as(fields: Symbol*): TableFunctionCall = { Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710948#comment-16710948 ] ASF GitHub Bot commented on FLINK-10974: dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239318443 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/FlatMapITCase.scala ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.table + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase +import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.table.utils.TableFunc2 +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +@RunWith(classOf[Parameterized]) +class FlatMapITCase( +mode: TestExecutionMode, +configMode: TableConfigMode) + extends TableProgramsClusterTestBase(mode, configMode) { + + private def testData(env: ExecutionEnvironment): DataSet[(Int, Long, String)] = { +val data = new mutable.MutableList[(Int, Long, String)] +data.+=((1, 1L, "Jack#22")) +data.+=((2, 2L, "John#333")) +data.+=((3, 2L, "Anna#")) +data.+=((4, 3L, "nosharp#5")) +env.fromCollection(data) + } + + @Test + def testFlatMap(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) + +val func2 = new TableFunc2 +val results = testData(env).toTable(tEnv, 'a, 'b, 'c) + .flatMap(func2('c)) Review comment: Agree. Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710942#comment-16710942 ] ASF GitHub Bot commented on FLINK-10974: dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239302128 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -999,6 +1001,131 @@ class Table( new OverWindowedTable(this, overWindows.toArray) } + /** +* Performs a flatMap operation with an user-defined table function. +* +* Scala Example: +* {{{ +* class MyFlatMapFunction extends TableFunction[Row] { +* def eval(str : String) { +* if (str.contains("#")) { +* val splits = str.split("#") +* collect(Row.of(splits(0), splits(1))) +* } +* } +* +* def getResultType(signature: Array[Class[_]]): TypeInformation[_] = +* Types.ROW(Types.STRING, Types.STRING) +* } +* +* val func = new MyFlatMapFunction() Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239308521 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ## @@ -795,7 +795,7 @@ object UserDefinedFunctionUtils { "define table function followed by some Alias.") } -val functionCall: LogicalTableFunctionCall = unwrap(ExpressionParser.parseExpression(udtf)) +val functionCall: LogicalTableFunctionCall = unwrap(udtfExpr) Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239308496 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ## @@ -328,7 +340,9 @@ case class TableFunctionCall( * @return this table function call */ private[flink] def as(aliasList: Option[Seq[String]]): TableFunctionCall = { -this.aliases = aliasList +if (aliasList.isDefined) { Review comment: Agree. Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239308474 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ## @@ -320,6 +320,18 @@ case class TableFunctionCall( override private[flink] def children: Seq[Expression] = parameters + def as(fields: Symbol*): TableFunctionCall = { +this.aliases = Some(fields.map(_.name)) +this + } + + def as(fields: String): TableFunctionCall = { +val fieldExprs = ExpressionParser Review comment: Make sense. Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239301814 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -21,7 +21,7 @@ import org.apache.calcite.rel.RelNode import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} -import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, ResolvedFieldReference, UnresolvedAlias, UnresolvedFieldReference, WindowProperty} +import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, ResolvedFieldReference, TableFunctionCall, UnresolvedAlias, UnresolvedFieldReference, WindowProperty} Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239318443 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/FlatMapITCase.scala ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.table + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase +import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.table.utils.TableFunc2 +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +@RunWith(classOf[Parameterized]) +class FlatMapITCase( +mode: TestExecutionMode, +configMode: TableConfigMode) + extends TableProgramsClusterTestBase(mode, configMode) { + + private def testData(env: ExecutionEnvironment): DataSet[(Int, Long, String)] = { +val data = new mutable.MutableList[(Int, Long, String)] +data.+=((1, 1L, "Jack#22")) +data.+=((2, 2L, "John#333")) +data.+=((3, 2L, "Anna#")) +data.+=((4, 3L, "nosharp#5")) +env.fromCollection(data) + } + + @Test + def testFlatMap(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) + +val func2 = new TableFunc2 +val results = testData(env).toTable(tEnv, 'a, 'b, 'c) + .flatMap(func2('c)) Review comment: Agree. Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239308968 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/FlatMapValidationTest.scala ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.table.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.expressions.utils.Func15 +import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg +import org.apache.flink.table.utils.{TableFunc0, TableTestBase} +import org.junit.Test + +class FlatMapValidationTest extends TableTestBase { + + @Test(expected = classOf[ValidationException]) + def testInvalidMapFunctionTypeAggregation(): Unit = { +val util = streamTestUtil() +util.addTable[(Int)]( + "MyTable", 'int) + .flatMap('int.sum) // do not support AggregateFunction as input Review comment: Done This is an automated message from the Apache Git Service. 
[GitHub] dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239302052 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -79,7 +80,8 @@ class Table( * @param udtfCall A String expression of the TableFunction call. */ def this(tableEnv: TableEnvironment, udtfCall: String) { -this(tableEnv, UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, udtfCall)) +this(tableEnv, UserDefinedFunctionUtils.createLogicalFunctionCall( + tableEnv, ExpressionParser.parseExpression(udtfCall))) Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239307495 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -999,6 +1001,131 @@ class Table( new OverWindowedTable(this, overWindows.toArray) } + /** +* Performs a flatMap operation with an user-defined table function. +* +* Scala Example: +* {{{ +* class MyFlatMapFunction extends TableFunction[Row] { +* def eval(str : String) { +* if (str.contains("#")) { +* val splits = str.split("#") +* collect(Row.of(splits(0), splits(1))) +* } +* } +* +* def getResultType(signature: Array[Class[_]]): TypeInformation[_] = +* Types.ROW(Types.STRING, Types.STRING) +* } +* +* val func = new MyFlatMapFunction() +* table.flatMap(func('c)).as('a, 'b) +* }}} +* +* Java Example: +* {{{ +* class MyFlatMapFunction extends TableFunction { +* public void eval(String str) { +* if (str.contains("#")) { +* String[] splits = str.split("#"); +* collect(Row.of(splits[0], splits[1])); +* } +* } +* +* public TypeInformation getResultType(Class[] signature) { +* return Types.ROW(Types.STRING, Types.STRING); +* } +* } +* +* TableFunction func = new MyFlatMapFunction(); +* tableEnv.registerFunction("func", func); +* table.flatMap("func(c)").as("a, b"); +* }}} +*/ + def flatMap(tableFunction: Expression): Table = { +unwrap(tableFunction, tableEnv) match { + case _: TableFunctionCall => + case _ => throw new ValidationException("Only TableFunction can be used in flatMap.") +} + +// rename output fields names to avoid ambiguous name +val originalCall = UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, tableFunction) +val originalOutputFieldNames = originalCall.output.map(_.name) +val usedFieldNames: mutable.HashSet[String] = mutable.HashSet() +logicalPlan.output.map(_.name).foreach(usedFieldNames.add) + +var i: Int = 0 +def findNewName(n: String): String = { + val newName = 
n + "_" + i + i += 1 + if (usedFieldNames.contains(newName)) { +findNewName(n) + } else { +usedFieldNames.add(newName) +newName + } +} + +val newOutputFieldNames = originalOutputFieldNames.map(n => + if (usedFieldNames.contains(n)) { +findNewName(n) + } else { +n + } +) Review comment: Good suggestion. Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
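The renaming logic reviewed above — appending a numeric suffix until the generated field name no longer clashes with a name already in use — can be sketched as a small standalone helper. The class and method names below are hypothetical; this is a simplified illustration under stated assumptions, not Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FieldRenamer {
    /**
     * Renames each candidate field that clashes with an already-used name by
     * appending "_<counter>", retrying with a larger counter until the
     * generated name is free (mirroring the recursive findNewName above).
     */
    static List<String> renameAmbiguous(List<String> usedNames, List<String> candidates) {
        Set<String> used = new HashSet<>(usedNames);
        List<String> result = new ArrayList<>();
        int counter = 0;
        for (String name : candidates) {
            String newName = name;
            while (used.contains(newName)) {
                newName = name + "_" + counter;
                counter++;
            }
            used.add(newName);
            result.add(newName);
        }
        return result;
    }

    public static void main(String[] args) {
        // "a" clashes with an existing output field, so it is renamed.
        System.out.println(renameAmbiguous(List.of("a", "b"), List.of("a", "c")));
    }
}
```

This keeps the table's existing output field names intact and renames only the UDTF's output fields on collision, which is the ambiguity the review comment addresses.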
[GitHub] dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239307765 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ## @@ -320,6 +320,18 @@ case class TableFunctionCall( override private[flink] def children: Seq[Expression] = parameters + def as(fields: Symbol*): TableFunctionCall = { Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239302508 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -999,6 +1001,131 @@ class Table( new OverWindowedTable(this, overWindows.toArray) } + /** +* Performs a flatMap operation with an user-defined table function. +* +* Scala Example: +* {{{ +* class MyFlatMapFunction extends TableFunction[Row] { +* def eval(str : String) { +* if (str.contains("#")) { +* val splits = str.split("#") +* collect(Row.of(splits(0), splits(1))) +* } +* } +* +* def getResultType(signature: Array[Class[_]]): TypeInformation[_] = +* Types.ROW(Types.STRING, Types.STRING) +* } +* +* val func = new MyFlatMapFunction() +* table.flatMap(func('c)).as('a, 'b) +* }}} +* +* Java Example: +* {{{ +* class MyFlatMapFunction extends TableFunction { +* public void eval(String str) { +* if (str.contains("#")) { +* String[] splits = str.split("#"); +* collect(Row.of(splits[0], splits[1])); +* } +* } +* +* public TypeInformation getResultType(Class[] signature) { +* return Types.ROW(Types.STRING, Types.STRING); +* } +* } +* +* TableFunction func = new MyFlatMapFunction(); +* tableEnv.registerFunction("func", func); +* table.flatMap("func(c)").as("a, b"); +* }}} +*/ + def flatMap(tableFunction: Expression): Table = { +unwrap(tableFunction, tableEnv) match { + case _: TableFunctionCall => + case _ => throw new ValidationException("Only TableFunction can be used in flatMap.") +} + +// rename output fields names to avoid ambiguous name +val originalCall = UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, tableFunction) +val originalOutputFieldNames = originalCall.output.map(_.name) +val usedFieldNames: mutable.HashSet[String] = mutable.HashSet() +logicalPlan.output.map(_.name).foreach(usedFieldNames.add) Review comment: Good suggestion, done. 
[jira] [Created] (FLINK-11082) Increase backlog only if it is available for consumption
zhijiang created FLINK-11082: Summary: Increase backlog only if it is available for consumption Key: FLINK-11082 URL: https://issues.apache.org/jira/browse/FLINK-11082 Project: Flink Issue Type: Sub-task Components: Network Affects Versions: 1.8.0 Reporter: zhijiang Assignee: zhijiang The backlog should indicate how many buffers in a subpartition are available for downstream consumption. A buffer becomes available in one of two ways: its {{BufferConsumer}} is finished, or a flush is triggered. In the current implementation, the backlog is increased as soon as a {{BufferConsumer}} is added to the subpartition, even though that {{BufferConsumer}} is not yet available for network transport. Furthermore, the backlog drives the requesting of floating buffers on the downstream side, so some floating buffers are fetched in advance but not used for a long time, which means they are not used efficiently. We observed this scenario especially with the rebalance selector on the upstream side, so we want to increase the backlog only when a {{BufferConsumer}} is finished or a flush is triggered. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
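The proposed accounting can be sketched with a toy model: a buffer counts toward the backlog only once it is finished or a flush has announced it. The class below is an illustrative stand-in under stated assumptions, not Flink's actual subpartition implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the proposed backlog accounting: adding a buffer does NOT
// increase the backlog; only finishing it or flushing does.
class SubpartitionSketch {
    static final class Buffer {
        boolean finished;
        boolean announced; // already counted in the backlog
    }

    private final Deque<Buffer> buffers = new ArrayDeque<>();
    private int backlog;

    void add(Buffer buffer) {
        // Previously the backlog would be incremented here, even though the
        // buffer is not yet available for network transport.
        buffers.addLast(buffer);
    }

    void finish(Buffer buffer) {
        buffer.finished = true;
        announce(buffer);
    }

    void flush() {
        // A flush makes the current (possibly unfinished) tail consumable.
        Buffer tail = buffers.peekLast();
        if (tail != null) {
            announce(tail);
        }
    }

    private void announce(Buffer buffer) {
        if (!buffer.announced) {
            buffer.announced = true;
            backlog++;
        }
    }

    int getBacklog() {
        return backlog;
    }
}
```

With this accounting, the downstream side requests floating buffers only for data it can actually fetch, instead of for every buffer that has merely been added.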
[jira] [Commented] (FLINK-10941) Slots prematurely released which still contain unconsumed data
[ https://issues.apache.org/jira/browse/FLINK-10941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710927#comment-16710927 ] ASF GitHub Bot commented on FLINK-10941: QiLuo-BD commented on a change in pull request #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions URL: https://github.com/apache/flink/pull/7186#discussion_r239314973 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ## @@ -902,6 +902,15 @@ private void checkTaskManagerTimeouts() { // first retrieve the timed out TaskManagers for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) { if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) { + // checking whether TaskManagers can be safely removed Review comment: Yes, your proposal will improve resource utilization. But considering that TM may be reused after all partitions are consumed (and before TM timeout), we may discuss this optimization in another issue. :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Slots prematurely released which still contain unconsumed data > --- > > Key: FLINK-10941 > URL: https://issues.apache.org/jira/browse/FLINK-10941 > Project: Flink > Issue Type: Bug > Components: ResourceManager >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Qi >Assignee: Qi >Priority: Critical > Labels: pull-request-available > Fix For: 1.8.0 > > > Our case is: Flink 1.5 batch mode, 32 parallelism to read data source and 4 > parallelism to write data sink. > > The read task worked perfectly with 32 TMs. However when the job was > executing the write task, since only 4 TMs were needed, other 28 TMs were > released. 
This caused RemoteTransportException in the write task: > > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: > Connection unexpectedly closed by remote task manager > ’the_previous_TM_used_by_read_task'. This might indicate that the remote task > manager was lost. > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:133) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) > ... > > After skimming YarnFlinkResourceManager related code, it seems to me that > Flink is releasing TMs when they’re idle, regardless of whether working TMs > need them. > > Put in another way, Flink seems to prematurely release slots which contain > unconsumed data and, thus, eventually release a TM which then fails a > consuming task. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10941) Slots prematurely released which still contain unconsumed data
[ https://issues.apache.org/jira/browse/FLINK-10941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710920#comment-16710920 ] ASF GitHub Bot commented on FLINK-10941: QiLuo-BD commented on a change in pull request #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions URL: https://github.com/apache/flink/pull/7186#discussion_r239314285 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ## @@ -134,10 +138,17 @@ public void cancel(InputChannelID receiverId) { ctx.pipeline().fireUserEventTriggered(receiverId); } - public void close() { + public void close() throws IOException { if (ctx != null) { ctx.channel().close(); } + + LOG.info("Close all {} readers pending for close.", readersToClose.size()); Review comment: Thanks Zhijiang. Agree with you that TM should only exit after all connections are closed. Regarding the side effect that 1 tail tasks out of 10 tasks will delay releasing partitions, I think this may not be bad since the resource is managed at TM level. If a TM cannot be released, it doesn't make too much difference whether it has 1 partition or 10 partitions. Please correct me if anything wrong. Considering all existing limitations, I think the resource management could be further improved via external shuffle service that you've proposed. This PR may focus on solving current issue. Looking forward to your further comments. :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Slots prematurely released which still contain unconsumed data > --- > > Key: FLINK-10941 > URL: https://issues.apache.org/jira/browse/FLINK-10941 > Project: Flink > Issue Type: Bug > Components: ResourceManager >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Qi >Assignee: Qi >Priority: Critical > Labels: pull-request-available > Fix For: 1.8.0 > > > Our case is: Flink 1.5 batch mode, 32 parallelism to read data source and 4 > parallelism to write data sink. > > The read task worked perfectly with 32 TMs. However when the job was > executing the write task, since only 4 TMs were needed, other 28 TMs were > released. This caused RemoteTransportException in the write task: > > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: > Connection unexpectedly closed by remote task manager > ’the_previous_TM_used_by_read_task'. This might indicate that the remote task > manager was lost. > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:133) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) > ... > > After skimming YarnFlinkResourceManager related code, it seems to me that > Flink is releasing TMs when they’re idle, regardless of whether working TMs > need them. > > Put in another way, Flink seems to prematurely release slots which contain > unconsumed data and, thus, eventually release a TM which then fails a > consuming task. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10560) JVM_ARGS doesn't work in jobmanager and taskmangers' JVM on yarn
[ https://issues.apache.org/jira/browse/FLINK-10560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710916#comment-16710916 ] Jiayi Liao commented on FLINK-10560: [~till.rohrmann] Thanks for pointing it out. It would be better if we pointed this out in the comments in bin/config.sh. > JVM_ARGS doesn't work in jobmanager and taskmangers' JVM on yarn > > > Key: FLINK-10560 > URL: https://issues.apache.org/jira/browse/FLINK-10560 > Project: Flink > Issue Type: Bug >Affects Versions: 1.6.0 >Reporter: Jiayi Liao >Assignee: Jiayi Liao >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.
[ https://issues.apache.org/jira/browse/FLINK-10884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710911#comment-16710911 ] ASF GitHub Bot commented on FLINK-10884: wg1026688210 commented on a change in pull request #7185: [FLINK-10884] [yarn/mesos] adjust container memory param to set a safe margin from offheap memory URL: https://github.com/apache/flink/pull/7185#discussion_r239310963 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java ## @@ -158,8 +158,10 @@ public static ContaineredTaskManagerParameters create( // (2) split the remaining Java memory between heap and off-heap final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config); - // use the cut-off memory for off-heap (that was its intention) - final long offHeapSizeMB = containerMemoryMB - heapSizeMB; + // (3) try to compute the offHeapMemory from a safe margin + final long restMemoryMB = containerMemoryMB - heapSizeMB; + final long offHeapCutoffMemory = calculateOffHeapCutoffMB(config, restMemoryMB); Review comment: maybe we need some suggestions from @GJL This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink on yarn TM container will be killed by nodemanager because of the > exceeded physical memory. 
> > > Key: FLINK-10884 > URL: https://issues.apache.org/jira/browse/FLINK-10884 > Project: Flink > Issue Type: Bug > Components: Cluster Management, Core >Affects Versions: 1.5.5, 1.6.2, 1.7.0 > Environment: version : 1.6.2 > module : flink on yarn > centos jdk1.8 > hadoop 2.7 >Reporter: wgcn >Assignee: wgcn >Priority: Major > Labels: pull-request-available, yarn > > The TM container will be killed by the nodemanager because it exceeds its > physical memory limit. I found in the launch context launching the TM > container that "container memory = heap memory + offHeapSizeMB" at the class > org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters, > from line 160 to 166. I set a safety margin for the whole memory the > container uses. For example, if the container limit is 3g of memory, the sum > "heap memory + offHeapSizeMB" is kept at 2.4g to prevent the container from > being killed. Do we have a ready-made solution, or should I commit my > solution? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
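The split under discussion can be illustrated with a small self-contained calculation. The 0.25 cutoff ratio and the 384 MB minimum below are illustrative placeholders, not Flink's actual configuration defaults, and the helper names are hypothetical.

```java
// Illustrative split of a container's memory budget: reserve a cutoff
// fraction first, give the rest to the JVM heap, and keep the cutoff as
// the off-heap safety margin so the container stays under its limit.
class ContainerMemorySketch {
    // Assumed ratio and minimum; Flink's real values come from configuration.
    static long cutoffMB(long containerMB, double ratio, long minCutoffMB) {
        return Math.max((long) (containerMB * ratio), minCutoffMB);
    }

    static long[] split(long containerMB) {
        long cutoff = cutoffMB(containerMB, 0.25, 384);
        long heap = containerMB - cutoff;   // JVM heap budget
        long offHeap = containerMB - heap;  // remainder, kept as safety margin
        return new long[] {heap, offHeap};
    }

    public static void main(String[] args) {
        long[] parts = split(3072); // a 3g container
        System.out.println("heap=" + parts[0] + "MB offHeap=" + parts[1] + "MB");
    }
}
```

Under these assumed numbers, a 3 GB container yields a 2304 MB heap with 768 MB held back, which is the kind of margin the reporter describes keeping the JVM's total under the nodemanager's limit.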
[jira] [Commented] (FLINK-9555) Support table api in scala shell
[ https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710906#comment-16710906 ] ASF GitHub Bot commented on FLINK-9555: --- sunjincheng121 commented on issue #7121: [FLINK-9555]Support table api in scala shell URL: https://github.com/apache/flink/pull/7121#issuecomment-444726128 Thanks for the update! @shuiqiangchen Merging... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support table api in scala shell > > > Key: FLINK-9555 > URL: https://issues.apache.org/jira/browse/FLINK-9555 > Project: Flink > Issue Type: New Feature > Components: Scala Shell >Affects Versions: 1.5.0 >Reporter: Jeff Zhang >Assignee: shuiqiangchen >Priority: Major > Labels: pull-request-available > > It would be nice to have table api available in scala shell so that user can > experience table api in interactive way. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710899#comment-16710899 ] ASF GitHub Bot commented on FLINK-11010: lzqdename commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#issuecomment-444723623 In Flink, I think it is better to avoid time zones in calculations, because the input is always UTC. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink SQL timestamp is inconsistent with currentProcessingTime() > > > Key: FLINK-11010 > URL: https://issues.apache.org/jira/browse/FLINK-11010 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.6.2 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > > Flink SQL timestamp is inconsistent with currentProcessingTime(). > > The processing time is just implemented by invoking System.currentTimeMillis(), but the long value will be automatically wrapped to a Timestamp with the > following statement: > `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
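The wrapping the description quotes can be reproduced in plain Java: subtracting the default zone's offset from the epoch millis yields a Timestamp whose internal value no longer matches the raw currentProcessingTime() result, which is the inconsistency being reported. A minimal demonstration (the helper name is illustrative):

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class TimestampShiftDemo {
    /** Mirrors the statement quoted in the issue description. */
    static Timestamp shifted(long time) {
        return new Timestamp(time - TimeZone.getDefault().getOffset(time));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Timestamp t = shifted(now);
        // The raw millis and the wrapped value differ by exactly the
        // default zone's offset -- the source of the inconsistency.
        long diff = now - t.getTime();
        System.out.println(diff == TimeZone.getDefault().getOffset(now));
    }
}
```

In any zone other than UTC, `diff` is non-zero, so a processing-time timer and a SQL PROCTIME value computed this way disagree.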
[jira] [Commented] (FLINK-11065) Migrate flink-table runtime classes
[ https://issues.apache.org/jira/browse/FLINK-11065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710889#comment-16710889 ] Hequn Cheng commented on FLINK-11065: - [~twalthr] Hi, thanks for driving this. I think we would benefit a lot from it once it is finished. Considering how to create smaller subtasks, here are my thoughts ([~twalthr] [~xueyu]): Probably we can migrate the runtime classes folder by folder, for a few reasons: - Different operators have already been split by folders, such as aggregations, join and match. - The relevance of files between folders is relatively small. - Each time we only have to focus on files in a single folder, which makes development and review easier. Within each folder, we can also create smaller tasks. For example, we can separate window join and non-window join into different tasks. We can start our work from the root folder, probably with the following three tasks: - Port runner classes ({{*Runner}}) - Port function classes ({{*Function}}) - Port other classes, such as {{CRowKeySelector}}, {{RowKeySelector}} and {{TableFunctionCollector}} What do you guys think? > Migrate flink-table runtime classes > --- > > Key: FLINK-11065 > URL: https://issues.apache.org/jira/browse/FLINK-11065 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Priority: Major > > This issue covers the third step of the migration plan mentioned in > [FLIP-28|https://cwiki.apache.org/confluence/display/FLINK/FLIP-28%3A+Long-term+goal+of+making+flink-table+Scala-free]. > All runtime classes have few dependencies on other classes. > This issue tracks the effort of porting the runtime classes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10783) Update WindowOperatorMigrationTest for 1.7
[ https://issues.apache.org/jira/browse/FLINK-10783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] boshu Zheng reassigned FLINK-10783: --- Assignee: boshu Zheng (was: vinoyang) > Update WindowOperatorMigrationTest for 1.7 > -- > > Key: FLINK-10783 > URL: https://issues.apache.org/jira/browse/FLINK-10783 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: boshu Zheng >Priority: Major > > Update {{WindowOperatorMigrationTest}} so that it covers restoring from 1.7. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10640) Enable Slot Resource Profile for Resource Management
[ https://issues.apache.org/jira/browse/FLINK-10640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710884#comment-16710884 ] Tony Xintong Song commented on FLINK-10640: --- Hi [~Tison], what you proposed is exactly what I've been thinking. I think it's actually an important part of the TM management. If we are going to manage dynamic number of {{TaskExecutor}} s, I think we do need such a pair of (min, max) to allow users limiting the range of {{TaskExecutor}} amount. > Enable Slot Resource Profile for Resource Management > > > Key: FLINK-10640 > URL: https://issues.apache.org/jira/browse/FLINK-10640 > Project: Flink > Issue Type: New Feature > Components: ResourceManager >Reporter: Tony Xintong Song >Priority: Major > > Motivation & Backgrounds > * The existing concept of task slots roughly represents how many pipeline of > tasks a TaskManager can hold. However, it does not consider the differences > in resource needs and usage of individual tasks. Enabling resource profiles > of slots may allow Flink to better allocate execution resources according to > tasks fine-grained resource needs. > * The community version Flink already contains APIs and some implementation > for slot resource profile. However, such logic is not truly used. > (ResourceProfile of slot requests is by default set to UNKNOWN with negative > values, thus matches any given slot.) > Preliminary Design > * Slot Management > A slot represents a certain amount of resources for a single pipeline of > tasks to run in on a TaskManager. Initially, a TaskManager does not have any > slots but a total amount of resources. When allocating, the ResourceManager > finds proper TMs to generate new slots for the tasks to run according to the > slot requests. Once generated, the slot's size (resource profile) does not > change until it's freed. ResourceManager can apply different, portable > strategies to allocate slots from TaskManagers. 
> * TM Management > The size and number of TaskManagers and when to start them can also be > flexible. TMs can be started and released dynamically, and may have different > sizes. We may have many different, portable strategies. E.g., an elastic > session that can run multiple jobs like the session mode while dynamically > adjusting the size of session (number of TMs) according to the realtime > working load. > * About Slot Sharing > Slot sharing is a good heuristic to easily calculate how many slots needed > to get the job running and get better utilization when there is no resource > profile in slots. However, with resource profiles enabling finer-grained > resource management, each individual task has its specific resource need and > it does not make much sense to have multiple tasks sharing the resource of > the same slot. Instead, we may introduce locality preferences/constraints to > support the semantics of putting tasks in same/different TMs in a more > general way. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-11081: Assignee: vinoyang > Support binding port range for REST server > -- > > Key: FLINK-11081 > URL: https://issues.apache.org/jira/browse/FLINK-11081 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.7.0, 1.8.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > > Currently the {{RestServerEndpoint}} binds to the port specified by > {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be > useful to being able to specify not only a single port but a port range to > pick a port from. Therefore, I propose to add similar to > {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which > allows to specify a port range for the {{RestServerEndpoint}} to pick a port > from. {{RestOptions#PORT}} would then only be used by the client to connect > to the started {{RestServerEndpoint}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
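An option like the proposed {{RestOptions#BIND_PORT}} would typically accept either a single port or a range string such as "50100-50200" and try each candidate until a bind succeeds. A minimal, hypothetical sketch of the parsing half (the option name and the range format are assumptions drawn from the proposal, not a final API):

```java
import java.util.ArrayList;
import java.util.List;

public class PortRangeSketch {
    /** Parses "8081" or "50100-50200" into an ordered list of candidate ports. */
    static List<Integer> parsePortRange(String spec) {
        List<Integer> ports = new ArrayList<>();
        int dash = spec.indexOf('-');
        if (dash < 0) {
            // single port, e.g. the current RestOptions#PORT behavior
            ports.add(Integer.parseInt(spec.trim()));
        } else {
            int start = Integer.parseInt(spec.substring(0, dash).trim());
            int end = Integer.parseInt(spec.substring(dash + 1).trim());
            for (int p = start; p <= end; p++) {
                ports.add(p);
            }
        }
        return ports;
    }

    public static void main(String[] args) {
        // The server endpoint would iterate these and bind the first free one.
        System.out.println(parsePortRange("50100-50102")); // [50100, 50101, 50102]
    }
}
```

The client side would then keep using the single configured port, as the ticket suggests, while only the server iterates the candidates.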
[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators
[ https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710867#comment-16710867 ] ASF GitHub Bot commented on FLINK-10543: sunjincheng121 commented on a change in pull request #6918: [FLINK-10543][table] Leverage efficient timer deletion in relational operators URL: https://github.com/apache/flink/pull/6918#discussion_r239301372 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala ## @@ -56,6 +56,10 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream val cleanupTime = currentTime + maxRetentionTime // register timer and remember clean-up time ctx.timerService().registerProcessingTimeTimer(cleanupTime) +// delete expired timer +if (curCleanupTime != null) { + ctx.timerService().deleteProcessingTimeTimer(curCleanupTime) +} Review comment: We can remove `needToCleanupState ` after add delete logic. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Leverage efficient timer deletion in relational operators > - > > Key: FLINK-10543 > URL: https://issues.apache.org/jira/browse/FLINK-10543 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Minor > Labels: pull-request-available > > FLINK-9423 added support for efficient timer deletions. This feature is > available since Flink 1.6 and should be used by the relational operator of > SQL and Table API. > Currently, we use a few workarounds to handle situations when deleting timers > would be the better solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators
[ https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710866#comment-16710866 ] ASF GitHub Bot commented on FLINK-10543: sunjincheng121 commented on a change in pull request #6918: [FLINK-10543][table] Leverage efficient timer deletion in relational operators URL: https://github.com/apache/flink/pull/6918#discussion_r239300038 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CoProcessFunctionWithCleanupState.scala ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state.{State, ValueState, ValueStateDescriptor} +import org.apache.flink.streaming.api.TimeDomain +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} + +abstract class CoProcessFunctionWithCleanupState[IN1, IN2, OUT](queryConfig: StreamQueryConfig) + extends CoProcessFunction[IN1, IN2, OUT] + with CleanupState { + + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // holds the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected def initCleanupTimeState(stateName: String) { +if (stateCleaningEnabled) { + val cleanupTimeDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](stateName, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor) +} + } + + protected def registerProcessingCleanupTimer( Review comment: `registerProcessingCleanupTimer` -> `processCleanupTimer`, because we do not always actually register the timer, e.g. when `stateCleaningEnabled` is false. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Leverage efficient timer deletion in relational operators > - > > Key: FLINK-10543 > URL: https://issues.apache.org/jira/browse/FLINK-10543 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Minor > Labels: pull-request-available > > FLINK-9423 added support for efficient timer deletions. 
This feature is > available since Flink 1.6 and should be used by the relational operator of > SQL and Table API. > Currently, we use a few workarounds to handle situations when deleting timers > would be the better solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators
[ https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710868#comment-16710868 ] ASF GitHub Bot commented on FLINK-10543: sunjincheng121 commented on a change in pull request #6918: [FLINK-10543][table] Leverage efficient timer deletion in relational operators URL: https://github.com/apache/flink/pull/6918#discussion_r239298808 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CleanupState.scala ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import java.lang.{Long => JLong} + +import org.apache.flink.streaming.api.TimerService + +/** + * Base class for clean up state, both for [[ProcessFunction]] and [[CoProcessFunction]]. + */ +trait CleanupState { Review comment: I think it again, using queryConfig maybe less code reuse. 
So we can keep `CoProcessFunctionWithCleanupState` and `ProcessFunctionWithCleanupState` for `CoProcessFunction` and `ProcessFunction`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Leverage efficient timer deletion in relational operators > - > > Key: FLINK-10543 > URL: https://issues.apache.org/jira/browse/FLINK-10543 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Minor > Labels: pull-request-available > > FLINK-9423 added support for efficient timer deletions. This feature is > available since Flink 1.6 and should be used by the relational operator of > SQL and Table API. > Currently, we use a few workarounds to handle situations when deleting timers > would be the better solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
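The pattern under review (remember the latest cleanup timer in state and delete the previously registered one when a new one is set, so only one timer per key stays pending) can be illustrated with a self-contained mock. The `MockTimerService` below is a stand-in for Flink's TimerService, not the real API:

```java
import java.util.TreeSet;

public class CleanupTimerSketch {
    /** Stand-in for Flink's TimerService; a TreeSet models pending timers. */
    static class MockTimerService {
        final TreeSet<Long> timers = new TreeSet<>();
        void registerProcessingTimeTimer(long t) { timers.add(t); }
        void deleteProcessingTimeTimer(long t) { timers.remove(t); }
    }

    final MockTimerService timerService = new MockTimerService();
    Long cleanupTime; // stand-in for the ValueState holding the last timer

    /** Registers a fresh cleanup timer and deletes the now-obsolete one. */
    void processCleanupTimer(long currentTime, long maxRetentionTime) {
        long newCleanupTime = currentTime + maxRetentionTime;
        timerService.registerProcessingTimeTimer(newCleanupTime);
        if (cleanupTime != null) {
            // delete expired timer so no stale cleanup fires later
            timerService.deleteProcessingTimeTimer(cleanupTime);
        }
        cleanupTime = newCleanupTime;
    }

    public static void main(String[] args) {
        CleanupTimerSketch s = new CleanupTimerSketch();
        s.processCleanupTimer(1000, 500);
        s.processCleanupTimer(1200, 500);
        // Only the latest timer remains pending.
        System.out.println(s.timerService.timers); // [1700]
    }
}
```

Before FLINK-9423 this deletion was not possible, hence the `needToCleanupState`-style workarounds the reviewers suggest removing.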
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710822#comment-16710822 ] ASF GitHub Bot commented on FLINK-10974: sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239271764 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -21,7 +21,7 @@ import org.apache.calcite.rel.RelNode import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} -import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, ResolvedFieldReference, UnresolvedAlias, UnresolvedFieldReference, WindowProperty} +import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, ResolvedFieldReference, TableFunctionCall, UnresolvedAlias, UnresolvedFieldReference, WindowProperty} Review comment: With more than 10 classes imported, I suggest using `._`. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. 
> The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710830#comment-16710830 ] ASF GitHub Bot commented on FLINK-10974: sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239281388 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -79,7 +80,8 @@ class Table( * @param udtfCall A String expression of the TableFunction call. */ def this(tableEnv: TableEnvironment, udtfCall: String) { -this(tableEnv, UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, udtfCall)) +this(tableEnv, UserDefinedFunctionUtils.createLogicalFunctionCall( + tableEnv, ExpressionParser.parseExpression(udtfCall))) Review comment: Format code as follows: ? ``` this( tableEnv, UserDefinedFunctionUtils.createLogicalFunctionCall( tableEnv, ExpressionParser.parseExpression(udtfCall) ) ) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710824#comment-16710824 ] ASF GitHub Bot commented on FLINK-10974: sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#discussion_r239279239 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ## @@ -320,6 +320,18 @@ case class TableFunctionCall( override private[flink] def children: Seq[Expression] = parameters + def as(fields: Symbol*): TableFunctionCall = { Review comment: Add java doc: /** * Assigns an alias for this table function's returned fields * * @param fields alias for this table function's returned fields * @return this table function call */ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710828#comment-16710828 ]

ASF GitHub Bot commented on FLINK-10974:

sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239271885

File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala

@@ -999,6 +1001,131 @@ class Table(
     new OverWindowedTable(this, overWindows.toArray)
   }

+  /**
+    * Performs a flatMap operation with an user-defined table function.
+    *
+    * Scala Example:
+    * {{{
+    *   class MyFlatMapFunction extends TableFunction[Row] {
+    *     def eval(str : String) {
+    *       if (str.contains("#")) {
+    *         val splits = str.split("#")
+    *         collect(Row.of(splits(0), splits(1)))
+    *       }
+    *     }
+    *
+    *     def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
+    *       Types.ROW(Types.STRING, Types.STRING)
+    *   }
+    *
+    *   val func = new MyFlatMapFunction()

Review comment: `MyFlatMapFunction()` -> `MyFlatMapFunction`
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710823#comment-16710823 ]

ASF GitHub Bot commented on FLINK-10974:

sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239273278

File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala

@@ -320,6 +320,18 @@ case class TableFunctionCall(
   override private[flink] def children: Seq[Expression] = parameters

+  def as(fields: Symbol*): TableFunctionCall = {
+    this.aliases = Some(fields.map(_.name))
+    this
+  }
+
+  def as(fields: String): TableFunctionCall = {
+    val fieldExprs = ExpressionParser

Review comment: For robustness, I suggest adding an empty check, something like `if (fields.isEmpty) return this`. What do you think?
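The suggested guard can be sketched in isolation. `AliasedCall` below is a hypothetical stand-in for `TableFunctionCall` (which carries far more state and parses expressions rather than splitting strings); only the empty-check mirrors the review suggestion:

```scala
// Hypothetical stand-in for TableFunctionCall, illustrating the suggested
// robustness guard in `as(fields: String)`: an empty or blank alias string
// leaves the call unchanged instead of producing bogus aliases.
case class AliasedCall(aliases: Option[Seq[String]] = None) {
  def as(fields: String): AliasedCall = {
    // suggested empty check: ignore null/blank alias strings
    if (fields == null || fields.trim.isEmpty) return this
    copy(aliases = Some(fields.split(",").map(_.trim).toSeq))
  }
}
```

With this guard, `AliasedCall().as("")` keeps `aliases` unset, while `AliasedCall().as("a, b")` yields `Some(Seq("a", "b"))`.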
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710831#comment-16710831 ]

ASF GitHub Bot commented on FLINK-10974:

sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239295918

File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala

@@ -999,6 +1001,131 @@ class Table(
     new OverWindowedTable(this, overWindows.toArray)
   }

+  /**
+    * Performs a flatMap operation with an user-defined table function.
+    *
+    * Scala Example:
+    * {{{
+    *   class MyFlatMapFunction extends TableFunction[Row] {
+    *     def eval(str : String) {
+    *       if (str.contains("#")) {
+    *         val splits = str.split("#")
+    *         collect(Row.of(splits(0), splits(1)))
+    *       }
+    *     }
+    *
+    *     def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
+    *       Types.ROW(Types.STRING, Types.STRING)
+    *   }
+    *
+    *   val func = new MyFlatMapFunction()
+    *   table.flatMap(func('c)).as('a, 'b)
+    * }}}
+    *
+    * Java Example:
+    * {{{
+    *   class MyFlatMapFunction extends TableFunction {
+    *     public void eval(String str) {
+    *       if (str.contains("#")) {
+    *         String[] splits = str.split("#");
+    *         collect(Row.of(splits[0], splits[1]));
+    *       }
+    *     }
+    *
+    *     public TypeInformation getResultType(Class[] signature) {
+    *       return Types.ROW(Types.STRING, Types.STRING);
+    *     }
+    *   }
+    *
+    *   TableFunction func = new MyFlatMapFunction();
+    *   tableEnv.registerFunction("func", func);
+    *   table.flatMap("func(c)").as("a, b");
+    * }}}
+    */
+  def flatMap(tableFunction: Expression): Table = {
+    unwrap(tableFunction, tableEnv) match {
+      case _: TableFunctionCall =>
+      case _ => throw new ValidationException("Only TableFunction can be used in flatMap.")
+    }
+
+    // rename output fields names to avoid ambiguous name
+    val originalCall = UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, tableFunction)
+    val originalOutputFieldNames = originalCall.output.map(_.name)
+    val usedFieldNames: mutable.HashSet[String] = mutable.HashSet()
+    logicalPlan.output.map(_.name).foreach(usedFieldNames.add)
+
+    var i: Int = 0
+    def findNewName(n: String): String = {
+      val newName = n + "_" + i
+      i += 1
+      if (usedFieldNames.contains(newName)) {
+        findNewName(n)
+      } else {
+        usedFieldNames.add(newName)
+        newName
+      }
+    }
+
+    val newOutputFieldNames = originalOutputFieldNames.map(n =>
+      if (usedFieldNames.contains(n)) {
+        findNewName(n)
+      } else {
+        n
+      }
+    )

Review comment: The logic for generating the new output field names can be changed as follows:

```
val newOutputFieldNames = originalOutputFieldNames.zipWithIndex.map {
  case (name, index) =>
    if (usedFieldNames.contains(name)) {
      index + "_" + name
    } else {
      name
    }
}.toArray.sorted
```

What do you think?
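The renaming scheme under discussion (suffix a clashing field name with `_<counter>` and retry until the result is unique) can be illustrated standalone. `FieldRenameSketch` and `renameFields` are illustrative names, not Flink internals, and this sketch additionally reserves non-clashing names in the used set, which the quoted PR snippet does not:

```scala
import scala.collection.mutable

// Illustrative sketch of the field de-duplication discussed above.
// `existing` holds the left table's field names; `incoming` holds the
// table function's output field names. Any incoming name that clashes
// with an already-used name gets a "_<counter>" suffix, retrying with
// an incremented counter until the candidate is unique.
object FieldRenameSketch {
  def renameFields(existing: Seq[String], incoming: Seq[String]): Seq[String] = {
    val used = mutable.HashSet[String]()
    existing.foreach(used.add)

    var i = 0
    def findNewName(n: String): String = {
      val candidate = n + "_" + i
      i += 1
      if (used.contains(candidate)) findNewName(n)
      else { used.add(candidate); candidate }
    }

    incoming.map { n =>
      if (used.contains(n)) findNewName(n)
      else { used.add(n); n } // also reserve non-clashing names
    }
  }

  def main(args: Array[String]): Unit = {
    println(renameFields(Seq("a", "b", "c"), Seq("a", "f0"))) // → List(a_0, f0)
  }
}
```

Note that the counter is shared across all renames, so the suffixes are unique but not necessarily consecutive per name; the reviewer's `zipWithIndex` alternative instead derives the suffix from the field's position.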
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710825#comment-16710825 ]

ASF GitHub Bot commented on FLINK-10974:

sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239291310

File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala

@@ -999,6 +1001,131 @@ class Table(
     new OverWindowedTable(this, overWindows.toArray)
   }

+  def flatMap(tableFunction: Expression): Table = {
+    unwrap(tableFunction, tableEnv) match {
+      case _: TableFunctionCall =>
+      case _ => throw new ValidationException("Only TableFunction can be used in flatMap.")
+    }
+
+    // rename output fields names to avoid ambiguous name
+    val originalCall = UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, tableFunction)
+    val originalOutputFieldNames = originalCall.output.map(_.name)
+    val usedFieldNames: mutable.HashSet[String] = mutable.HashSet()
+    logicalPlan.output.map(_.name).foreach(usedFieldNames.add)

Review comment: `logicalPlan.output.map(_.name).foreach(usedFieldNames.add)` -> `logicalPlan.output.foreach(a => usedFieldNames.add(a.name))`
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710821#comment-16710821 ]

ASF GitHub Bot commented on FLINK-10974:

sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239273030

File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala

@@ -328,7 +340,9 @@ case class TableFunctionCall(
    * @return this table function call
    */
  private[flink] def as(aliasList: Option[Seq[String]]): TableFunctionCall = {
-    this.aliases = aliasList
+    if (aliasList.isDefined) {

Review comment: After adding `as(fields: String)`, I think we can remove this method, and then change the `UserDefinedFunctionUtils#createLogicalFunctionCall` usage as described in the follow-up comment on `createLogicalFunctionCall`.
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710827#comment-16710827 ]

ASF GitHub Bot commented on FLINK-10974:

sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239285343

File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/FlatMapITCase.scala

@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.utils.TableFunc2
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class FlatMapITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsClusterTestBase(mode, configMode) {
+
+  private def testData(env: ExecutionEnvironment): DataSet[(Int, Long, String)] = {
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 1L, "Jack#22"))
+    data.+=((2, 2L, "John#333"))
+    data.+=((3, 2L, "Anna#"))
+    data.+=((4, 3L, "nosharp#5"))
+    env.fromCollection(data)
+  }
+
+  @Test
+  def testFlatMap(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val func2 = new TableFunc2
+    val results = testData(env).toTable(tEnv, 'a, 'b, 'c)
+      .flatMap(func2('c))

Review comment: Suggestions to enhance our test case as follows (I'm fine if you want to add the test in a validation test rather than in the IT case):

```
val ds = testData(env).toTable(tEnv, 'a, 'b, 'c)
  .flatMap(func2('c))                          // test non alias
  .select('f0, 'f1)
  .flatMap(func2("Sunny#Panpan") as ('a, 'b))  // test alias
  .select('a, 'b)
  // test function output name same as left table field name
  .flatMap(func2("Dian#Flink").as('a, 'b))
  .select('a, 'b)
```

What do you think?
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710829#comment-16710829 ]

ASF GitHub Bot commented on FLINK-10974:

sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239285940

File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/FlatMapValidationTest.scala

@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.Func15
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
+import org.apache.flink.table.utils.{TableFunc0, TableTestBase}
+import org.junit.Test
+
+class FlatMapValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidMapFunctionTypeAggregation(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int)]("MyTable", 'int)
+      .flatMap('int.sum) // do not support AggregateFunction as input

Review comment: Add a `.flatMap('int)` test case?
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710826#comment-16710826 ]

ASF GitHub Bot commented on FLINK-10974:

sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239273803

File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala

@@ -795,7 +795,7 @@ object UserDefinedFunctionUtils {
         "define table function followed by some Alias.")
     }

-    val functionCall: LogicalTableFunctionCall = unwrap(ExpressionParser.parseExpression(udtf))
+    val functionCall: LogicalTableFunctionCall = unwrap(udtfExpr)

Review comment: If we remove `as(Option(Seq[String]))`, we should change the usage to look like the following:

```
unwrap(udtfExpr).as(alias.getOrElse(Seq()).mkString(",")).toLogicalTableFunctionCall(child = null)
```