[jira] [Assigned] (FLINK-10929) Add support for Apache Arrow

2018-12-05 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10929:


Assignee: (was: vinoyang)

> Add support for Apache Arrow
> ----------------------------
>
> Key: FLINK-10929
> URL: https://issues.apache.org/jira/browse/FLINK-10929
> Project: Flink
>  Issue Type: Wish
>Reporter: Pedro Cardoso Silva
>Priority: Minor
>
> Investigate the possibility of adding support for Apache Arrow as a 
> standardized columnar in-memory data format.
> Given the activity that [https://github.com/apache/arrow] is currently 
> getting and its claimed objective of providing a zero-copy, standardized data 
> format across platforms, I think it makes sense for Flink to look into 
> supporting it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9461) Disentangle flink-connector-kafka from flink-table and flink-json

2018-12-05 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711081#comment-16711081
 ] 

vinoyang commented on FLINK-9461:
---------------------------------

[~twalthr] Some interfaces, such as StreamTableSinkFactory, are used by 
multiple connectors. So while migrating, I probably won't delete them, but 
instead create a copy of the Java implementation in table-common. I have done 
[most of the work|https://github.com/yanghua/flink/commit/03e69684d7f584618fb5db04be13c63a403d6d12] 
already, but the transitive dependencies of some interfaces make it impossible 
to compile successfully, so this part needs a rethink.
In addition, I suggest that we create an umbrella issue for migrating all 
connectors and then use the current issue as a sub-task; the migration of the 
connectors and the transformation of the interfaces should be centralized.
What do you think?

> Disentangle flink-connector-kafka from flink-table and flink-json
> -----------------------------------------------------------------
>
> Key: FLINK-9461
> URL: https://issues.apache.org/jira/browse/FLINK-9461
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.8.0
>
>
> Currently, the {{flink-connector-kafka}} module has a dependency on 
> {{flink-table}} and {{flink-json}}. The reason seems to be that the module 
> contains the {{KafkaJsonTableSource}} and {{KafkaJsonTableSink}}. Even though 
> the {{flink-table}} and {{flink-json}} dependencies are marked as optional, 
> {{flink-connector-kafka}} will still contain the table sources and sinks. I 
> think this is not a clean design.
> I would propose to move the table sources and sinks into a dedicated module 
> which depends on {{flink-connector-kafka}}. That way we would better separate 
> the dependencies and could remove {{flink-table}} and {{flink-json}} from 
> {{flink-connector-kafka}}.
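
As a rough sketch of that proposal (the module name flink-connector-kafka-table 
is hypothetical, not taken from the issue), the dedicated module's POM fragment 
could declare the following, which would let {{flink-connector-kafka}} drop 
{{flink-table}} and {{flink-json}} entirely:

```xml
<!-- Hypothetical dedicated module holding the Kafka table sources/sinks. -->
<artifactId>flink-connector-kafka-table</artifactId>

<dependencies>
  <!-- The plain connector stays free of table/json dependencies. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${project.version}</version>
  </dependency>
  <!-- The heavy dependencies move here, next to the table sources/sinks. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <version>${project.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${project.version}</version>
  </dependency>
</dependencies>
```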



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711073#comment-16711073
 ] 

ASF GitHub Bot commented on FLINK-11010:


lzqdename edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL 
timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-444762927
 
 
   Let me show how the wrong result is generated.
   
   ---
   **Background**: processing time in a tumbling window, Flink 1.5.0.
   
   The invoke stack is as follows:
```
[ 1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747)
[ 2] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:53)
[ 3] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74)
[ 4] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72)
[ 5] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39)
[ 6] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46)
[ 7] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550)
[ 8] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505)
[ 9] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266)
[10] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281)
[11] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
[12] java.util.concurrent.FutureTask.run (FutureTask.java:266)
[13] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180)
[14] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293)
[15] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142)
[16] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
[17] java.lang.Thread.run (Thread.java:748)
```
   
   Now we are at [1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747), and the code is as follows:
```java
public static Timestamp internalToTimestamp(long v) {
  return new Timestamp(v - LOCAL_TZ.getOffset(v));
}
```
   Let us print the value of windowStart (v):
   print v
   v = 154407483
   and the value of windowEnd (v):
   print v
   v = 1544074833000
   
   
   
   
   
   After this, we come back to 
   [1] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:51)
   
   
   Then we will execute:
```scala
if (windowStartOffset.isDefined) {
  output.setField(
    lastFieldPos + windowStartOffset.get,
    SqlFunctions.internalToTimestamp(windowStart))
}

if (windowEndOffset.isDefined) {
  output.setField(
    lastFieldPos + windowEndOffset.get,
    SqlFunctions.internalToTimestamp(windowEnd))
}
```
   
   Before executing, the output is
   output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,null,null,null"
   After executing, the output is
   output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,2018-12-06 05:40:30.0,2018-12-06 05:40:33.0,null"
   
   So, do you think that translating the
   long value 154407483 to 2018-12-06 05:40:30.0 and the
   long value 1544074833000 to 2018-12-06 05:40:33.0
   is right?
   
   I am in China (UTC+8), so I think the timestamps should be 2018-12-06 13:40:30.0 and 
   2018-12-06 13:40:33.0.
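   
   For reference, a minimal standalone sketch (the class name is hypothetical) that 
   reproduces this shift outside Flink; it mirrors the Calcite method quoted above and 
   uses the windowEnd value from this session. Run it with -Duser.timezone=Asia/Shanghai 
   to simulate a UTC+8 JVM:
```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class InternalToTimestampDemo {
    private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

    // Mirrors org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(long).
    public static Timestamp internalToTimestamp(long v) {
        return new Timestamp(v - LOCAL_TZ.getOffset(v));
    }

    public static void main(String[] args) {
        long windowEnd = 1544074833000L; // epoch millis = 2018-12-06 13:40:33 in UTC+8

        // Wrapping the epoch value directly renders the correct local time:
        System.out.println(new Timestamp(windowEnd));        // 2018-12-06 13:40:33.0

        // Calcite's conversion first subtracts the +8h zone offset, so the
        // rendered local time ends up shifted back by eight hours:
        System.out.println(internalToTimestamp(windowEnd));  // 2018-12-06 05:40:33.0
    }
}
```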
   
   
   
   Okay, let us continue.
   
   Now the data will be written to Kafka; before being written, it is serialized.
   Let us see what happens!
   
   The call stack is as follows (frames [5]-[8] restored from the later copies of 
   this comment in the thread; frame [9] is truncated in the archive):
```
[1] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer._timestamp (DateSerializer.java:41)
[2] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:48)
[3] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:15)
[4] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue (DefaultSerializerProvider.java:130)
[5] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue (ObjectMapper.java:2,444)
[6] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.valueToTree (ObjectMapper.java:2,586)
[7] org.apache.flink.formats.json.JsonRowSerializationSchema.convert (JsonRowSerializationSchema.java:189)
[8] org.apache.flink.formats.json.JsonRowSerializationSchema.convertRow (JsonRowSerializationSchema.java:128)
[9] org.apache.flin
```

[jira] [Commented] (FLINK-8033) Build Flink with JDK 9

2018-12-05 Thread Takanobu Asanuma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711064#comment-16711064
 ] 

Takanobu Asanuma commented on FLINK-8033:
-----------------------------------------

FYI, Hadoop has stopped fixing JDK 9/10 incompatibilities, as those releases 
are EOL, and is focusing on supporting JDK 8 and JDK 11. Please see HADOOP-11123.

> Build Flink with JDK 9
> ----------------------
>
> Key: FLINK-8033
> URL: https://issues.apache.org/jira/browse/FLINK-8033
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> This is a JIRA to track all issues found while making Flink compatible with 
> Java 9.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711063#comment-16711063
 ] 

ASF GitHub Bot commented on FLINK-11010:


lamber-ken commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp 
is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-444773602
 
 
   @lzqdename, thanks for your detailed description.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> ----------------------------------------------------------------
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> The processing time is implemented simply by invoking 
> System.currentTimeMillis(), but the long value is then automatically wrapped 
> into a Timestamp with the following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Updated] (FLINK-11084) Incorrect output after two consecutive split and select

2018-12-05 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-11084:

Summary: Incorrect output after two consecutive split and select  (was: 
Incorrect output after two successive split and select)

> Incorrect output after two consecutive split and select
> --------------------------------------------------------
>
> Key: FLINK-11084
> URL: https://issues.apache.org/jira/browse/FLINK-11084
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> The second OutputSelector of two successive split-and-select operations does 
> not actually depend on the first one; both end up in the same array of 
> OutputSelectors in DirectedOutput.
> For example:
> outputSelector1 => {"name1" or "name2"}
> outputSelector2 => {"name3" or "name4"}
> resultStream = 
> dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")
> Expected result for input {StreamRecord1}:
> outputSelector1 => {"name1"}
> outputSelector2 => {"name3"}
> resultStream => {}
> Actual result:
> resultStream => {StreamRecord1}
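
To make the report concrete, here is a minimal standalone sketch of that 
pattern against the DataStream split/select API of that era (class name, 
sample values, and the two selector functions are hypothetical stand-ins for 
outputSelector1 and outputSelector2 above):

```java
import java.util.Collections;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitSelectRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4);

        // outputSelector1: route even values to "name1", odd values to "name2".
        SplitStream<Integer> first = dataStream.split(
                (OutputSelector<Integer>) value ->
                        Collections.singletonList(value % 2 == 0 ? "name1" : "name2"));

        // outputSelector2 on the "name2" branch: values < 3 go to "name3".
        DataStream<Integer> resultStream = first
                .select("name2")
                .split((OutputSelector<Integer>) value ->
                        Collections.singletonList(value < 3 ? "name3" : "name4"))
                .select("name3");

        // Per the report, a record routed to "name1" by the first selector can
        // still show up here, because both selectors end up in the same
        // OutputSelector array in DirectedOutput instead of being chained.
        resultStream.print();
        env.execute("split-select repro");
    }
}
```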



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11084) Incorrect output after two successive split and select

2018-12-05 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711041#comment-16711041
 ] 

Shimin Yang commented on FLINK-11084:
-------------------------------------

Currently I am creating an OutputSelectorWrapper to hold the current 
OutputSelector, all preceding parent OutputSelectors, and their selected names, 
during StreamGraph generation. At runtime, every record will then go through 
all of the OutputSelectors in the right order.
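
A rough sketch of that approach (all names hypothetical; this illustrates the 
idea described above, not the actual patch): each split(...).select(...) pair 
becomes a stage, and a record reaches the final stream only if every stage's 
selector emits one of the names selected from it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the wrapper idea, not the actual fix.
public class OutputSelectorChain<T> {

    /** Stand-in for Flink's OutputSelector. */
    public interface SelectorFn<T> {
        Iterable<String> select(T value);
    }

    /** One split(...).select(names) pair. */
    private static final class Stage<R> {
        final SelectorFn<R> selector;
        final List<String> selectedNames;

        Stage(SelectorFn<R> selector, List<String> selectedNames) {
            this.selector = selector;
            this.selectedNames = selectedNames;
        }
    }

    private final List<Stage<T>> stages = new ArrayList<>();

    public void addStage(SelectorFn<T> selector, List<String> selectedNames) {
        stages.add(new Stage<>(selector, selectedNames));
    }

    /**
     * A record reaches the final stream only if every selector, applied in
     * order, emits at least one of the names selected right after it.
     */
    public boolean accepts(T record) {
        for (Stage<T> stage : stages) {
            boolean matched = false;
            for (String name : stage.selector.select(record)) {
                if (stage.selectedNames.contains(name)) {
                    matched = true;
                    break;
                }
            }
            if (!matched) {
                return false;
            }
        }
        return true;
    }
}
```

With the issue's example, a record that outputSelector1 tags "name1" fails the 
first stage (only "name2" was selected), so it never reaches resultStream, 
which is the expected behavior.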

> Incorrect output after two successive split and select
> -------------------------------------------------------
>
> Key: FLINK-11084
> URL: https://issues.apache.org/jira/browse/FLINK-11084
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> The second OutputSelector of two successive split-and-select operations does 
> not actually depend on the first one; both end up in the same array of 
> OutputSelectors in DirectedOutput.
> For example:
> outputSelector1 => {"name1" or "name2"}
> outputSelector2 => {"name3" or "name4"}
> resultStream = 
> dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")
> Expected result for input {StreamRecord1}:
> outputSelector1 => {"name1"}
> outputSelector2 => {"name3"}
> resultStream => {}
> Actual result:
> resultStream => {StreamRecord1}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11084) Incorrect output after two successive split and select

2018-12-05 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-11084:
--------------------------------

 Summary: Incorrect output after two successive split and select
 Key: FLINK-11084
 URL: https://issues.apache.org/jira/browse/FLINK-11084
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Shimin Yang
Assignee: Shimin Yang
 Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1


The second OutputSelector of two successive split-and-select operations does 
not actually depend on the first one; both end up in the same array of 
OutputSelectors in DirectedOutput.

For example:

outputSelector1 => {"name1" or "name2"}

outputSelector2 => {"name3" or "name4"}

resultStream = 
dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")

Expected result for input {StreamRecord1}:

outputSelector1 => {"name1"}

outputSelector2 => {"name3"}

resultStream => {}

Actual result:

resultStream => {StreamRecord1}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] lzqdename commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-05 Thread GitBox
lzqdename commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp 
is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-444762927
 
 
   let me show how to generate the wrong result
   
   ---
   **background**: processing time in tumbling window  flink:1.5.0
   
   the invoke stack is as follows:
 [1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp 
(SqlFunctions.java:1,747)
 [2] 
org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect 
(TimeWindowPropertyCollector.scala:53)
 [3] 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply
 (IncrementalAggregateWindowFunction.scala:74)
 [4] 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply
 (IncrementalAggregateTimeWindowFunction.scala:72)
 [5] 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply
 (IncrementalAggregateTimeWindowFunction.scala:39)
 [6] 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process
 (InternalSingleValueWindowFunction.java:46)
 [7] 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents
 (WindowOperator.java:550)
 [8] 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime
 (WindowOperator.java:505)
 [9] 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime
 (HeapInternalTimerService.java:266)
 [10] 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run
 (SystemProcessingTimeService.java:281)
 [11] java.util.concurrent.Executors$RunnableAdapter.call 
(Executors.java:511)
 [12] java.util.concurrent.FutureTask.run (FutureTask.java:266)
 [13] 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 
(ScheduledThreadPoolExecutor.java:180)
 [14] 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run 
(ScheduledThreadPoolExecutor.java:293)
 [15] java.util.concurrent.ThreadPoolExecutor.runWorker 
(ThreadPoolExecutor.java:1,142)
 [16] java.util.concurrent.ThreadPoolExecutor$Worker.run 
(ThreadPoolExecutor.java:617)
 [17] java.lang.Thread.run (Thread.java:748)
   
   Now we are at [1] 
org.apache.calcite.runtime.SqlFunctions.internalToTimestamp 
(SqlFunctions.java:1,747)
   
   and the code is as follows:
```java
public static Timestamp internalToTimestamp(long v) {
  return new Timestamp(v - LOCAL_TZ.getOffset(v));
}
```
   Let us print the value of windowStart (v):
   print v
v = 1544074830000
   and the value of windowEnd (v):
   print v
v = 1544074833000
   
   
   
   
   
   After this, we come back to 
   [1] 
org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect 
(TimeWindowPropertyCollector.scala:51)
   
   
   then we will execute
```scala
if (windowStartOffset.isDefined) {
  output.setField(
    lastFieldPos + windowStartOffset.get,
    SqlFunctions.internalToTimestamp(windowStart))
}
```
   
   Before execution, the output is
output = 
"pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,null,null,null"
   After execution, the output is
output = 
"pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,2018-12-06 
05:40:30.0,2018-12-06 05:40:33.0,null"
   
   So, do you think it would be right that the
   long value 1544074830000 is translated to 2018-12-06 05:40:30.0 and the
   long value 1544074833000 is translated to 2018-12-06 05:40:33.0?
   
   I am in China (UTC+8), so I think the timestamps should be 2018-12-06 13:40:30.0 and 
2018-12-06 13:40:33.0.
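   
   A minimal, self-contained sketch of this behavior (assuming a JVM whose 
default time zone is UTC+8, e.g. Asia/Shanghai; the class name is made up for 
illustration):
   
```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class OffsetShiftDemo {

    private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

    // Same arithmetic as the quoted SqlFunctions.internalToTimestamp.
    static Timestamp internalToTimestamp(long v) {
        return new Timestamp(v - LOCAL_TZ.getOffset(v));
    }

    public static void main(String[] args) {
        long windowEnd = 1544074833000L; // 2018-12-06 05:40:33 UTC
        // In a UTC+8 JVM, 8h (28,800,000 ms) is subtracted, and
        // Timestamp.toString() renders the shifted instant in the local zone,
        // printing "2018-12-06 05:40:33.0" instead of the local wall clock
        // "2018-12-06 13:40:33.0".
        System.out.println(internalToTimestamp(windowEnd));
    }
}
```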
   
   
   
   Okay, let us continue.
   
   Now the data will be written to Kafka; before the write, the data will be 
serialized.
   Let us see what happened!
   
   the call stack is as follows:
    [1] 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer._timestamp
 (DateSerializer.java:41)
 [2] 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize
 (DateSerializer.java:48)
 [3] 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize
 (DateSerializer.java:15)
 [4] 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue
 (DefaultSerializerProvider.java:130)
 [5] 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue
 (ObjectMapper.java:2,444)
 [6] 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.valueToTree
 (ObjectMapper.java:2,586)
 [7] org.apache.flink.formats.json.JsonRowSerializationSchema.convert 
(JsonRowSerializationSchema.java:189)
 [8] org.apache.flink.formats.json.JsonRowSerializationSchema.convertRow 
(JsonRowSerializationSchema.java:128)
 [9] org.apache.flin
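   
   In short, Jackson's default serialization of java.util.Date (and thus 
java.sql.Timestamp) writes value.getTime(), so the already-shifted instant is 
what ends up in the JSON sent to Kafka. A minimal sketch (using plain Jackson 
instead of Flink's shaded package):
   
```java
import java.sql.Timestamp;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TimestampJsonDemo {
    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();
        // The instant after internalToTimestamp shifted it by -8h in a UTC+8 JVM.
        Timestamp shifted = new Timestamp(1544074833000L - 8 * 60 * 60 * 1000);
        JsonNode node = mapper.valueToTree(shifted);
        System.out.println(node); // 1544046033000 -- the shifted epoch leaks downstream
    }
}
```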

[jira] [Commented] (FLINK-11083) CRowSerializerConfigSnapshot is not instantiable

2018-12-05 Thread boshu Zheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711016#comment-16711016
 ] 

boshu Zheng commented on FLINK-11083:
-

cc [~pnowojski] Could you please have a look?

> CRowSerializerConfigSnapshot is not instantiable
> 
>
> Key: FLINK-11083
> URL: https://issues.apache.org/jira/browse/FLINK-11083
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL, Type Serialization System
>Reporter: boshu Zheng
>Assignee: boshu Zheng
>Priority: Major
>
> An exception was encountered when restarting a job with savepoint in our 
> production env,
> {code:java}
> 2018-12-04 20:28:25,091 INFO  10595 org.apache.flink.runtime.taskmanager.Task 
>   :917  - _OurCustomOperator_ -> select: () -> to: Tuple2 -> 
> Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING 
> to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) 
> from any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
>   ... 5 more
> Caused by: java.lang.RuntimeException: The class 
> 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot'
>  is not instantiable: The class has no (implicit) public nullary constructor, 
> i.e. a constructor without arguments.
>   at 
> org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412)
>   at 
> org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
>   at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
>   at 
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218)
>   at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>   ... 7 more
> {code}
> I added tests to CRowSerializerTest to make sure this is definitely a bug:
> {code:java}
>   @Test
>   def testDefaultConstructo

[jira] [Created] (FLINK-11083) CRowSerializerConfigSnapshot is not instantiable

2018-12-05 Thread boshu Zheng (JIRA)
boshu Zheng created FLINK-11083:
---

 Summary: CRowSerializerConfigSnapshot is not instantiable
 Key: FLINK-11083
 URL: https://issues.apache.org/jira/browse/FLINK-11083
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL, Type Serialization System
Reporter: boshu Zheng
Assignee: boshu Zheng


An exception was encountered when restarting a job with savepoint in our 
production env,

{code:java}
2018-12-04 20:28:25,091 INFO  10595 org.apache.flink.runtime.taskmanager.Task   
:917  - _OurCustomOperator_ -> select: () -> to: Tuple2 -> 
Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING 
to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) 
from any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
... 5 more
Caused by: java.lang.RuntimeException: The class 
'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot'
 is not instantiable: The class has no (implicit) public nullary constructor, 
i.e. a constructor without arguments.
at 
org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412)
at 
org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
at 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
at 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218)
at 
org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
... 7 more
{code}
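
A minimal sketch of the usual fix for this class of failure: the config 
snapshot needs a public nullary constructor so that InstantiationUtil can 
create it reflectively during restore. The class below is a hypothetical Java 
analogue for illustration, not the actual Flink source:

{code:java}
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;

public class MyRowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {

    /** Empty nullary constructor, required for reflective instantiation. */
    public MyRowSerializerConfigSnapshot() {}

    public MyRowSerializerConfigSnapshot(TypeSerializer<?>... nestedSerializers) {
        super(nestedSerializers);
    }

    @Override
    public int getVersion() {
        return 1;
    }
}
{code}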

I added tests to CRowSerializerTest to make sure this is definitely a bug:
{code:java}
  @Test
  def testDefaultConstructor(): Unit = {
new CRowSerializer.CRowSerializerConfigSnapshot()

/// This would fail the test
val serializerConfigSnapshotClass =
 
Class.forName("org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot")
InstantiationUtil.instantiate(serializerConfigSnapshotClass)
  }

  @Test
  def testStateRestore(): Unit = {

class IKeyedPr

[jira] [Updated] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11048:
---
Labels: pull-request-available  (was: )

> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711005#comment-16711005
 ] 

ASF GitHub Bot commented on FLINK-11048:


tweise opened a new pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249
 
 
   ## What is the purpose of the change
   
   Adds the ability to programmatically execute a job with savepoint restore 
option.
   
   
https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E
   
   The number of parameters to *executeRemotely* is a bit higher than what we 
might have expected. The change to ScalaShellRemoteStreamEnvironment needs a 
closer look. It needs to override the jar file list, but do we really need the 
*ProgramInvocationException* there?
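   
   A rough usage sketch of what this enables (the static signature below is an 
assumption inferred from the change log, not the final API; host, port, and 
paths are placeholders):
   
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SavepointRestoreDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).map(i -> i * 2).print();

        // Restore from an existing savepoint; allow non-restored state.
        SavepointRestoreSettings restore =
            SavepointRestoreSettings.forPath("hdfs:///savepoints/sp-42", true);

        // Hypothetical static variant usable with any StreamExecutionEnvironment.
        RemoteStreamEnvironment.executeRemotely(
            env, "jobmanager-host", 6123, new Configuration(),
            new String[] { "/path/to/job.jar" }, restore);
    }
}
```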
   
   ## Brief change log
 -  *executeRemotely* changed to a static method that can be used with any 
StreamExecutionEnvironment
   
   ## Verifying this change
   
   No added test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tweise opened a new pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-05 Thread GitBox
tweise opened a new pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249
 
 
   ## What is the purpose of the change
   
   Adds the ability to programmatically execute a job with savepoint restore 
option.
   
   
https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E
   
   The number of parameters to *executeRemotely* is a bit higher than what we 
might have expected. The change to ScalaShellRemoteStreamEnvironment needs a 
closer look. It needs to override the jar file list, but do we really need the 
*ProgramInvocationException* there?
   
   ## Brief change log
 -  *executeRemotely* changed to a static method that can be used with any 
StreamExecutionEnvironment
   
   ## Verifying this change
   
   No added test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710996#comment-16710996
 ] 

ASF GitHub Bot commented on FLINK-11010:


lzqdename removed a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL 
timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-444723623
 
 
   In Flink, I think it is better to avoid time zones in calculations, because the 
input is always UTC.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> the ProcessingTime is just implemented by invoking System.currentTimeMillis() 
> but the long value will be automatically wrapped to a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710998#comment-16710998
 ] 

ASF GitHub Bot commented on FLINK-10543:


sunjincheng121 commented on issue #6918: [FLINK-10543][table] Leverage 
efficient timer deletion in relational operators
URL: https://github.com/apache/flink/pull/6918#issuecomment-444757050
 
 
   Thanks for the quick update! @hequn8128 
   +1 to merge. 
   Thanks, Jincheng


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Leverage efficient timer deletion in relational operators
> -
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Minor
>  Labels: pull-request-available
>
> FLINK-9423 added support for efficient timer deletions. This feature is 
> available since Flink 1.6 and should be used by the relational operator of 
> SQL and Table API.
> Currently, we use a few workarounds to handle situations when deleting timers 
> would be the better solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] sunjincheng121 commented on issue #6918: [FLINK-10543][table] Leverage efficient timer deletion in relational operators

2018-12-05 Thread GitBox
sunjincheng121 commented on issue #6918: [FLINK-10543][table] Leverage 
efficient timer deletion in relational operators
URL: https://github.com/apache/flink/pull/6918#issuecomment-444757050
 
 
   Thanks for the quick update! @hequn8128 
   +1 to merge. 
   Thanks, Jincheng


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] lzqdename removed a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-05 Thread GitBox
lzqdename removed a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL 
timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-444723623
 
 
   In Flink, I think it is better to avoid time zones in calculations, because the 
input is always UTC.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-9555) Support table api in scala shell

2018-12-05 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-9555.
--
   Resolution: Fixed
Fix Version/s: 1.7.1
   1.8.0

Fixed in master: b3a378ad96088442825f1ad9f167d9571b5f4f78

Fixed in release-1.7: efc73a872ac52e314bb1a05b9c5ed045cde6df1f

> Support table api in scala shell
> 
>
> Key: FLINK-9555
> URL: https://issues.apache.org/jira/browse/FLINK-9555
> Project: Flink
>  Issue Type: New Feature
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Assignee: shuiqiangchen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
>
> It would be nice to have table api available in scala shell so that user can 
> experience table api in interactive way. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9555) Support table api in scala shell

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710993#comment-16710993
 ] 

ASF GitHub Bot commented on FLINK-9555:
---

asfgit closed pull request #7121: [FLINK-9555]Support table api in scala shell
URL: https://github.com/apache/flink/pull/7121
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml
index 3c9a563b30f..e7d31355b06 100644
--- a/flink-scala-shell/pom.xml
+++ b/flink-scala-shell/pom.xml
@@ -78,6 +78,13 @@ under the License.
			<version>${scala.version}</version>
		</dependency>

+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+

 

diff --git 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
index 4b6e886994a..c124d8ea8a3 100644
--- 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -23,6 +23,8 @@ import java.io.{BufferedReader, File, FileOutputStream}
 import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment, 
ScalaShellRemoteStreamEnvironment}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.{BatchTableEnvironment, 
StreamTableEnvironment}
 import org.apache.flink.util.AbstractID
 
 import scala.tools.nsc.interpreter._
@@ -90,10 +92,17 @@ class FlinkILoop(
   }
 
   // local environment
-  val (scalaBenv: ExecutionEnvironment, scalaSenv: StreamExecutionEnvironment) 
= {
+  val (
+scalaBenv: ExecutionEnvironment,
+scalaSenv: StreamExecutionEnvironment,
+scalaBTEnv: BatchTableEnvironment,
+scalaSTEnv: StreamTableEnvironment
+) = {
 val scalaBenv = new ExecutionEnvironment(remoteBenv)
 val scalaSenv = new StreamExecutionEnvironment(remoteSenv)
-(scalaBenv,scalaSenv)
+val scalaBTEnv = TableEnvironment.getTableEnvironment(scalaBenv)
+val scalaSTEnv = TableEnvironment.getTableEnvironment(scalaSenv)
+(scalaBenv,scalaSenv,scalaBTEnv,scalaSTEnv)
   }
 
   /**
@@ -139,7 +148,10 @@ class FlinkILoop(
 "org.apache.flink.api.scala._",
 "org.apache.flink.api.scala.utils._",
 "org.apache.flink.streaming.api.scala._",
-"org.apache.flink.streaming.api.windowing.time._"
+"org.apache.flink.streaming.api.windowing.time._",
+"org.apache.flink.table.api._",
+"org.apache.flink.table.api.scala._",
+"org.apache.flink.types.Row"
   )
 
   override def createInterpreter(): Unit = {
@@ -152,6 +164,8 @@ class FlinkILoop(
   // set execution environment
   intp.bind("benv", this.scalaBenv)
   intp.bind("senv", this.scalaSenv)
+  intp.bind("btenv", this.scalaBTEnv)
+  intp.bind("stenv", this.scalaSTEnv)
 }
   }
 
@@ -243,22 +257,29 @@ class FlinkILoop(
 
   F L I N K - S C A L A - S H E L L
 
-NOTE: Use the prebound Execution Environments to implement batch or streaming 
programs.
+NOTE: Use the prebound Execution Environments and Table Environment to 
implement batch or streaming programs.
 
-  Batch - Use the 'benv' variable
+  Batch - Use the 'benv' and 'btenv' variable
 
 * val dataSet = benv.readTextFile("/path/to/data")
 * dataSet.writeAsText("/path/to/output")
 * benv.execute("My batch program")
+*
+* val batchTable = btenv.fromDataSet(dataSet)
+* btenv.registerTable("tableName", batchTable)
+* val result = btenv.sqlQuery("SELECT * FROM tableName").collect
+HINT: You can use print() on a DataSet to print the contents or collect()
+a sql query result back to the shell.
 
-HINT: You can use print() on a DataSet to print the contents to the shell.
-
-  Streaming - Use the 'senv' variable
+  Streaming - Use the 'senv' and 'stenv' variable
 
 * val dataStream = senv.fromElements(1, 2, 3, 4)
 * dataStream.countWindowAll(2).sum(0).print()
+*
+* val streamTable = stenv.fromDataStream(dataStream, 'num)
+* val resultTable = streamTable.select('num).where('num % 2 === 1 )
+* resultTable.toAppendStream[Row].print()
 * senv.execute("My streaming program")
-
 HINT: You can only print a DataStream to the shell in local mode.
   """
 // scalastyle:on
diff --git 
a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/Sca

[GitHub] asfgit closed pull request #7121: [FLINK-9555]Support table api in scala shell

2018-12-05 Thread GitBox
asfgit closed pull request #7121: [FLINK-9555]Support table api in scala shell
URL: https://github.com/apache/flink/pull/7121
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml
index 3c9a563b30f..e7d31355b06 100644
--- a/flink-scala-shell/pom.xml
+++ b/flink-scala-shell/pom.xml
@@ -78,6 +78,13 @@ under the License.
			<version>${scala.version}</version>
		</dependency>

+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+

 

diff --git 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
index 4b6e886994a..c124d8ea8a3 100644
--- 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -23,6 +23,8 @@ import java.io.{BufferedReader, File, FileOutputStream}
 import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment, 
ScalaShellRemoteStreamEnvironment}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.{BatchTableEnvironment, 
StreamTableEnvironment}
 import org.apache.flink.util.AbstractID
 
 import scala.tools.nsc.interpreter._
@@ -90,10 +92,17 @@ class FlinkILoop(
   }
 
   // local environment
-  val (scalaBenv: ExecutionEnvironment, scalaSenv: StreamExecutionEnvironment) 
= {
+  val (
+scalaBenv: ExecutionEnvironment,
+scalaSenv: StreamExecutionEnvironment,
+scalaBTEnv: BatchTableEnvironment,
+scalaSTEnv: StreamTableEnvironment
+) = {
 val scalaBenv = new ExecutionEnvironment(remoteBenv)
 val scalaSenv = new StreamExecutionEnvironment(remoteSenv)
-(scalaBenv,scalaSenv)
+val scalaBTEnv = TableEnvironment.getTableEnvironment(scalaBenv)
+val scalaSTEnv = TableEnvironment.getTableEnvironment(scalaSenv)
+(scalaBenv,scalaSenv,scalaBTEnv,scalaSTEnv)
   }
 
   /**
@@ -139,7 +148,10 @@ class FlinkILoop(
 "org.apache.flink.api.scala._",
 "org.apache.flink.api.scala.utils._",
 "org.apache.flink.streaming.api.scala._",
-"org.apache.flink.streaming.api.windowing.time._"
+"org.apache.flink.streaming.api.windowing.time._",
+"org.apache.flink.table.api._",
+"org.apache.flink.table.api.scala._",
+"org.apache.flink.types.Row"
   )
 
   override def createInterpreter(): Unit = {
@@ -152,6 +164,8 @@ class FlinkILoop(
   // set execution environment
   intp.bind("benv", this.scalaBenv)
   intp.bind("senv", this.scalaSenv)
+  intp.bind("btenv", this.scalaBTEnv)
+  intp.bind("stenv", this.scalaSTEnv)
 }
   }
 
@@ -243,22 +257,29 @@ class FlinkILoop(
 
   F L I N K - S C A L A - S H E L L
 
-NOTE: Use the prebound Execution Environments to implement batch or streaming 
programs.
+NOTE: Use the prebound Execution Environments and Table Environment to 
implement batch or streaming programs.
 
-  Batch - Use the 'benv' variable
+  Batch - Use the 'benv' and 'btenv' variable
 
 * val dataSet = benv.readTextFile("/path/to/data")
 * dataSet.writeAsText("/path/to/output")
 * benv.execute("My batch program")
+*
+* val batchTable = btenv.fromDataSet(dataSet)
+* btenv.registerTable("tableName", batchTable)
+* val result = btenv.sqlQuery("SELECT * FROM tableName").collect
+HINT: You can use print() on a DataSet to print the contents or collect()
+a sql query result back to the shell.
 
-HINT: You can use print() on a DataSet to print the contents to the shell.
-
-  Streaming - Use the 'senv' variable
+  Streaming - Use the 'senv' and 'stenv' variable
 
 * val dataStream = senv.fromElements(1, 2, 3, 4)
 * dataStream.countWindowAll(2).sum(0).print()
+*
+* val streamTable = stenv.fromDataStream(dataStream, 'num)
+* val resultTable = streamTable.select('num).where('num % 2 === 1 )
+* resultTable.toAppendStream[Row].print()
 * senv.execute("My streaming program")
-
 HINT: You can only print a DataStream to the shell in local mode.
   """
 // scalastyle:on
diff --git 
a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 
b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 337e4fb9be9..fc90d8d143c 100644
--- 
a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ 
b/flink-scala-she

[GitHub] kisimple commented on issue #7192: [hotfix][test][streaming] Fix a test failure in Windows 7 environment.

2018-12-05 Thread GitBox
kisimple commented on issue #7192: [hotfix][test][streaming] Fix a test failure 
in Windows 7 environment.
URL: https://github.com/apache/flink/pull/7192#issuecomment-444750934
 
 
   cc @kl0u 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kisimple commented on issue #7215: [hotfix][test][streaming] Fix invalid testNotSideOutputXXX in WindowOperatorTest

2018-12-05 Thread GitBox
kisimple commented on issue #7215: [hotfix][test][streaming] Fix invalid 
testNotSideOutputXXX in WindowOperatorTest
URL: https://github.com/apache/flink/pull/7215#issuecomment-444750664
 
 
   cc @kl0u 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710971#comment-16710971
 ] 

ASF GitHub Bot commented on FLINK-10543:


hequn8128 commented on issue #6918: [FLINK-10543][table] Leverage efficient 
timer deletion in relational operators
URL: https://github.com/apache/flink/pull/6918#issuecomment-444745171
 
 
   @sunjincheng121 Thanks for your review. I have updated the PR according to 
your suggestions.
   Best, Hequn
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Leverage efficient timer deletion in relational operators
> -
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Minor
>  Labels: pull-request-available
>
> FLINK-9423 added support for efficient timer deletions. This feature is 
> available since Flink 1.6 and should be used by the relational operator of 
> SQL and Table API.
> Currently, we use a few workarounds to handle situations when deleting timers 
> would be the better solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 commented on issue #6918: [FLINK-10543][table] Leverage efficient timer deletion in relational operators

2018-12-05 Thread GitBox
hequn8128 commented on issue #6918: [FLINK-10543][table] Leverage efficient 
timer deletion in relational operators
URL: https://github.com/apache/flink/pull/6918#issuecomment-444745171
 
 
   @sunjincheng121 Thanks for your review. I have updated the PR according to 
your suggestions.
   Best, Hequn
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2018-12-05 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710966#comment-16710966
 ] 

zhijiang commented on FLINK-11082:
--

[~pnowojski], what do you think of this issue?

> Increase backlog only if it is available for consumption
> 
>
> Key: FLINK-11082
> URL: https://issues.apache.org/jira/browse/FLINK-11082
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.8.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> The backlog should indicate how many buffers are available in a subpartition 
> for the downstream's consumption. Availability is determined by two factors: a 
> finished {{BufferConsumer}}, or a triggered flush.
> In the current implementation, the backlog is increased as soon as a 
> {{BufferConsumer}} is added to the subpartition, even though this 
> {{BufferConsumer}} is not yet available for network transport.
> Furthermore, the backlog affects the requesting of floating buffers on the 
> downstream side. Some floating buffers may be fetched in advance but not used 
> for a long time, so the floating buffers are not made use of efficiently.
> We found this scenario to be extreme for the rebalance selector on the upstream 
> side, so we want to increase the backlog only when a {{BufferConsumer}} is 
> finished or a flush is triggered.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] maqingxiang opened a new pull request #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs

2018-12-05 Thread GitBox
maqingxiang opened a new pull request #7248: [hotfix][tableApi] Fix the 
description information of intersect in tableApi docs
URL: https://github.com/apache/flink/pull/7248
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

2018-12-05 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710963#comment-16710963
 ] 

TisonKun commented on FLINK-10333:
--

Hi [~StephanEwen] and [~till.rohrmann]

After an offline discussion with [~xiaogang.shi], we see that ZK has a transactional 
mechanism with which we can ensure that only the leader writes to ZK.

Given this knowledge and the inconsistencies [~till.rohrmann] noticed, before 
going into reimplementation, I did a survey of the usage of ZK-based stores in 
Flink. Ideally, there is exactly one role that writes a specific znode.

There are four types of znodes that Flink writes. Besides 
{{SubmittedJobGraphStore}} written by {{Dispatcher}}, 
{{CompletedCheckpointStore}} written by {{JobMaster}} and {{MesosWorkerStore}} 
written by {{MesosResourceManager}}, there is the {{RunningJobsRegistry}}, which 
also has a ZK-based implementation.

All of the first three write to ZK under a heavy “store lock”, but as 
[~xiaogang.shi] pointed out, this still lacks atomicity. With a solution based 
on ZK transactions (for example, the current {{Dispatcher}} leader performing 
{{setData}} with a {{check}} on the {{election-node-path}}), we can ensure 
atomicity while getting rid of the lock.

For the last one, {{RunningJobsRegistry}}, the situation becomes a bit more 
complex. {{JobManagerRunner}} is responsible for {{setJobRunning}} and 
{{setJobFinished}}, while {{Dispatcher}} is responsible for {{clearJob}}. This 
goes against the ideal of one role per znode. I also notice that the distinction 
between the semantics of {{DONE}} and those of clearing is ambiguous.

{{JobSchedulingStatus}} becomes {{DONE}} only if an {{ArchivedExecutionGraph}} 
has been generated, so that we can prevent the same job from being processed 
again by any approach other than checking an ephemeral {{DONE}} status. What if 
we replaced {{setJobFinished}} with clearing the {{RunningJobsRegistry}}?
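
A sketch of that idea using Apache Curator's transaction API (Curator 4.x 
assumed; paths, payload, and the expected version are placeholders for 
illustration):

{code:java}
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;

public class LeaderCheckedWrite {

    /**
     * Write the store znode only if the leader's election znode still has the
     * version this process observed when it became leader. A stale leader's
     * write then fails atomically instead of corrupting the store.
     */
    static List<CuratorTransactionResult> writeAsLeader(
            CuratorFramework client,
            int expectedLeaderVersion,
            byte[] jobGraphBytes) throws Exception {

        CuratorOp checkLeader = client.transactionOp().check()
            .withVersion(expectedLeaderVersion)
            .forPath("/leader/election-node-path");

        CuratorOp writeStore = client.transactionOp().setData()
            .forPath("/jobgraphs/job-id", jobGraphBytes);

        // Both operations commit or fail together on the ZK server.
        return client.transaction().forOperations(checkLeader, writeStore);
    }
}
{code}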

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.8.0
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
> introduced with past incremental changes.
> * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} 
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization 
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of 
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case 
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
> better to move {{RetrievableStateStorageHelper}} out of it for a better 
> separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even 
> if it is locked. This should not happen since it could leave another system 
> in an inconsistent state (imagine a changed {{JobGraph}} which restores from 
> an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} 
> which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
> * Getting rid of the {{SubmittedJobGraphListener}} would be helpful
> These problems made me think how reliable these components actually work. 
> Since these components are very important, I propose to refactor them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7208) Refactor build-in agg(MaxWithRetractAccumulator and MinWithRetractAccumulator) using the DataView

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710960#comment-16710960
 ] 

ASF GitHub Bot commented on FLINK-7208:
---

walterddr commented on a change in pull request #7201: [FLINK-7208] [table] 
Optimize Min/MaxWithRetractAggFunction with DataView
URL: https://github.com/apache/flink/pull/7201#discussion_r239320583
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1437,36 +1437,30 @@ object AggregateUtil {
 
   case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
 aggregates(index) = new 
CollectAggFunction(FlinkTypeFactory.toTypeInfo(relDataType))
-accTypes(index) = aggregates(index).getAccumulatorType
 
   case udagg: AggSqlFunction =>
 aggregates(index) = udagg.getFunction
-accTypes(index) = udagg.accType
 
   case unSupported: SqlAggFunction =>
 throw new TableException(s"Unsupported Function: 
'${unSupported.getName}'")
 }
   }
+  accTypes(index) = 
getAccumulatorTypeOfAggregateFunction(aggregates(index))
 }
 
 val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
 
 // create accumulator type information for every aggregate function
 aggregates.zipWithIndex.foreach { case (agg, index) =>
 
 Review comment:
   +1 you are right. I missed the 4 lines above. 👍 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor build-in agg(MaxWithRetractAccumulator and 
> MinWithRetractAccumulator) using the DataView
> -
>
> Key: FLINK-7208
> URL: https://issues.apache.org/jira/browse/FLINK-7208
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: kaibo.zhou
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Refactor build-in agg(MaxWithRetractAccumulator and 
> MinWithRetractAccumulator) using the DataView.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7208) Refactor build-in agg(MaxWithRetractAccumulator and MinWithRetractAccumulator) using the DataView

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710961#comment-16710961
 ] 

ASF GitHub Bot commented on FLINK-7208:
---

walterddr commented on a change in pull request #7201: [FLINK-7208] [table] 
Optimize Min/MaxWithRetractAggFunction with DataView
URL: https://github.com/apache/flink/pull/7201#discussion_r239320467
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
 ##
 @@ -203,13 +203,13 @@ class AggregateITCase extends StreamingWithStateTestBase 
{
   .groupBy('b)
   .select('a.count as 'cnt, 'b)
   .groupBy('cnt)
-  .select('cnt, 'b.count as 'freq)
+  .select('cnt, 'b.count as 'freq, 'b.min as 'min, 'b.max as 'max)
 
 val results = t.toRetractStream[Row](queryConfig)
 
 results.addSink(new RetractingSink)
 env.execute()
-val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1")
+val expected = List("1,1,1,1", "2,1,2,2", "3,1,3,3", "4,1,4,4", "5,1,5,5", 
"6,1,6,6")
 
 Review comment:
   Yeah, I am inclined to suggest we fix FLINK-11074 first before we check this 
in. What do you think, @dianfu @twalthr? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor build-in agg(MaxWithRetractAccumulator and 
> MinWithRetractAccumulator) using the DataView
> -
>
> Key: FLINK-7208
> URL: https://issues.apache.org/jira/browse/FLINK-7208
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: kaibo.zhou
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Refactor build-in agg(MaxWithRetractAccumulator and 
> MinWithRetractAccumulator) using the DataView.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] walterddr commented on a change in pull request #7201: [FLINK-7208] [table] Optimize Min/MaxWithRetractAggFunction with DataView

2018-12-05 Thread GitBox
walterddr commented on a change in pull request #7201: [FLINK-7208] [table] 
Optimize Min/MaxWithRetractAggFunction with DataView
URL: https://github.com/apache/flink/pull/7201#discussion_r239320583
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1437,36 +1437,30 @@ object AggregateUtil {
 
   case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
 aggregates(index) = new 
CollectAggFunction(FlinkTypeFactory.toTypeInfo(relDataType))
-accTypes(index) = aggregates(index).getAccumulatorType
 
   case udagg: AggSqlFunction =>
 aggregates(index) = udagg.getFunction
-accTypes(index) = udagg.accType
 
   case unSupported: SqlAggFunction =>
 throw new TableException(s"Unsupported Function: 
'${unSupported.getName}'")
 }
   }
+  accTypes(index) = 
getAccumulatorTypeOfAggregateFunction(aggregates(index))
 }
 
 val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
 
 // create accumulator type information for every aggregate function
 aggregates.zipWithIndex.foreach { case (agg, index) =>
 
 Review comment:
   +1 you are right. I missed the 4 lines above. 👍 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #7201: [FLINK-7208] [table] Optimize Min/MaxWithRetractAggFunction with DataView

2018-12-05 Thread GitBox
walterddr commented on a change in pull request #7201: [FLINK-7208] [table] 
Optimize Min/MaxWithRetractAggFunction with DataView
URL: https://github.com/apache/flink/pull/7201#discussion_r239320467
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
 ##
 @@ -203,13 +203,13 @@ class AggregateITCase extends StreamingWithStateTestBase 
{
   .groupBy('b)
   .select('a.count as 'cnt, 'b)
   .groupBy('cnt)
-  .select('cnt, 'b.count as 'freq)
+  .select('cnt, 'b.count as 'freq, 'b.min as 'min, 'b.max as 'max)
 
 val results = t.toRetractStream[Row](queryConfig)
 
 results.addSink(new RetractingSink)
 env.execute()
-val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1")
+val expected = List("1,1,1,1", "2,1,2,2", "3,1,3,3", "4,1,4,4", "5,1,5,5", 
"6,1,6,6")
 
 Review comment:
   Yeah, I am inclined to suggest we fix FLINK-11074 first before we check this 
in. What do you think, @dianfu @twalthr? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710946#comment-16710946
 ] 

ASF GitHub Bot commented on FLINK-10974:


dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add 
support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239307495
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -999,6 +1001,131 @@ class Table(
 new OverWindowedTable(this, overWindows.toArray)
   }
 
+  /**
+* Performs a flatMap operation with an user-defined table function.
+*
+* Scala Example:
+* {{{
+*   class MyFlatMapFunction extends TableFunction[Row] {
+* def eval(str : String) {
+*   if (str.contains("#")) {
+* val splits = str.split("#")
+* collect(Row.of(splits(0), splits(1)))
+*   }
+* }
+*
+* def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
+*   Types.ROW(Types.STRING, Types.STRING)
+*   }
+*
+*   val func = new MyFlatMapFunction()
+*   table.flatMap(func('c)).as('a, 'b)
+* }}}
+*
+* Java Example:
+* {{{
+*   class MyFlatMapFunction extends TableFunction {
+* public void eval(String str) {
+*   if (str.contains("#")) {
+* String[] splits = str.split("#");
+* collect(Row.of(splits[0], splits[1]));
+*   }
+* }
+*
+* public TypeInformation getResultType(Class[] signature) {
+*   return Types.ROW(Types.STRING, Types.STRING);
+* }
+*   }
+*
+*   TableFunction func = new MyFlatMapFunction();
+*   tableEnv.registerFunction("func", func);
+*   table.flatMap("func(c)").as("a, b");
+* }}}
+*/
+  def flatMap(tableFunction: Expression): Table = {
+unwrap(tableFunction, tableEnv) match {
+  case _: TableFunctionCall =>
+  case _ => throw new ValidationException("Only TableFunction can be used 
in flatMap.")
+}
+
+// rename output fields names to avoid ambiguous name
+val originalCall = 
UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, tableFunction)
+val originalOutputFieldNames = originalCall.output.map(_.name)
+val usedFieldNames: mutable.HashSet[String] = mutable.HashSet()
+logicalPlan.output.map(_.name).foreach(usedFieldNames.add)
+
+var i: Int = 0
+def findNewName(n: String): String = {
+  val newName = n + "_" + i
+  i += 1
+  if (usedFieldNames.contains(newName)) {
+findNewName(n)
+  } else {
+usedFieldNames.add(newName)
+newName
+  }
+}
+
+val newOutputFieldNames = originalOutputFieldNames.map(n =>
+  if (usedFieldNames.contains(n)) {
+findNewName(n)
+  } else {
+n
+  }
+)
 
 Review comment:
   Good suggestion. Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710941#comment-16710941
 ] 

ASF GitHub Bot commented on FLINK-10974:


dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add 
support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239308968
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/FlatMapValidationTest.scala
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.Func15
+import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
+import org.apache.flink.table.utils.{TableFunc0, TableTestBase}
+import org.junit.Test
+
+class FlatMapValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidMapFunctionTypeAggregation(): Unit = {
+val util = streamTestUtil()
+util.addTable[(Int)](
+  "MyTable", 'int)
+  .flatMap('int.sum) // do not support AggregateFunction as input
 
 Review comment:
   Done




[jira] [Commented] (FLINK-10973) Add Map operator to Table API

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710955#comment-16710955
 ] 

ASF GitHub Bot commented on FLINK-10973:


dianfu commented on issue #7167: [FLINK-10973] [table] Add support for map to 
table API
URL: https://github.com/apache/flink/pull/7167#issuecomment-444737589
 
 
   Hi @twalthr, any comments? It would be great if you could give some feedback 
on this ticket. 




> Add Map operator to Table API
> -
>
> Key: FLINK-10973
> URL: https://issues.apache.org/jira/browse/FLINK-10973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add Map operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]
> The usage:
> {code:java}
> val res = tab
>.map(fun: ScalarFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}
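
Analogous to the flatMap sketch above, a hedged sketch of a ScalarFunction that could back this map call; the row-typed result and all names are assumptions based on the `'a, 'b, 'c` output shown in the usage, not the final API:

{code:scala}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

// Maps each input row to exactly one output row with columns (a, b, c).
class MyMapFunction extends ScalarFunction {
  def eval(s: String): Row =
    Row.of(s, s.toUpperCase, Int.box(s.length))

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(Types.STRING, Types.STRING, Types.INT)
}

// val func = new MyMapFunction()
// val res = tab.map(func('c)).select('a, 'c)
{code}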







[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710947#comment-16710947
 ] 

ASF GitHub Bot commented on FLINK-10974:


dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add 
support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239302052
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -79,7 +80,8 @@ class Table(
 * @param udtfCall A String expression of the TableFunction call.
 */
   def this(tableEnv: TableEnvironment, udtfCall: String) {
-this(tableEnv, 
UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, udtfCall))
+this(tableEnv, UserDefinedFunctionUtils.createLogicalFunctionCall(
+  tableEnv, ExpressionParser.parseExpression(udtfCall)))
 
 Review comment:
   Done




[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710950#comment-16710950
 ] 

ASF GitHub Bot commented on FLINK-10974:


dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add 
support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239301814
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -21,7 +21,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import org.apache.flink.table.expressions.{Alias, Asc, Expression, 
ExpressionParser, Ordering, ResolvedFieldReference, UnresolvedAlias, 
UnresolvedFieldReference, WindowProperty}
+import org.apache.flink.table.expressions.{Alias, Asc, Expression, 
ExpressionParser, Ordering, ResolvedFieldReference, TableFunctionCall, 
UnresolvedAlias, UnresolvedFieldReference, WindowProperty}
 
 Review comment:
   Done




[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710940#comment-16710940
 ] 

ASF GitHub Bot commented on FLINK-10974:


dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add 
support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239308474
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ##
 @@ -320,6 +320,18 @@ case class TableFunctionCall(
 
   override private[flink] def children: Seq[Expression] = parameters
 
+  def as(fields: Symbol*): TableFunctionCall = {
+this.aliases = Some(fields.map(_.name))
+this
+  }
+
+  def as(fields: String): TableFunctionCall = {
+val fieldExprs = ExpressionParser
 
 Review comment:
   Make sense. Done.




[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710943#comment-16710943
 ] 

ASF GitHub Bot commented on FLINK-10974:


dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add 
support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239308521
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ##
 @@ -795,7 +795,7 @@ object UserDefinedFunctionUtils {
 "define table function followed by some Alias.")
 }
 
-val functionCall: LogicalTableFunctionCall = 
unwrap(ExpressionParser.parseExpression(udtf))
+val functionCall: LogicalTableFunctionCall = unwrap(udtfExpr)
 
 Review comment:
   Done.




[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710949#comment-16710949
 ] 

ASF GitHub Bot commented on FLINK-10974:


dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add 
support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239308496
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ##
 @@ -328,7 +340,9 @@ case class TableFunctionCall(
 * @return this table function call
 */
   private[flink] def as(aliasList: Option[Seq[String]]): TableFunctionCall = {
-this.aliases = aliasList
+if (aliasList.isDefined) {
 
 Review comment:
   Agree. Done.




[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710944#comment-16710944
 ] 

ASF GitHub Bot commented on FLINK-10974:


dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add 
support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239302508
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -999,6 +1001,131 @@ class Table(
 new OverWindowedTable(this, overWindows.toArray)
   }
 
+  /**
+* Performs a flatMap operation with an user-defined table function.
+*
+* Scala Example:
+* {{{
+*   class MyFlatMapFunction extends TableFunction[Row] {
+* def eval(str : String) {
+*   if (str.contains("#")) {
+* val splits = str.split("#")
+* collect(Row.of(splits(0), splits(1)))
+*   }
+* }
+*
+* def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
+*   Types.ROW(Types.STRING, Types.STRING)
+*   }
+*
+*   val func = new MyFlatMapFunction()
+*   table.flatMap(func('c)).as('a, 'b)
+* }}}
+*
+* Java Example:
+* {{{
+*   class MyFlatMapFunction extends TableFunction {
+* public void eval(String str) {
+*   if (str.contains("#")) {
+* String[] splits = str.split("#");
+* collect(Row.of(splits[0], splits[1]));
+*   }
+* }
+*
+* public TypeInformation getResultType(Class[] signature) {
+*   return Types.ROW(Types.STRING, Types.STRING);
+* }
+*   }
+*
+*   TableFunction func = new MyFlatMapFunction();
+*   tableEnv.registerFunction("func", func);
+*   table.flatMap("func(c)").as("a, b");
+* }}}
+*/
+  def flatMap(tableFunction: Expression): Table = {
+unwrap(tableFunction, tableEnv) match {
+  case _: TableFunctionCall =>
+  case _ => throw new ValidationException("Only TableFunction can be used 
in flatMap.")
+}
+
+// rename output fields names to avoid ambiguous name
+val originalCall = 
UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, tableFunction)
+val originalOutputFieldNames = originalCall.output.map(_.name)
+val usedFieldNames: mutable.HashSet[String] = mutable.HashSet()
+logicalPlan.output.map(_.name).foreach(usedFieldNames.add)
 
 Review comment:
   Good suggestion, done.




[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710945#comment-16710945
 ] 

ASF GitHub Bot commented on FLINK-10974:


dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add 
support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239307765
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ##
 @@ -320,6 +320,18 @@ case class TableFunctionCall(
 
   override private[flink] def children: Seq[Expression] = parameters
 
+  def as(fields: Symbol*): TableFunctionCall = {
 
 Review comment:
   Done




[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710948#comment-16710948
 ] 

ASF GitHub Bot commented on FLINK-10974:


dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add 
support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239318443
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/FlatMapITCase.scala
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase
+import 
org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.utils.TableFunc2
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class FlatMapITCase(
+mode: TestExecutionMode,
+configMode: TableConfigMode)
+  extends TableProgramsClusterTestBase(mode, configMode) {
+
+  private def testData(env: ExecutionEnvironment): DataSet[(Int, Long, 
String)] = {
+val data = new mutable.MutableList[(Int, Long, String)]
+data.+=((1, 1L, "Jack#22"))
+data.+=((2, 2L, "John#333"))
+data.+=((3, 2L, "Anna#"))
+data.+=((4, 3L, "nosharp#5"))
+env.fromCollection(data)
+  }
+
+  @Test
+  def testFlatMap(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val func2 = new TableFunc2
+val results = testData(env).toTable(tEnv, 'a, 'b, 'c)
+  .flatMap(func2('c))
 
 Review comment:
   Agree. Done.




[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710942#comment-16710942
 ] 

ASF GitHub Bot commented on FLINK-10974:


dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add 
support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239302128
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -999,6 +1001,131 @@ class Table(
 new OverWindowedTable(this, overWindows.toArray)
   }
 
+  /**
+* Performs a flatMap operation with an user-defined table function.
+*
+* Scala Example:
+* {{{
+*   class MyFlatMapFunction extends TableFunction[Row] {
+* def eval(str : String) {
+*   if (str.contains("#")) {
+* val splits = str.split("#")
+* collect(Row.of(splits(0), splits(1)))
+*   }
+* }
+*
+* def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
+*   Types.ROW(Types.STRING, Types.STRING)
+*   }
+*
+*   val func = new MyFlatMapFunction()
 
 Review comment:
   Done




















[GitHub] dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API

2018-12-05 Thread GitBox
dianfu commented on a change in pull request #7196: [FLINK-10974] [table] Add 
support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239307495
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -999,6 +1001,131 @@ class Table(
 new OverWindowedTable(this, overWindows.toArray)
   }
 
+  /**
+* Performs a flatMap operation with an user-defined table function.
+*
+* Scala Example:
+* {{{
+*   class MyFlatMapFunction extends TableFunction[Row] {
+* def eval(str : String) {
+*   if (str.contains("#")) {
+* val splits = str.split("#")
+* collect(Row.of(splits(0), splits(1)))
+*   }
+* }
+*
+* def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
+*   Types.ROW(Types.STRING, Types.STRING)
+*   }
+*
+*   val func = new MyFlatMapFunction()
+*   table.flatMap(func('c)).as('a, 'b)
+* }}}
+*
+* Java Example:
+* {{{
+*   class MyFlatMapFunction extends TableFunction {
+* public void eval(String str) {
+*   if (str.contains("#")) {
+* String[] splits = str.split("#");
+* collect(Row.of(splits[0], splits[1]));
+*   }
+* }
+*
+* public TypeInformation getResultType(Class[] signature) {
+*   return Types.ROW(Types.STRING, Types.STRING);
+* }
+*   }
+*
+*   TableFunction func = new MyFlatMapFunction();
+*   tableEnv.registerFunction("func", func);
+*   table.flatMap("func(c)").as("a, b");
+* }}}
+*/
+  def flatMap(tableFunction: Expression): Table = {
+unwrap(tableFunction, tableEnv) match {
+  case _: TableFunctionCall =>
+  case _ => throw new ValidationException("Only TableFunction can be used 
in flatMap.")
+}
+
+// rename output fields names to avoid ambiguous name
+val originalCall = 
UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, tableFunction)
+val originalOutputFieldNames = originalCall.output.map(_.name)
+val usedFieldNames: mutable.HashSet[String] = mutable.HashSet()
+logicalPlan.output.map(_.name).foreach(usedFieldNames.add)
+
+var i: Int = 0
+def findNewName(n: String): String = {
+  val newName = n + "_" + i
+  i += 1
+  if (usedFieldNames.contains(newName)) {
+findNewName(n)
+  } else {
+usedFieldNames.add(newName)
+newName
+  }
+}
+
+val newOutputFieldNames = originalOutputFieldNames.map(n =>
+  if (usedFieldNames.contains(n)) {
+findNewName(n)
+  } else {
+n
+  }
+)
 
 Review comment:
   Good suggestion. Done.
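
To make the renaming logic in the diff above concrete, here is a standalone trace of findNewName, assuming the input table has fields a, b, c and the table function outputs fields named a and b:

{code:scala}
import scala.collection.mutable

val usedFieldNames = mutable.HashSet("a", "b", "c") // fields of the input table

var i: Int = 0
def findNewName(n: String): String = {
  val newName = n + "_" + i
  i += 1
  if (usedFieldNames.contains(newName)) {
    findNewName(n)
  } else {
    usedFieldNames.add(newName)
    newName
  }
}

val renamed = Seq("a", "b").map(n =>
  if (usedFieldNames.contains(n)) findNewName(n) else n)
// renamed == Seq("a_0", "b_1") -- the counter is shared across all fields,
// so collisions are resolved with strictly increasing suffixes.
{code}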








[jira] [Created] (FLINK-11082) Increase backlog only if it is available for consumption

2018-12-05 Thread zhijiang (JIRA)
zhijiang created FLINK-11082:


 Summary: Increase backlog only if it is available for consumption
 Key: FLINK-11082
 URL: https://issues.apache.org/jira/browse/FLINK-11082
 Project: Flink
  Issue Type: Sub-task
  Components: Network
Affects Versions: 1.8.0
Reporter: zhijiang
Assignee: zhijiang


The backlog should indicate how many buffers are available in a subpartition for 
downstream consumption. Availability is determined by two factors: whether a 
{{BufferConsumer}} is finished, and whether a flush has been triggered.

In the current implementation, the backlog is increased as soon as a 
{{BufferConsumer}} is added to the subpartition, even though that 
{{BufferConsumer}} is not yet available for network transport.

Furthermore, the backlog affects how many floating buffers are requested on the 
downstream side. Some floating buffers may be fetched in advance but not used 
for a long time, so the floating buffers are not used efficiently.

We found this scenario to be especially pronounced with the rebalance selector 
on the upstream side, so we want to increase the backlog only when a 
{{BufferConsumer}} is finished or a flush is triggered.
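
A minimal sketch of the proposed accounting, written in Scala for brevity (the real logic lives in the runtime subpartition classes; the class and method names here are illustrative):

{code:scala}
// Backlog counts only buffers that are ready for network transport.
class SubpartitionBacklog {
  private var backlog = 0

  // Called when a BufferConsumer is finished, or when a flush makes the
  // current (unfinished) BufferConsumer readable. Merely adding a
  // BufferConsumer to the subpartition no longer increases the backlog.
  def onBufferAvailable(): Unit = backlog += 1

  // Called when a buffer has been handed to the downstream consumer.
  def onBufferConsumed(): Unit = backlog -= 1

  // The downstream side uses this value to decide how many floating
  // buffers to request, so overstating it wastes floating buffers.
  def getBacklog: Int = backlog
}
{code}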





[jira] [Commented] (FLINK-10941) Slots prematurely released which still contain unconsumed data

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710927#comment-16710927
 ] 

ASF GitHub Bot commented on FLINK-10941:


QiLuo-BD commented on a change in pull request #7186: [FLINK-10941] Keep slots 
which contain unconsumed result partitions
URL: https://github.com/apache/flink/pull/7186#discussion_r239314973
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -902,6 +902,15 @@ private void checkTaskManagerTimeouts() {
// first retrieve the timed out TaskManagers
for (TaskManagerRegistration taskManagerRegistration : 
taskManagerRegistrations.values()) {
if (currentTime - 
taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
+   // checking whether TaskManagers can be 
safely removed
 
 Review comment:
   Yes, your proposal would improve resource utilization. But considering that 
a TM may be reused after all its partitions are consumed (and before the TM 
timeout), we may discuss this optimization in another issue. :)
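
A rough, self-contained model of the guard under discussion, with hypothetical names (the real check would live in SlotManager's checkTaskManagerTimeouts and would have to ask about partition state):

{code:scala}
// Hypothetical, simplified model: an idle TaskManager is released only
// when it has timed out AND holds no unconsumed result partitions.
case class TaskManager(id: String, idleSince: Long, unconsumedPartitions: Int)

def canRelease(tm: TaskManager, currentTime: Long, timeoutMs: Long): Boolean =
  currentTime - tm.idleSince >= timeoutMs && tm.unconsumedPartitions == 0

// A TM that timed out but still has one unconsumed partition is kept alive,
// avoiding the RemoteTransportException described in the issue.
assert(!canRelease(TaskManager("tm-1", idleSince = 0L, unconsumedPartitions = 1),
                   currentTime = 60000L, timeoutMs = 30000L))
{code}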




> Slots prematurely released which still contain unconsumed data 
> ---
>
> Key: FLINK-10941
> URL: https://issues.apache.org/jira/browse/FLINK-10941
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Qi
>Assignee: Qi
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Our case is: Flink 1.5 batch mode, 32 parallelism to read data source and 4 
> parallelism to write data sink.
>  
> The read task worked perfectly with 32 TMs. However when the job was 
> executing the write task, since only 4 TMs were needed, other 28 TMs were 
> released. This caused RemoteTransportException in the write task:
>  
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connection unexpectedly closed by remote task manager 
> ’the_previous_TM_used_by_read_task'. This might indicate that the remote task 
> manager was lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:133)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
>   ...
>  
> After skimming YarnFlinkResourceManager related code, it seems to me that 
> Flink is releasing TMs when they’re idle, regardless of whether working TMs 
> need them.
>  
> Put in another way, Flink seems to prematurely release slots which contain 
> unconsumed data and, thus, eventually release a TM which then fails a 
> consuming task.





[GitHub] QiLuo-BD commented on a change in pull request #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions

2018-12-05 Thread GitBox
QiLuo-BD commented on a change in pull request #7186: [FLINK-10941] Keep slots 
which contain unconsumed result partitions
URL: https://github.com/apache/flink/pull/7186#discussion_r239314973
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -902,6 +902,15 @@ private void checkTaskManagerTimeouts() {
// first retrieve the timed out TaskManagers
for (TaskManagerRegistration taskManagerRegistration : 
taskManagerRegistrations.values()) {
if (currentTime - 
taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
+   // checking whether TaskManagers can be 
safely removed
 
 Review comment:
   Yes, your proposal will improve resource utilization. But considering that 
TM may be reused after all partitions are consumed (and before TM timeout), we 
may discuss this optimization in another issue. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10941) Slots prematurely released which still contain unconsumed data

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710920#comment-16710920
 ] 

ASF GitHub Bot commented on FLINK-10941:


QiLuo-BD commented on a change in pull request #7186: [FLINK-10941] Keep slots 
which contain unconsumed result partitions
URL: https://github.com/apache/flink/pull/7186#discussion_r239314285
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##
 @@ -134,10 +138,17 @@ public void cancel(InputChannelID receiverId) {
ctx.pipeline().fireUserEventTriggered(receiverId);
}
 
-   public void close() {
+   public void close() throws IOException {
if (ctx != null) {
ctx.channel().close();
}
+
+   LOG.info("Close all {} readers pending for close.", 
readersToClose.size());
 
 Review comment:
   Thanks Zhijiang. Agree with you that the TM should only exit after all 
connections are closed. 
   
   Regarding the side effect that 1 tail task out of 10 will delay releasing 
partitions: I think this may not be bad, since resources are managed at the TM 
level. If a TM cannot be released, it doesn't make much difference whether it 
holds 1 partition or 10. Please correct me if anything is wrong.
   
   Considering all the existing limitations, I think resource management could 
be further improved via the external shuffle service that you've proposed. This 
PR may focus on solving the current issue. Looking forward to your further comments. :)






[jira] [Commented] (FLINK-10560) JVM_ARGS doesn't work in jobmanager and taskmangers' JVM on yarn

2018-12-05 Thread Jiayi Liao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710916#comment-16710916
 ] 

Jiayi Liao commented on FLINK-10560:


[~till.rohrmann] Thanks for pointing it out. It would be better if we pointed 
this out in the comments in bin/config.sh.

> JVM_ARGS doesn't work in jobmanager and taskmangers' JVM on yarn
> 
>
> Key: FLINK-10560
> URL: https://issues.apache.org/jira/browse/FLINK-10560
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.0
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>






[jira] [Commented] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710911#comment-16710911
 ] 

ASF GitHub Bot commented on FLINK-10884:


wg1026688210 commented on a change in pull request #7185: [FLINK-10884] 
[yarn/mesos]  adjust  container memory param  to set a safe margin from offheap 
memory
URL: https://github.com/apache/flink/pull/7185#discussion_r239310963
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
 ##
 @@ -158,8 +158,10 @@ public static ContaineredTaskManagerParameters create(
 
// (2) split the remaining Java memory between heap and off-heap
final long heapSizeMB = 
TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config);
-   // use the cut-off memory for off-heap (that was its intention)
-   final long offHeapSizeMB = containerMemoryMB - heapSizeMB;
+   // (3) try to compute the offHeapMemory from a safe margin
+   final long restMemoryMB = containerMemoryMB - heapSizeMB;
+   final long offHeapCutoffMemory = 
calculateOffHeapCutoffMB(config, restMemoryMB);
 
 Review comment:
   maybe we need some suggestions from @GJL 




> Flink on yarn  TM container will be killed by nodemanager because of  the 
> exceeded  physical memory.
> 
>
> Key: FLINK-10884
> URL: https://issues.apache.org/jira/browse/FLINK-10884
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Core
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Environment: version  : 1.6.2 
> module : flink on yarn
> centos  jdk1.8
> hadoop 2.7
>Reporter: wgcn
>Assignee: wgcn
>Priority: Major
>  Labels: pull-request-available, yarn
>
> The TM container will be killed by the NodeManager because of exceeded 
> physical memory. I found in the launch context for the TM container that 
> "container memory = heap memory + offHeapSizeMB" at the class 
> org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters 
> (lines 160 to 166). I set a safety margin for the container's total memory 
> usage. For example, if the container limit is 3g of memory, the sum 
> "heap memory + offHeapSizeMB" is set to 2.4g to prevent the container from 
> being killed. Do we have a ready-made solution, or should I commit mine?
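
For illustration, a minimal sketch of such a margin-aware split, assuming a 
configurable cutoff ratio (the names and the ratio are assumptions, not 
Flink's actual API):

{code:scala}
object ContainerMemorySketch {
  // Assumed cutoff ratio; in Flink this would come from configuration.
  val offHeapCutoffRatio = 0.2

  /** Splits the non-heap container memory into off-heap memory and a safety margin. */
  def split(containerMemoryMB: Long, heapSizeMB: Long): (Long, Long) = {
    val restMB = containerMemoryMB - heapSizeMB
    // Hold back part of the non-heap memory so that direct/native
    // allocations cannot push the process over the container limit.
    val marginMB = (restMB * offHeapCutoffRatio).toLong
    val offHeapMB = restMB - marginMB
    (offHeapMB, marginMB)
  }
}

// Example: a 3072 MB container with 1843 MB of heap leaves 1229 MB,
// of which ~245 MB stays unassigned as the safety margin.
{code}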



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9555) Support table api in scala shell

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710906#comment-16710906
 ] 

ASF GitHub Bot commented on FLINK-9555:
---

sunjincheng121 commented on issue #7121: [FLINK-9555]Support table api in scala 
shell
URL: https://github.com/apache/flink/pull/7121#issuecomment-444726128
 
 
   Thanks for the update! @shuiqiangchen 
   Merging...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support table api in scala shell
> 
>
> Key: FLINK-9555
> URL: https://issues.apache.org/jira/browse/FLINK-9555
> Project: Flink
>  Issue Type: New Feature
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Assignee: shuiqiangchen
>Priority: Major
>  Labels: pull-request-available
>
> It would be nice to have the Table API available in the Scala shell so that 
> users can experience the Table API in an interactive way. 
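
For context, a sketch of the intended interactive usage, assuming the shell 
pre-binds a batch environment `benv` and a table environment `btenv` (the 
binding names are assumptions):

{code:scala}
// Hypothetical Scala shell session after this change.
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

val ds = benv.fromElements((1, "hello"), (2, "world"))  // benv: pre-bound batch env
val table = ds.toTable(btenv, 'id, 'word)               // btenv: pre-bound table env
table.filter('id > 1).select('word)
{code}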



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710899#comment-16710899
 ] 

ASF GitHub Bot commented on FLINK-11010:


lzqdename commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp 
is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-444723623
 
 
   In Flink, I think it is better to avoid time zones in calculations, because 
the input is always UTC.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> the ProcessingTime is just implemented by invoking System.currentTimeMillis() 
> but the long value will be automatically wrapped to a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
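
A small sketch of the discrepancy (illustrative only): the wrapped value 
differs from `currentProcessingTime()` by the default zone's offset, so the 
two only agree when the default time zone is UTC.

{code:scala}
import java.util.TimeZone

val time = System.currentTimeMillis()  // what currentProcessingTime() returns
// The wrapping quoted above shifts the epoch value by the local offset:
val shifted = new java.sql.Timestamp(time - TimeZone.getDefault.getOffset(time))

// In UTC+8 this prints -28800000 (8 hours); it is 0 only when the
// default time zone is UTC, hence the reported inconsistency.
println(shifted.getTime - time)
{code}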



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11065) Migrate flink-table runtime classes

2018-12-05 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710889#comment-16710889
 ] 

Hequn Cheng commented on FLINK-11065:
-

[~twalthr] Hi, thanks for driving this. I think we will benefit a lot from it 
once it is finished. Considering how to create smaller subtasks, here are my 
thoughts: [~twalthr] [~xueyu]
We can probably migrate the runtime classes folder by folder, for a few 
reasons:
 - Different operators have already been split into folders, such as 
aggregations, join and match.
 - Files in different folders have relatively few dependencies on each other.
 - Each time we only need to focus on files in a single folder, which makes 
development and review easier.

As for each folder, we can also create smaller tasks. For example, we can 
separate window join and non-window join into different tasks.

We can start our work from the root folder. Probably with the following three 
tasks:
 - Port runner classes({{*Runner}})
 - Port function classes({{*Function}})
 - Port other classes, such as {{CRowKeySelector}}, {{RowKeySelector}} and 
{{TableFunctionCollector}}

What do you guys think?

> Migrate flink-table runtime classes
> ---
>
> Key: FLINK-11065
> URL: https://issues.apache.org/jira/browse/FLINK-11065
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Priority: Major
>
> This issue covers the third step of the migration plan mentioned in 
> [FLIP-28|https://cwiki.apache.org/confluence/display/FLINK/FLIP-28%3A+Long-term+goal+of+making+flink-table+Scala-free].
> All runtime classes have few dependencies on other classes.
> This issue tracks efforts of porting runtime classes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10783) Update WindowOperatorMigrationTest for 1.7

2018-12-05 Thread boshu Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

boshu Zheng reassigned FLINK-10783:
---

Assignee: boshu Zheng  (was: vinoyang)

> Update WindowOperatorMigrationTest for 1.7
> --
>
> Key: FLINK-10783
> URL: https://issues.apache.org/jira/browse/FLINK-10783
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: boshu Zheng
>Priority: Major
>
> Update {{WindowOperatorMigrationTest}} so that it covers restoring from 1.7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10640) Enable Slot Resource Profile for Resource Management

2018-12-05 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710884#comment-16710884
 ] 

Tony Xintong Song commented on FLINK-10640:
---

Hi [~Tison], what you proposed is exactly what I've been thinking. I think it's 
actually an important part of TM management. If we are going to manage a 
dynamic number of {{TaskExecutor}}s, we do need such a pair of (min, max) to 
allow users to limit the range of the {{TaskExecutor}} count.

> Enable Slot Resource Profile for Resource Management
> 
>
> Key: FLINK-10640
> URL: https://issues.apache.org/jira/browse/FLINK-10640
> Project: Flink
>  Issue Type: New Feature
>  Components: ResourceManager
>Reporter: Tony Xintong Song
>Priority: Major
>
> Motivation & Backgrounds
>  * The existing concept of task slots roughly represents how many pipelines 
> of tasks a TaskManager can hold. However, it does not consider the differences 
> in resource needs and usage of individual tasks. Enabling resource profiles 
> of slots may allow Flink to better allocate execution resources according to 
> tasks' fine-grained resource needs.
>  * The community version of Flink already contains APIs and a partial 
> implementation for slot resource profiles. However, this logic is not actually 
> used. (The ResourceProfile of slot requests is by default set to UNKNOWN with 
> negative values, and thus matches any given slot.)
> Preliminary Design
>  * Slot Management
>  A slot represents a certain amount of resources on a TaskManager in which a 
> single pipeline of tasks runs. Initially, a TaskManager does not have any 
> slots, only a total amount of resources. When allocating, the ResourceManager 
> finds proper TMs to generate new slots for the tasks to run in, according to 
> the slot requests. Once generated, a slot's size (resource profile) does not 
> change until it is freed. The ResourceManager can apply different, portable 
> strategies to allocate slots from TaskManagers.
>  * TM Management
>  The size and number of TaskManagers, and when to start them, can also be 
> flexible. TMs can be started and released dynamically and may have different 
> sizes. We may have many different, portable strategies, e.g., an elastic 
> session that can run multiple jobs like the session mode while dynamically 
> adjusting the size of the session (number of TMs) according to the real-time 
> workload.
>  * About Slot Sharing
>  Slot sharing is a good heuristic for easily calculating how many slots are 
> needed to get the job running, and for getting better utilization when slots 
> carry no resource profile. However, with resource profiles enabling 
> finer-grained resource management, each individual task has its specific 
> resource needs, and it does not make much sense to have multiple tasks share 
> the resources of the same slot. Instead, we may introduce locality 
> preferences/constraints to support the semantics of putting tasks in the 
> same/different TMs in a more general way.
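
As a rough sketch of the matching semantics mentioned above (an UNKNOWN 
profile with negative values matches any slot); this is illustrative only, not 
Flink's actual ResourceProfile class:

{code:scala}
// Sketch: negative fields mean UNKNOWN, which matches any offered slot.
case class Profile(cpuCores: Double, memoryMB: Int) {
  def isMatchedBy(offered: Profile): Boolean =
    (cpuCores < 0 && memoryMB < 0) ||
      (cpuCores <= offered.cpuCores && memoryMB <= offered.memoryMB)
}

val unknown = Profile(-1.0, -1)
assert(unknown.isMatchedBy(Profile(1.0, 1024)))              // UNKNOWN matches everything
assert(!Profile(2.0, 2048).isMatchedBy(Profile(1.0, 1024)))  // a sized profile does not
{code}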



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11081) Support binding port range for REST server

2018-12-05 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-11081:


Assignee: vinoyang

> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes it would be 
> useful to be able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add, similar to 
> {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which 
> allows specifying a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.
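
A minimal sketch of how such a {{BIND_PORT}} range could be consumed; the 
"lo-hi" string format and all names here are assumptions:

{code:scala}
import java.net.ServerSocket
import scala.util.Try

// Parse "50100-50200" (or a single port) into a candidate sequence.
def parsePortRange(spec: String): Seq[Int] = spec.split("-") match {
  case Array(single) => Seq(single.trim.toInt)
  case Array(lo, hi) => lo.trim.toInt to hi.trim.toInt
  case _             => throw new IllegalArgumentException(s"Bad port spec: $spec")
}

// Bind to the first free port in the range, as the endpoint would.
def bindFirstFree(spec: String): Option[ServerSocket] =
  parsePortRange(spec).view
    .flatMap(p => Try(new ServerSocket(p)).toOption)
    .headOption
{code}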



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710867#comment-16710867
 ] 

ASF GitHub Bot commented on FLINK-10543:


sunjincheng121 commented on a change in pull request #6918: 
[FLINK-10543][table] Leverage efficient timer deletion in relational operators
URL: https://github.com/apache/flink/pull/6918#discussion_r239301372
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
 ##
 @@ -56,6 +56,10 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, 
O](queryConfig: Stream
 val cleanupTime = currentTime + maxRetentionTime
 // register timer and remember clean-up time
 ctx.timerService().registerProcessingTimeTimer(cleanupTime)
+// delete expired timer
+if (curCleanupTime != null) {
+  ctx.timerService().deleteProcessingTimeTimer(curCleanupTime)
+}
 
 Review comment:
   We can remove `needToCleanupState` after adding the delete logic.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Leverage efficient timer deletion in relational operators
> -
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Minor
>  Labels: pull-request-available
>
> FLINK-9423 added support for efficient timer deletions. This feature has been 
> available since Flink 1.6 and should be used by the relational operators of 
> SQL and the Table API.
> Currently, we use a few workarounds to handle situations where deleting timers 
> would be the better solution.
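
Sketched below is the pattern under discussion, simplified from the diff quoted 
above (names follow the thread, not the exact committed code): delete the 
previously registered clean-up timer when registering a new one, instead of 
tracking a separate flag.

{code:scala}
import java.lang.{Long => JLong}

import org.apache.flink.streaming.api.TimerService

// Returns the new clean-up time to remember in state.
def processCleanupTimer(
    curCleanupTime: JLong,  // previously registered timer, may be null
    currentTime: Long,
    minRetentionTime: Long,
    maxRetentionTime: Long,
    timerService: TimerService): JLong = {
  if (curCleanupTime == null || currentTime + minRetentionTime > curCleanupTime) {
    val cleanupTime = currentTime + maxRetentionTime
    timerService.registerProcessingTimeTimer(cleanupTime)
    // Efficient deletion (FLINK-9423) replaces the old workaround:
    if (curCleanupTime != null) {
      timerService.deleteProcessingTimeTimer(curCleanupTime)
    }
    cleanupTime
  } else {
    curCleanupTime
  }
}
{code}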



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710866#comment-16710866
 ] 

ASF GitHub Bot commented on FLINK-10543:


sunjincheng121 commented on a change in pull request #6918: 
[FLINK-10543][table] Leverage efficient timer deletion in relational operators
URL: https://github.com/apache/flink/pull/6918#discussion_r239300038
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CoProcessFunctionWithCleanupState.scala
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state.{State, ValueState, 
ValueStateDescriptor}
+import org.apache.flink.streaming.api.TimeDomain
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+
+abstract class CoProcessFunctionWithCleanupState[IN1, IN2, OUT](queryConfig: 
StreamQueryConfig)
+  extends CoProcessFunction[IN1, IN2, OUT]
+  with CleanupState {
+
+  protected val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // holds the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected def initCleanupTimeState(stateName: String) {
+if (stateCleaningEnabled) {
+  val cleanupTimeDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](stateName, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor)
+}
+  }
+
+  protected def registerProcessingCleanupTimer(
 
 Review comment:
   `registerProcessingCleanupTimer` -> `processCleanupTimer`, because we do not 
always actually register the timer, e.g. when `stateCleaningEnabled` is false.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Leverage efficient timer deletion in relational operators
> -
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Minor
>  Labels: pull-request-available
>
> FLINK-9423 added support for efficient timer deletions. This feature has been 
> available since Flink 1.6 and should be used by the relational operators of 
> SQL and the Table API.
> Currently, we use a few workarounds to handle situations where deleting timers 
> would be the better solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710868#comment-16710868
 ] 

ASF GitHub Bot commented on FLINK-10543:


sunjincheng121 commented on a change in pull request #6918: 
[FLINK-10543][table] Leverage efficient timer deletion in relational operators
URL: https://github.com/apache/flink/pull/6918#discussion_r239298808
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CleanupState.scala
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import java.lang.{Long => JLong}
+
+import org.apache.flink.streaming.api.TimerService
+
+/**
+  * Base class for clean up state, both for [[ProcessFunction]] and 
[[CoProcessFunction]].
+  */
+trait CleanupState {
 
 Review comment:
   Thinking about it again, using queryConfig may mean less code reuse, so we 
can keep `CoProcessFunctionWithCleanupState` and 
`ProcessFunctionWithCleanupState` for `CoProcessFunction` and `ProcessFunction`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Leverage efficient timer deletion in relational operators
> -
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Minor
>  Labels: pull-request-available
>
> FLINK-9423 added support for efficient timer deletions. This feature has been 
> available since Flink 1.6 and should be used by the relational operators of 
> SQL and the Table API.
> Currently, we use a few workarounds to handle situations where deleting timers 
> would be the better solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710822#comment-16710822
 ] 

ASF GitHub Bot commented on FLINK-10974:


sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239271764
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -21,7 +21,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import org.apache.flink.table.expressions.{Alias, Asc, Expression, 
ExpressionParser, Ordering, ResolvedFieldReference, UnresolvedAlias, 
UnresolvedFieldReference, WindowProperty}
+import org.apache.flink.table.expressions.{Alias, Asc, Expression, 
ExpressionParser, Ordering, ResolvedFieldReference, TableFunctionCall, 
UnresolvedAlias, UnresolvedFieldReference, WindowProperty}
 
 Review comment:
   Since more than 10 classes are imported, I suggest using `._`. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}
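
Expanding on the snippet above, a self-contained sketch of a table function 
that such a flatMap could call (based on the examples quoted later in this 
thread; the flatMap API itself is what this PR proposes):

{code:scala}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Splits "name#age" strings into two-column rows, as in the thread's examples.
class SplitFunction extends TableFunction[Row] {
  def eval(str: String): Unit =
    if (str.contains("#")) {
      val splits = str.split("#")
      collect(Row.of(splits(0), splits(1)))
    }

  override def getResultType: TypeInformation[Row] =
    Types.ROW(Types.STRING, Types.STRING)
}

// Proposed usage (per this PR):
//   val func = new SplitFunction
//   tab.flatMap(func('c)).as('name, 'age).select('name)
{code}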



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710830#comment-16710830
 ] 

ASF GitHub Bot commented on FLINK-10974:


sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239281388
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -79,7 +80,8 @@ class Table(
 * @param udtfCall A String expression of the TableFunction call.
 */
   def this(tableEnv: TableEnvironment, udtfCall: String) {
-this(tableEnv, 
UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, udtfCall))
+this(tableEnv, UserDefinedFunctionUtils.createLogicalFunctionCall(
+  tableEnv, ExpressionParser.parseExpression(udtfCall)))
 
 Review comment:
   Format the code as follows?
   ```
   this(
 tableEnv, 
 UserDefinedFunctionUtils.createLogicalFunctionCall(
   tableEnv, 
   ExpressionParser.parseExpression(udtfCall)
 )
   )
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710824#comment-16710824
 ] 

ASF GitHub Bot commented on FLINK-10974:


sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239279239
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ##
 @@ -320,6 +320,18 @@ case class TableFunctionCall(
 
   override private[flink] def children: Seq[Expression] = parameters
 
+  def as(fields: Symbol*): TableFunctionCall = {
 
 Review comment:
   Add Java doc:
   /**
     * Assigns an alias for this table function's returned fields.
     *
     * @param fields alias for this table function's returned fields
     * @return this table function call
     */


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710828#comment-16710828
 ] 

ASF GitHub Bot commented on FLINK-10974:


sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239271885
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -999,6 +1001,131 @@ class Table(
 new OverWindowedTable(this, overWindows.toArray)
   }
 
+  /**
+* Performs a flatMap operation with a user-defined table function.
+*
+* Scala Example:
+* {{{
+*   class MyFlatMapFunction extends TableFunction[Row] {
+* def eval(str : String) {
+*   if (str.contains("#")) {
+* val splits = str.split("#")
+* collect(Row.of(splits(0), splits(1)))
+*   }
+* }
+*
+* def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
+*   Types.ROW(Types.STRING, Types.STRING)
+*   }
+*
+*   val func = new MyFlatMapFunction()
 
 Review comment:
   `MyFlatMapFunction()` -> `MyFlatMapFunction`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710823#comment-16710823
 ] 

ASF GitHub Bot commented on FLINK-10974:


sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239273278
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ##
 @@ -320,6 +320,18 @@ case class TableFunctionCall(
 
   override private[flink] def children: Seq[Expression] = parameters
 
+  def as(fields: Symbol*): TableFunctionCall = {
+this.aliases = Some(fields.map(_.name))
+this
+  }
+
+  def as(fields: String): TableFunctionCall = {
+val fieldExprs = ExpressionParser
 
 Review comment:
   For robustness I suggest adding an empty check, something like 
`if (fields.isEmpty) return this`. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710831#comment-16710831
 ] 

ASF GitHub Bot commented on FLINK-10974:


sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239295918
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -999,6 +1001,131 @@ class Table(
 new OverWindowedTable(this, overWindows.toArray)
   }
 
+  /**
+* Performs a flatMap operation with a user-defined table function.
+*
+* Scala Example:
+* {{{
+*   class MyFlatMapFunction extends TableFunction[Row] {
+* def eval(str : String) {
+*   if (str.contains("#")) {
+* val splits = str.split("#")
+* collect(Row.of(splits(0), splits(1)))
+*   }
+* }
+*
+* def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
+*   Types.ROW(Types.STRING, Types.STRING)
+*   }
+*
+*   val func = new MyFlatMapFunction()
+*   table.flatMap(func('c)).as('a, 'b)
+* }}}
+*
+* Java Example:
+* {{{
+*   class MyFlatMapFunction extends TableFunction {
+* public void eval(String str) {
+*   if (str.contains("#")) {
+* String[] splits = str.split("#");
+* collect(Row.of(splits[0], splits[1]));
+*   }
+* }
+*
+* public TypeInformation getResultType(Class[] signature) {
+*   return Types.ROW(Types.STRING, Types.STRING);
+* }
+*   }
+*
+*   TableFunction func = new MyFlatMapFunction();
+*   tableEnv.registerFunction("func", func);
+*   table.flatMap("func(c)").as("a, b");
+* }}}
+*/
+  def flatMap(tableFunction: Expression): Table = {
+unwrap(tableFunction, tableEnv) match {
+  case _: TableFunctionCall =>
+  case _ => throw new ValidationException("Only TableFunction can be used 
in flatMap.")
+}
+
+// rename output fields names to avoid ambiguous name
+val originalCall = 
UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, tableFunction)
+val originalOutputFieldNames = originalCall.output.map(_.name)
+val usedFieldNames: mutable.HashSet[String] = mutable.HashSet()
+logicalPlan.output.map(_.name).foreach(usedFieldNames.add)
+
+var i: Int = 0
+def findNewName(n: String): String = {
+  val newName = n + "_" + i
+  i += 1
+  if (usedFieldNames.contains(newName)) {
+findNewName(n)
+  } else {
+usedFieldNames.add(newName)
+newName
+  }
+}
+
+val newOutputFieldNames = originalOutputFieldNames.map(n =>
+  if (usedFieldNames.contains(n)) {
+findNewName(n)
+  } else {
+n
+  }
+)
 
 Review comment:
   The logic for generating the new output field names can be changed as follows:
   ```
val newOutputFieldNames = originalOutputFieldNames.zipWithIndex.map( {
 case (name, index) => if(usedFieldNames.contains(name)){
   index + "_" + name
 }else {
   name
 }
   }).toArray.sorted
   ```
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710825#comment-16710825
 ] 

ASF GitHub Bot commented on FLINK-10974:


sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239291310
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -999,6 +1001,131 @@ class Table(
 new OverWindowedTable(this, overWindows.toArray)
   }
 
+  /**
+* Performs a flatMap operation with a user-defined table function.
+*
+* Scala Example:
+* {{{
+*   class MyFlatMapFunction extends TableFunction[Row] {
+* def eval(str : String) {
+*   if (str.contains("#")) {
+* val splits = str.split("#")
+* collect(Row.of(splits(0), splits(1)))
+*   }
+* }
+*
+* def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
+*   Types.ROW(Types.STRING, Types.STRING)
+*   }
+*
+*   val func = new MyFlatMapFunction()
+*   table.flatMap(func('c)).as('a, 'b)
+* }}}
+*
+* Java Example:
+* {{{
+*   class MyFlatMapFunction extends TableFunction {
+* public void eval(String str) {
+*   if (str.contains("#")) {
+* String[] splits = str.split("#");
+* collect(Row.of(splits[0], splits[1]));
+*   }
+* }
+*
+* public TypeInformation getResultType(Class[] signature) {
+*   return Types.ROW(Types.STRING, Types.STRING);
+* }
+*   }
+*
+*   TableFunction func = new MyFlatMapFunction();
+*   tableEnv.registerFunction("func", func);
+*   table.flatMap("func(c)").as("a, b");
+* }}}
+*/
+  def flatMap(tableFunction: Expression): Table = {
+unwrap(tableFunction, tableEnv) match {
+  case _: TableFunctionCall =>
+  case _ => throw new ValidationException("Only TableFunction can be used 
in flatMap.")
+}
+
+// rename output fields names to avoid ambiguous name
+val originalCall = 
UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, tableFunction)
+val originalOutputFieldNames = originalCall.output.map(_.name)
+val usedFieldNames: mutable.HashSet[String] = mutable.HashSet()
+logicalPlan.output.map(_.name).foreach(usedFieldNames.add)
 
 Review comment:
   `logicalPlan.output.map(_.name).foreach(usedFieldNames.add)` -> 
`logicalPlan.output.foreach(a => usedFieldNames.add(a.name))` 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710821#comment-16710821
 ] 

ASF GitHub Bot commented on FLINK-10974:


sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239273030
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ##
 @@ -328,7 +340,9 @@ case class TableFunctionCall(
 * @return this table function call
 */
   private[flink] def as(aliasList: Option[Seq[String]]): TableFunctionCall = {
-this.aliases = aliasList
+if (aliasList.isDefined) {
 
 Review comment:
   After adding `as(fields: String)`, I think we can remove this method, and 
then change the `UserDefinedFunctionUtils#createLogicalFunctionCall` usage as 
in the following comment in `createLogicalFunctionCall`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710827#comment-16710827
 ] 

ASF GitHub Bot commented on FLINK-10974:


sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239285343
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/FlatMapITCase.scala
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase
+import 
org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.utils.TableFunc2
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class FlatMapITCase(
+mode: TestExecutionMode,
+configMode: TableConfigMode)
+  extends TableProgramsClusterTestBase(mode, configMode) {
+
+  private def testData(env: ExecutionEnvironment): DataSet[(Int, Long, 
String)] = {
+val data = new mutable.MutableList[(Int, Long, String)]
+data.+=((1, 1L, "Jack#22"))
+data.+=((2, 2L, "John#333"))
+data.+=((3, 2L, "Anna#"))
+data.+=((4, 3L, "nosharp#5"))
+env.fromCollection(data)
+  }
+
+  @Test
+  def testFlatMap(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val func2 = new TableFunc2
+val results = testData(env).toTable(tEnv, 'a, 'b, 'c)
+  .flatMap(func2('c))
 
 Review comment:
   Suggestions to enhance our test case as follows (I'm fine if you'd rather add it as a validation test instead of an IT case):

       val ds = testData(env).toTable(tEnv, 'a, 'b, 'c)
         .flatMap(func2('c)) // test without alias
         .select('f0, 'f1)
         .flatMap(func2("Sunny#Panpan") as ('a, 'b)) // test with alias
         .select('a, 'b)
         // test function output names that are the same as the left table's field names
         .flatMap(func2("Dian#Flink").as('a, 'b))
         .select('a, 'b)
   
   What do you think?
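   
   For reference, a minimal runnable sketch of that chain. It assumes `testData` and `func2` are defined as in the IT case above, and that `TableFunc2` splits its input on "#" and emits (word, length) rows, as in Flink's test utilities:
   
   ```scala
   // Sketch only: exercises flatMap without an alias, with an alias, and with
   // output names that collide with the left table's field names.
   val env = ExecutionEnvironment.getExecutionEnvironment
   val tEnv = TableEnvironment.getTableEnvironment(env)
   val func2 = new TableFunc2
   
   val ds = testData(env).toTable(tEnv, 'a, 'b, 'c)
     .flatMap(func2('c))                          // default output names 'f0, 'f1
     .select('f0, 'f1)
     .flatMap(func2("Sunny#Panpan") as ('a, 'b))  // explicit alias
     .select('a, 'b)
     .flatMap(func2("Dian#Flink").as('a, 'b))     // alias shadowing input names
     .select('a, 'b)
   ```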


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710829#comment-16710829
 ] 

ASF GitHub Bot commented on FLINK-10974:


sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239285940
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/FlatMapValidationTest.scala
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.Func15
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
+import org.apache.flink.table.utils.{TableFunc0, TableTestBase}
+import org.junit.Test
+
+class FlatMapValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidMapFunctionTypeAggregation(): Unit = {
+val util = streamTestUtil()
+util.addTable[(Int)](
+  "MyTable", 'int)
+  .flatMap('int.sum) // do not support AggregateFunction as input
 
 Review comment:
   add a `.flatMap('int)` test case?
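   
   A minimal sketch of that extra case, assuming the same validation path rejects any expression that is not a TableFunction call (the test name is hypothetical):
   
   ```scala
   @Test(expected = classOf[ValidationException])
   def testInvalidMapFunctionFieldReference(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Int)]("MyTable", 'int)
       .flatMap('int) // a plain field reference is not a TableFunction call
   }
   ```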


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710826#comment-16710826
 ] 

ASF GitHub Bot commented on FLINK-10974:


sunjincheng121 commented on a change in pull request #7196: [FLINK-10974] 
[table] Add support for flatMap to table API
URL: https://github.com/apache/flink/pull/7196#discussion_r239273803
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ##
 @@ -795,7 +795,7 @@ object UserDefinedFunctionUtils {
 "define table function followed by some Alias.")
 }
 
-val functionCall: LogicalTableFunctionCall = unwrap(ExpressionParser.parseExpression(udtf))
+val functionCall: LogicalTableFunctionCall = unwrap(udtfExpr)
 
 Review comment:
   If we remove `as(Option(Seq[String]))`, we should change the usage to look like the following:
   ```
   unwrap(udtfExpr)
     .as(alias.getOrElse(Seq()).mkString(","))
     .toLogicalTableFunctionCall(child = null)
   ```
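   
   A small self-contained sketch of the alias handling this implies (the helper name is illustrative, not the actual Flink API):
   
   ```scala
   // Sketch: join optional alias names into the comma-separated form expected
   // by as(...); an empty Option falls back to the function's default names.
   def aliasString(alias: Option[Seq[String]]): String =
     alias.getOrElse(Seq()).mkString(",")
   
   assert(aliasString(Some(Seq("a", "b"))) == "a,b") // explicit alias
   assert(aliasString(None) == "")                   // default field names
   ```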


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

