[jira] [Updated] (FLINK-19887) Table program cannot be compiled when using Scala package object

2021-07-13 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-19887:
---
Fix Version/s: (was: 1.11.4)
   1.11.5

> Table program cannot be compiled when using Scala package object
> 
>
>                 Key: FLINK-19887
>                 URL: https://issues.apache.org/jira/browse/FLINK-19887
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.2
>         Environment: Flink 1.11.2, Scala 2.12, JDK 1.8
>            Reporter: 谢波
>            Priority: Minor
>              Labels: auto-deprioritized-major
>             Fix For: 1.11.5
>
>
> {code:scala}
> package object analysis {
>   case class UserBehavior(userId: Long, productId: Long, categoryId: Long, behavior: String, ts: Long)
>   case class ItemViewCount(var windowEnd: Long, var itemId: Long, var count: Long)
> }
>
> def main(args: Array[String]): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tableEnv = StreamTableEnvironment.create(env)
>   env.setParallelism(1)
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   val input = env.readTextFile("mock-data/UserBehavior.csv")
>     .map(e => {
>       val split = e.split(",")
>       UserBehavior(split(0).toLong, split(1).toLong, split(2).toLong, split(3), split(4).toLong)
>     })
>     .assignAscendingTimestamps(_.ts * 1000L)
>   // dataStreamApi(input)
>   // A case class defined in a package object causes a compilation error; this is a bug
>   val table = tableEnv.fromDataStream(input, $"productId", $"behavior", $"ts".rowtime)
>   table.printSchema()
>   table
>     .window(Slide over 1.hour every 5.minutes on $"ts" as $"w")
>     .groupBy($"w", $"productId")
>     .select($"w".end, $"productId", $"productId".count)
>     .toAppendStream[Row]
>     .print("table ")
>   // table.toAppendStream[Row].print("table")
>   // tableEnv.execute("table")
>   env.execute("hot item analysis")
> }
> {code}
>  
>  
> {code}
> root
>  |-- productId: BIGINT
>  |-- behavior: STRING
>  |-- ts: TIMESTAMP(3) *ROWTIME*
>
> public class SourceConversion$4 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
>     implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
>
>   private final Object[] references;
>   private transient org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter converter$0;
>   org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(3);
>   private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
>
>   public SourceConversion$4(
>       Object[] references,
>       org.apache.flink.streaming.runtime.tasks.StreamTask task,
>       org.apache.flink.streaming.api.graph.StreamConfig config,
>       org.apache.flink.streaming.api.operators.Output output,
>       org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
>     this.references = references;
>     converter$0 = (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) references[0]));
>     this.setup(task, config, output);
>     if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
>       ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
>         .setProcessingTimeService(processingTimeService);
>     }
>   }
>
>   @Override
>   public void open() throws Exception {
>     super.open();
>
>   }
>
>   @Override
>   public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
>     org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) converter$0.toInternal((com.hiscat.flink.user.behavior.analysis.package.UserBehavior) element.getValue());
>
>     long field$1;
>     boolean isNull$1;
>     org.apache.flink.table.data.binary.BinaryStringData field$2;
>     boolean isNull$2;
>     org.apache.flink.table.data.TimestampData result$3;
>     boolean isNull$3;
>     isNull$1 = in1.isNullAt(1);
>     field$1 = -1L;
>     if (!isNull$1) {
>       field$1 = in1.getLong(1);
>     }
>     isNull$2 = in1.isNullAt(3);
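
A note on the likely failure point: the generated processElement method casts to com.hiscat.flink.user.behavior.analysis.package.UserBehavior, and "package" is a reserved word in Java, so that qualified name cannot be written in Java source. The sketch below checks this with the JDK's javax.lang.model.SourceVersion. The class PackageKeywordCheck, its helper method, and the com.example name are hypothetical and exist only for illustration; this does not reproduce Flink's code generator, and the quoted output is truncated before the actual compiler error.

```java
import javax.lang.model.SourceVersion;

// Hypothetical helper, not part of Flink: checks whether a qualified
// type name could legally be written in Java source code.
public class PackageKeywordCheck {

    // True if every dot-separated segment is a valid Java identifier
    // and none of them is a keyword or literal.
    static boolean isValidJavaName(String qualifiedName) {
        return SourceVersion.isName(qualifiedName);
    }

    public static void main(String[] args) {
        // Name taken from the generated code quoted in the issue.
        String fromPackageObject =
            "com.hiscat.flink.user.behavior.analysis.package.UserBehavior";
        // Hypothetical name for the same class declared directly in a package.
        String topLevel = "com.example.analysis.UserBehavior";

        System.out.println("package is a keyword: " + SourceVersion.isKeyword("package"));
        System.out.println(fromPackageObject + " valid: " + isValidJavaName(fromPackageObject));
        System.out.println(topLevel + " valid: " + isValidJavaName(topLevel));
    }
}
```

The name containing the "package" segment is rejected, while the other one is accepted. If this is indeed the cause, declaring the case classes directly in a package rather than in a package object may be a workaround, though nothing in this thread confirms it.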

[jira] [Updated] (FLINK-19887) Table program cannot be compiled when using Scala package object

2021-04-29 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-19887:
---
Labels: auto-deprioritized-major  (was: stale-major)

> Table program cannot be compiled when using Scala package object
> 
>
>                 Key: FLINK-19887
>                 URL: https://issues.apache.org/jira/browse/FLINK-19887
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.2
>         Environment: Flink 1.11.2, Scala 2.12, JDK 1.8
>            Reporter: 谢波
>            Priority: Major
>              Labels: auto-deprioritized-major
>             Fix For: 1.11.4
>
>

[jira] [Updated] (FLINK-19887) Table program cannot be compiled when using Scala package object

2021-04-29 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-19887:
---
Priority: Minor  (was: Major)

> Table program cannot be compiled when using Scala package object
> 
>
>                 Key: FLINK-19887
>                 URL: https://issues.apache.org/jira/browse/FLINK-19887
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.2
>         Environment: Flink 1.11.2, Scala 2.12, JDK 1.8
>            Reporter: 谢波
>            Priority: Minor
>              Labels: auto-deprioritized-major
>             Fix For: 1.11.4
>
>

[jira] [Updated] (FLINK-19887) Table program cannot be compiled when using Scala package object

2021-04-22 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-19887:
---
Labels: stale-major  (was: )

> Table program cannot be compiled when using Scala package object
> 
>
>                 Key: FLINK-19887
>                 URL: https://issues.apache.org/jira/browse/FLINK-19887
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.2
>         Environment: Flink 1.11.2, Scala 2.12, JDK 1.8
>            Reporter: 谢波
>            Priority: Major
>              Labels: stale-major
>             Fix For: 1.11.4
>
>

[jira] [Updated] (FLINK-19887) Table program cannot be compiled when using Scala package object

2020-11-17 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-19887:
-
Fix Version/s: (was: 1.11.3)
   1.11.4

> Table program cannot be compiled when using Scala package object
> 
>
>                 Key: FLINK-19887
>                 URL: https://issues.apache.org/jira/browse/FLINK-19887
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.2
>         Environment: Flink 1.11.2, Scala 2.12, JDK 1.8
>            Reporter: 谢波
>            Priority: Major
>             Fix For: 1.11.4
>
>

[jira] [Updated] (FLINK-19887) Table program cannot be compiled when using Scala package object

2020-10-29 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-19887:

Summary: Table program cannot be compiled when using Scala package object  
(was: flink-1.11.2 - Using a case class defined in a package object causes the Table API program to fail to compile)

> Table program cannot be compiled when using Scala package object
> 
>
>                 Key: FLINK-19887
>                 URL: https://issues.apache.org/jira/browse/FLINK-19887
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.2
>         Environment: Flink 1.11.2, Scala 2.12, JDK 1.8
>            Reporter: 谢波
>            Priority: Major
>             Fix For: 1.11.3
>
>