[jira] [Updated] (FLINK-19887) Table program cannot be compiled when using Scala package object
[ https://issues.apache.org/jira/browse/FLINK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he updated FLINK-19887: --- Fix Version/s: (was: 1.11.4) 1.11.5 > Table program cannot be compiled when using Scala package object > > > Key: FLINK-19887 > URL: https://issues.apache.org/jira/browse/FLINK-19887 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.11.2 > Environment: 1.11.2 > 2.12 > jdk:1.8 > >Reporter: 谢波 >Priority: Minor > Labels: auto-deprioritized-major > Fix For: 1.11.5 > > > {code:scala} > package object analysis { > case class UserBehavior(userId: Long, productId: Long, categoryId: Long, > behavior: String, ts: Long) > case class ItemViewCount(var windowEnd: Long,var itemId: Long,var count: > Long) > } > > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > env.setParallelism(1) > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val input = env.readTextFile("mock-data/UserBehavior.csv") > .map(e => { > val split = e.split(",") > UserBehavior(split(0).toLong, split(1).toLong, split(2).toLong, split(3), > split(4).toLong) > }) > .assignAscendingTimestamps(_.ts * 1000L) > // dataStreamApi(input) > //包对象下的样例类会导致编译错误,这是一个BUG > val table = tableEnv.fromDataStream(input, $"productId", $"behavior", > $"ts".rowtime) > table.printSchema() > table > .window(Slide over 1.hour every 5.minutes on $"ts" as $"w") > .groupBy($"w", $"productId") > .select($"w".end, $"productId", $"productId".count) > .toAppendStream[Row] > .print("table ") > // table.toAppendStream[Row].print("table") > // tableEnv.execute("table") > env.execute("hot item analysis") > } > {code} > > > {code} > rootroot |-- productId: BIGINT |-- behavior: STRING |-- ts: TIMESTAMP(3) > *ROWTIME* > /* 1 *//* 2 */ public class SourceConversion$4 extends > org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator/* 3 */ > implements > org.apache.flink.streaming.api.operators.OneInputStreamOperator \{/* 4 *//* 5 > */ private final Object[] references;/* 6 */ private transient > org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter > converter$0;/* 7 */ org.apache.flink.table.data.GenericRowData out = > new org.apache.flink.table.data.GenericRowData(3);/* 8 */ private > final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement > = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);/* 9 > *//* 10 */ public SourceConversion$4(/* 11 */ Object[] > references,/* 12 */ > org.apache.flink.streaming.runtime.tasks.StreamTask task,/* 13 */ > org.apache.flink.streaming.api.graph.StreamConfig config,/* 14 */ > org.apache.flink.streaming.api.operators.Output output,/* 15 */ > org.apache.flink.streaming.runtime.tasks.ProcessingTimeService > processingTimeService) throws Exception {/* 16 */ this.references = > references;/* 17 */ converter$0 = > (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) > references[0]));/* 18 */ this.setup(task, config, output);/* 19 */ > if (this instanceof > org.apache.flink.streaming.api.operators.AbstractStreamOperator) {/* 20 */ > ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) > this)/* 21 */ > .setProcessingTimeService(processingTimeService);/* 22 */ }/* 23 */ > }/* 24 *//* 25 */ @Override/* 26 */ public void open() > throws Exception \{/* 27 */ super.open();/* 28 */ /* 29 */ > }/* 30 *//* 31 */ @Override/* 32 */ public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) throws Exception \{/* 33 */ > org.apache.flink.table.data.RowData in1 = > (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) > converter$0.toInternal((com.hiscat.flink.user.behavior.analysis.package.UserBehavior) > element.getValue());/* 34 */ /* 35 */ long field$1;/* 36 > */ boolean isNull$1;/* 37 */ > org.apache.flink.table.data.binary.BinaryStringData field$2;/* 38 */ > boolean isNull$2;/* 39 */ org.apache.flink.table.data.TimestampData > result$3;/* 40 */ boolean isNull$3;/* 41 */ isNull$1 = > in1.isNullAt(1);/* 42 */ field$1 = -1L;/* 43 */ if > (!isNull$1) {/* 44 */ field$1 = in1.getLong(1);/* 45 */ > }/* 46 */
[jira] [Updated] (FLINK-19887) Table program cannot be compiled when using Scala package object
[ https://issues.apache.org/jira/browse/FLINK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-19887: --- Labels: auto-deprioritized-major (was: stale-major) > Table program cannot be compiled when using Scala package object > > > Key: FLINK-19887 > URL: https://issues.apache.org/jira/browse/FLINK-19887 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.11.2 > Environment: 1.11.2 > 2.12 > jdk:1.8 > >Reporter: 谢波 >Priority: Major > Labels: auto-deprioritized-major > Fix For: 1.11.4 > > > {code:scala} > package object analysis { > case class UserBehavior(userId: Long, productId: Long, categoryId: Long, > behavior: String, ts: Long) > case class ItemViewCount(var windowEnd: Long,var itemId: Long,var count: > Long) > } > > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > env.setParallelism(1) > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val input = env.readTextFile("mock-data/UserBehavior.csv") > .map(e => { > val split = e.split(",") > UserBehavior(split(0).toLong, split(1).toLong, split(2).toLong, split(3), > split(4).toLong) > }) > .assignAscendingTimestamps(_.ts * 1000L) > // dataStreamApi(input) > //包对象下的样例类会导致编译错误,这是一个BUG > val table = tableEnv.fromDataStream(input, $"productId", $"behavior", > $"ts".rowtime) > table.printSchema() > table > .window(Slide over 1.hour every 5.minutes on $"ts" as $"w") > .groupBy($"w", $"productId") > .select($"w".end, $"productId", $"productId".count) > .toAppendStream[Row] > .print("table ") > // table.toAppendStream[Row].print("table") > // tableEnv.execute("table") > env.execute("hot item analysis") > } > {code} > > > {code} > rootroot |-- productId: BIGINT |-- behavior: STRING |-- ts: TIMESTAMP(3) > *ROWTIME* > /* 1 *//* 2 */ public class SourceConversion$4 extends > org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator/* 3 */ > implements > org.apache.flink.streaming.api.operators.OneInputStreamOperator \{/* 4 *//* 5 > */ private final Object[] references;/* 6 */ private transient > org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter > converter$0;/* 7 */ org.apache.flink.table.data.GenericRowData out = > new org.apache.flink.table.data.GenericRowData(3);/* 8 */ private > final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement > = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);/* 9 > *//* 10 */ public SourceConversion$4(/* 11 */ Object[] > references,/* 12 */ > org.apache.flink.streaming.runtime.tasks.StreamTask task,/* 13 */ > org.apache.flink.streaming.api.graph.StreamConfig config,/* 14 */ > org.apache.flink.streaming.api.operators.Output output,/* 15 */ > org.apache.flink.streaming.runtime.tasks.ProcessingTimeService > processingTimeService) throws Exception {/* 16 */ this.references = > references;/* 17 */ converter$0 = > (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) > references[0]));/* 18 */ this.setup(task, config, output);/* 19 */ > if (this instanceof > org.apache.flink.streaming.api.operators.AbstractStreamOperator) {/* 20 */ > ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) > this)/* 21 */ > .setProcessingTimeService(processingTimeService);/* 22 */ }/* 23 */ > }/* 24 *//* 25 */ @Override/* 26 */ public void open() > throws Exception \{/* 27 */ super.open();/* 28 */ /* 29 */ > }/* 30 *//* 31 */ @Override/* 32 */ public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) throws Exception \{/* 33 */ > org.apache.flink.table.data.RowData in1 = > (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) > converter$0.toInternal((com.hiscat.flink.user.behavior.analysis.package.UserBehavior) > element.getValue());/* 34 */ /* 35 */ long field$1;/* 36 > */ boolean isNull$1;/* 37 */ > org.apache.flink.table.data.binary.BinaryStringData field$2;/* 38 */ > boolean isNull$2;/* 39 */ org.apache.flink.table.data.TimestampData > result$3;/* 40 */ boolean isNull$3;/* 41 */ isNull$1 = > in1.isNullAt(1);/* 42 */ field$1 = -1L;/* 43 */ if > (!isNull$1) {/* 44 */ field$1 = in1.getLong(1);/* 45 */ > }/* 46 */
[jira] [Updated] (FLINK-19887) Table program cannot be compiled when using Scala package object
[ https://issues.apache.org/jira/browse/FLINK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-19887: --- Priority: Minor (was: Major) > Table program cannot be compiled when using Scala package object > > > Key: FLINK-19887 > URL: https://issues.apache.org/jira/browse/FLINK-19887 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.11.2 > Environment: 1.11.2 > 2.12 > jdk:1.8 > >Reporter: 谢波 >Priority: Minor > Labels: auto-deprioritized-major > Fix For: 1.11.4 > > > {code:scala} > package object analysis { > case class UserBehavior(userId: Long, productId: Long, categoryId: Long, > behavior: String, ts: Long) > case class ItemViewCount(var windowEnd: Long,var itemId: Long,var count: > Long) > } > > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > env.setParallelism(1) > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val input = env.readTextFile("mock-data/UserBehavior.csv") > .map(e => { > val split = e.split(",") > UserBehavior(split(0).toLong, split(1).toLong, split(2).toLong, split(3), > split(4).toLong) > }) > .assignAscendingTimestamps(_.ts * 1000L) > // dataStreamApi(input) > //包对象下的样例类会导致编译错误,这是一个BUG > val table = tableEnv.fromDataStream(input, $"productId", $"behavior", > $"ts".rowtime) > table.printSchema() > table > .window(Slide over 1.hour every 5.minutes on $"ts" as $"w") > .groupBy($"w", $"productId") > .select($"w".end, $"productId", $"productId".count) > .toAppendStream[Row] > .print("table ") > // table.toAppendStream[Row].print("table") > // tableEnv.execute("table") > env.execute("hot item analysis") > } > {code} > > > {code} > rootroot |-- productId: BIGINT |-- behavior: STRING |-- ts: TIMESTAMP(3) > *ROWTIME* > /* 1 *//* 2 */ public class SourceConversion$4 extends > org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator/* 3 */ > implements > org.apache.flink.streaming.api.operators.OneInputStreamOperator \{/* 4 *//* 5 > */ private final Object[] references;/* 6 */ private transient > org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter > converter$0;/* 7 */ org.apache.flink.table.data.GenericRowData out = > new org.apache.flink.table.data.GenericRowData(3);/* 8 */ private > final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement > = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);/* 9 > *//* 10 */ public SourceConversion$4(/* 11 */ Object[] > references,/* 12 */ > org.apache.flink.streaming.runtime.tasks.StreamTask task,/* 13 */ > org.apache.flink.streaming.api.graph.StreamConfig config,/* 14 */ > org.apache.flink.streaming.api.operators.Output output,/* 15 */ > org.apache.flink.streaming.runtime.tasks.ProcessingTimeService > processingTimeService) throws Exception {/* 16 */ this.references = > references;/* 17 */ converter$0 = > (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) > references[0]));/* 18 */ this.setup(task, config, output);/* 19 */ > if (this instanceof > org.apache.flink.streaming.api.operators.AbstractStreamOperator) {/* 20 */ > ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) > this)/* 21 */ > .setProcessingTimeService(processingTimeService);/* 22 */ }/* 23 */ > }/* 24 *//* 25 */ @Override/* 26 */ public void open() > throws Exception \{/* 27 */ super.open();/* 28 */ /* 29 */ > }/* 30 *//* 31 */ @Override/* 32 */ public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) throws Exception \{/* 33 */ > org.apache.flink.table.data.RowData in1 = > (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) > converter$0.toInternal((com.hiscat.flink.user.behavior.analysis.package.UserBehavior) > element.getValue());/* 34 */ /* 35 */ long field$1;/* 36 > */ boolean isNull$1;/* 37 */ > org.apache.flink.table.data.binary.BinaryStringData field$2;/* 38 */ > boolean isNull$2;/* 39 */ org.apache.flink.table.data.TimestampData > result$3;/* 40 */ boolean isNull$3;/* 41 */ isNull$1 = > in1.isNullAt(1);/* 42 */ field$1 = -1L;/* 43 */ if > (!isNull$1) {/* 44 */ field$1 = in1.getLong(1);/* 45 */ > }/* 46 */ isNull$2 = in1.isNull
[jira] [Updated] (FLINK-19887) Table program cannot be compiled when using Scala package object
[ https://issues.apache.org/jira/browse/FLINK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-19887: --- Labels: stale-major (was: ) > Table program cannot be compiled when using Scala package object > > > Key: FLINK-19887 > URL: https://issues.apache.org/jira/browse/FLINK-19887 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.11.2 > Environment: 1.11.2 > 2.12 > jdk:1.8 > >Reporter: 谢波 >Priority: Major > Labels: stale-major > Fix For: 1.11.4 > > > {code:scala} > package object analysis { > case class UserBehavior(userId: Long, productId: Long, categoryId: Long, > behavior: String, ts: Long) > case class ItemViewCount(var windowEnd: Long,var itemId: Long,var count: > Long) > } > > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > env.setParallelism(1) > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val input = env.readTextFile("mock-data/UserBehavior.csv") > .map(e => { > val split = e.split(",") > UserBehavior(split(0).toLong, split(1).toLong, split(2).toLong, split(3), > split(4).toLong) > }) > .assignAscendingTimestamps(_.ts * 1000L) > // dataStreamApi(input) > //包对象下的样例类会导致编译错误,这是一个BUG > val table = tableEnv.fromDataStream(input, $"productId", $"behavior", > $"ts".rowtime) > table.printSchema() > table > .window(Slide over 1.hour every 5.minutes on $"ts" as $"w") > .groupBy($"w", $"productId") > .select($"w".end, $"productId", $"productId".count) > .toAppendStream[Row] > .print("table ") > // table.toAppendStream[Row].print("table") > // tableEnv.execute("table") > env.execute("hot item analysis") > } > {code} > > > {code} > rootroot |-- productId: BIGINT |-- behavior: STRING |-- ts: TIMESTAMP(3) > *ROWTIME* > /* 1 *//* 2 */ public class SourceConversion$4 extends > org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator/* 3 */ > implements > org.apache.flink.streaming.api.operators.OneInputStreamOperator \{/* 4 *//* 5 > */ private final Object[] references;/* 6 */ private transient > org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter > converter$0;/* 7 */ org.apache.flink.table.data.GenericRowData out = > new org.apache.flink.table.data.GenericRowData(3);/* 8 */ private > final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement > = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);/* 9 > *//* 10 */ public SourceConversion$4(/* 11 */ Object[] > references,/* 12 */ > org.apache.flink.streaming.runtime.tasks.StreamTask task,/* 13 */ > org.apache.flink.streaming.api.graph.StreamConfig config,/* 14 */ > org.apache.flink.streaming.api.operators.Output output,/* 15 */ > org.apache.flink.streaming.runtime.tasks.ProcessingTimeService > processingTimeService) throws Exception {/* 16 */ this.references = > references;/* 17 */ converter$0 = > (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) > references[0]));/* 18 */ this.setup(task, config, output);/* 19 */ > if (this instanceof > org.apache.flink.streaming.api.operators.AbstractStreamOperator) {/* 20 */ > ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) > this)/* 21 */ > .setProcessingTimeService(processingTimeService);/* 22 */ }/* 23 */ > }/* 24 *//* 25 */ @Override/* 26 */ public void open() > throws Exception \{/* 27 */ super.open();/* 28 */ /* 29 */ > }/* 30 *//* 31 */ @Override/* 32 */ public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) throws Exception \{/* 33 */ > org.apache.flink.table.data.RowData in1 = > (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) > converter$0.toInternal((com.hiscat.flink.user.behavior.analysis.package.UserBehavior) > element.getValue());/* 34 */ /* 35 */ long field$1;/* 36 > */ boolean isNull$1;/* 37 */ > org.apache.flink.table.data.binary.BinaryStringData field$2;/* 38 */ > boolean isNull$2;/* 39 */ org.apache.flink.table.data.TimestampData > result$3;/* 40 */ boolean isNull$3;/* 41 */ isNull$1 = > in1.isNullAt(1);/* 42 */ field$1 = -1L;/* 43 */ if > (!isNull$1) {/* 44 */ field$1 = in1.getLong(1);/* 45 */ > }/* 46 */ isNull$2 = in1.isNullAt(3);/* 47 */
[jira] [Updated] (FLINK-19887) Table program cannot be compiled when using Scala package object
[ https://issues.apache.org/jira/browse/FLINK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19887: - Fix Version/s: (was: 1.11.3) 1.11.4 > Table program cannot be compiled when using Scala package object > > > Key: FLINK-19887 > URL: https://issues.apache.org/jira/browse/FLINK-19887 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.11.2 > Environment: 1.11.2 > 2.12 > jdk:1.8 > >Reporter: 谢波 >Priority: Major > Fix For: 1.11.4 > > > {code:scala} > package object analysis { > case class UserBehavior(userId: Long, productId: Long, categoryId: Long, > behavior: String, ts: Long) > case class ItemViewCount(var windowEnd: Long,var itemId: Long,var count: > Long) > } > > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > env.setParallelism(1) > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val input = env.readTextFile("mock-data/UserBehavior.csv") > .map(e => { > val split = e.split(",") > UserBehavior(split(0).toLong, split(1).toLong, split(2).toLong, split(3), > split(4).toLong) > }) > .assignAscendingTimestamps(_.ts * 1000L) > // dataStreamApi(input) > //包对象下的样例类会导致编译错误,这是一个BUG > val table = tableEnv.fromDataStream(input, $"productId", $"behavior", > $"ts".rowtime) > table.printSchema() > table > .window(Slide over 1.hour every 5.minutes on $"ts" as $"w") > .groupBy($"w", $"productId") > .select($"w".end, $"productId", $"productId".count) > .toAppendStream[Row] > .print("table ") > // table.toAppendStream[Row].print("table") > // tableEnv.execute("table") > env.execute("hot item analysis") > } > {code} > > > {code} > rootroot |-- productId: BIGINT |-- behavior: STRING |-- ts: TIMESTAMP(3) > *ROWTIME* > /* 1 *//* 2 */ public class SourceConversion$4 extends > org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator/* 3 */ > implements > org.apache.flink.streaming.api.operators.OneInputStreamOperator \{/* 4 *//* 5 > */ private final Object[] references;/* 6 */ private transient > org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter > converter$0;/* 7 */ org.apache.flink.table.data.GenericRowData out = > new org.apache.flink.table.data.GenericRowData(3);/* 8 */ private > final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement > = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);/* 9 > *//* 10 */ public SourceConversion$4(/* 11 */ Object[] > references,/* 12 */ > org.apache.flink.streaming.runtime.tasks.StreamTask task,/* 13 */ > org.apache.flink.streaming.api.graph.StreamConfig config,/* 14 */ > org.apache.flink.streaming.api.operators.Output output,/* 15 */ > org.apache.flink.streaming.runtime.tasks.ProcessingTimeService > processingTimeService) throws Exception {/* 16 */ this.references = > references;/* 17 */ converter$0 = > (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) > references[0]));/* 18 */ this.setup(task, config, output);/* 19 */ > if (this instanceof > org.apache.flink.streaming.api.operators.AbstractStreamOperator) {/* 20 */ > ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) > this)/* 21 */ > .setProcessingTimeService(processingTimeService);/* 22 */ }/* 23 */ > }/* 24 *//* 25 */ @Override/* 26 */ public void open() > throws Exception \{/* 27 */ super.open();/* 28 */ /* 29 */ > }/* 30 *//* 31 */ @Override/* 32 */ public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) throws Exception \{/* 33 */ > org.apache.flink.table.data.RowData in1 = > (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) > converter$0.toInternal((com.hiscat.flink.user.behavior.analysis.package.UserBehavior) > element.getValue());/* 34 */ /* 35 */ long field$1;/* 36 > */ boolean isNull$1;/* 37 */ > org.apache.flink.table.data.binary.BinaryStringData field$2;/* 38 */ > boolean isNull$2;/* 39 */ org.apache.flink.table.data.TimestampData > result$3;/* 40 */ boolean isNull$3;/* 41 */ isNull$1 = > in1.isNullAt(1);/* 42 */ field$1 = -1L;/* 43 */ if > (!isNull$1) {/* 44 */ field$1 = in1.getLong(1);/* 45 */ > }/* 46 */ isNull$2 = in1.isNullAt(3);/* 47 */
[jira] [Updated] (FLINK-19887) Table program cannot be compiled when using Scala package object
[ https://issues.apache.org/jira/browse/FLINK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-19887: Summary: Table program cannot be compiled when using Scala package object (was: flink-1.11.2 - 使用包对象下的样例类,会导致table api程序编译失败) > Table program cannot be compiled when using Scala package object > > > Key: FLINK-19887 > URL: https://issues.apache.org/jira/browse/FLINK-19887 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.11.2 > Environment: 1.11.2 > 2.12 > jdk:1.8 > >Reporter: 谢波 >Priority: Major > Fix For: 1.11.3 > > > {code:scala} > package object analysis { > case class UserBehavior(userId: Long, productId: Long, categoryId: Long, > behavior: String, ts: Long) > case class ItemViewCount(var windowEnd: Long,var itemId: Long,var count: > Long) > } > > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > env.setParallelism(1) > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val input = env.readTextFile("mock-data/UserBehavior.csv") > .map(e => { > val split = e.split(",") > UserBehavior(split(0).toLong, split(1).toLong, split(2).toLong, split(3), > split(4).toLong) > }) > .assignAscendingTimestamps(_.ts * 1000L) > // dataStreamApi(input) > //包对象下的样例类会导致编译错误,这是一个BUG > val table = tableEnv.fromDataStream(input, $"productId", $"behavior", > $"ts".rowtime) > table.printSchema() > table > .window(Slide over 1.hour every 5.minutes on $"ts" as $"w") > .groupBy($"w", $"productId") > .select($"w".end, $"productId", $"productId".count) > .toAppendStream[Row] > .print("table ") > // table.toAppendStream[Row].print("table") > // tableEnv.execute("table") > env.execute("hot item analysis") > } > {code} > > > {code} > rootroot |-- productId: BIGINT |-- behavior: STRING |-- ts: TIMESTAMP(3) > *ROWTIME* > /* 1 *//* 2 */ public class SourceConversion$4 extends > org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator/* 3 */ > implements > org.apache.flink.streaming.api.operators.OneInputStreamOperator \{/* 4 *//* 5 > */ private final Object[] references;/* 6 */ private transient > org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter > converter$0;/* 7 */ org.apache.flink.table.data.GenericRowData out = > new org.apache.flink.table.data.GenericRowData(3);/* 8 */ private > final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement > = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);/* 9 > *//* 10 */ public SourceConversion$4(/* 11 */ Object[] > references,/* 12 */ > org.apache.flink.streaming.runtime.tasks.StreamTask task,/* 13 */ > org.apache.flink.streaming.api.graph.StreamConfig config,/* 14 */ > org.apache.flink.streaming.api.operators.Output output,/* 15 */ > org.apache.flink.streaming.runtime.tasks.ProcessingTimeService > processingTimeService) throws Exception {/* 16 */ this.references = > references;/* 17 */ converter$0 = > (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) > references[0]));/* 18 */ this.setup(task, config, output);/* 19 */ > if (this instanceof > org.apache.flink.streaming.api.operators.AbstractStreamOperator) {/* 20 */ > ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) > this)/* 21 */ > .setProcessingTimeService(processingTimeService);/* 22 */ }/* 23 */ > }/* 24 *//* 25 */ @Override/* 26 */ public void open() > throws Exception \{/* 27 */ super.open();/* 28 */ /* 29 */ > }/* 30 *//* 31 */ @Override/* 32 */ public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) throws Exception \{/* 33 */ > org.apache.flink.table.data.RowData in1 = > (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) > converter$0.toInternal((com.hiscat.flink.user.behavior.analysis.package.UserBehavior) > element.getValue());/* 34 */ /* 35 */ long field$1;/* 36 > */ boolean isNull$1;/* 37 */ > org.apache.flink.table.data.binary.BinaryStringData field$2;/* 38 */ > boolean isNull$2;/* 39 */ org.apache.flink.table.data.TimestampData > result$3;/* 40 */ boolean isNull$3;/* 41 */ isNull$1 = > in1.isNullAt(1);/* 42 */ field$1 = -1L;/* 43 */ if > (!isNull$1) {/* 44 */ field$1 = in1.getLong(1);/* 45 */ > }/*