Re: [Table API] ClassCastException when converting a table to DataStream
Hi Dongwon,

Sorry for the late reply. I tried some experiments and it seems you are right: setting the `.returns()` type actually alters the underlying type of the DataStream from a GenericType into a specific RowTypeInfo. Please see the JIRA ticket [1] for more info.

Regarding the approach: yes, you cannot access the timer service from the Table/SQL API at this moment, so that might be the best approach. And as Fabian suggested, I don't think there is much of a problem as long as you are not changing the underlying type info of your DataStream. I will follow up in the JIRA ticket.

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-13389
Re: [Table API] ClassCastException when converting a table to DataStream
Hi Fabian,

Thanks for the clarification :-)
I can convert back and forth without worrying about it, as I keep using the Row type during the conversion (even though fields are added).

Best,

Dongwon
Re: [Table API] ClassCastException when converting a table to DataStream
Hi Dongwon,

Regarding the question about the conversion: if you keep using the Row type and do not add or remove fields, the conversion is pretty much free right now.
It will be a MapFunction (sometimes even no function at all) that should be chained with the other operators. Hence, it should boil down to a function call.

Best, Fabian
Re: [Table API] ClassCastException when converting a table to DataStream
Hi Rong,

> I have to dig deeper into the code to reproduce this error. This seems to
> be a bug to me and I will update once I find anything.

Thanks a lot for spending your time on this.

> However, from what you explained, if I understand correctly you can do all
> of your processing within the Table API scope without converting it back
> and forth to DataStream. E.g., if your "map(a -> a)" placeholder represents
> some sort of map function that's simple enough, you can implement it and
> connect it with the Table API via a UserDefinedFunction [1]. As the Table
> API is becoming a first-class citizen [2,3,4], this would be a much cleaner
> implementation from my perspective.

I agree that a first-class-citizen Table API will make everything not only easier but also a lot cleaner.
We do, however, have some corner cases that force us to convert between Table and DataStream.
One such case is appending to a Table a column showing the current watermark of each record; there is no other way to do that, because a ScalarFunction doesn't give us access to the runtime context the way a ProcessFunction does.

I have a question regarding the conversion.
Do I have to worry about a runtime performance penalty in cases where I cannot help but convert back and forth to DataStream?

Best,

Dongwon
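The watermark-appending corner case described above can be sketched as a ProcessFunction. This is a minimal illustration, not code from the thread: the class name and the append-at-the-end layout are assumptions, and it targets the Flink 1.8 APIs discussed here.

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Sketch of the approach described above: appending the current watermark
// to each Row. A Table API ScalarFunction cannot do this, because only a
// ProcessFunction's Context exposes the TimerService.
public class AppendWatermarkFunction extends ProcessFunction<Row, Row> {

    @Override
    public void processElement(Row in, Context ctx, Collector<Row> out) {
        Row result = new Row(in.getArity() + 1);
        for (int i = 0; i < in.getArity(); i++) {
            result.setField(i, in.getField(i));
        }
        // The current watermark is only reachable via the timer service.
        result.setField(in.getArity(), ctx.timerService().currentWatermark());
        out.collect(result);
    }
}
```

It would be applied via `tEnv.toAppendStream(table, Row.class).process(new AppendWatermarkFunction())`, followed by a `.returns(...)` type hint so the result can be converted back into a Table.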
Re: [Table API] ClassCastException when converting a table to DataStream
Hi Dongwon,

I have to dig deeper into the code to reproduce this error. This seems to be a bug to me and I will update once I find anything.

However, from what you explained, if I understand correctly you can do all of your processing within the Table API scope without converting it back and forth to DataStream. E.g., if your "map(a -> a)" placeholder represents some sort of map function that's simple enough, you can implement it and connect it with the Table API via a UserDefinedFunction [1]. As the Table API is becoming a first-class citizen [2,3,4], this would be a much cleaner implementation from my perspective.

--
Rong

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
[2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
[3] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
[4] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
Re: [Table API] ClassCastException when converting a table to DataStream
Hi Rong,

Thank you for the reply :-)

> which Flink version are you using?

I'm using Flink 1.8.0.

> what does "sourceTable.getSchema().toRowType()" return?

Row(time1: TimeIndicatorTypeInfo(rowtime))

> what does the line ".map(a -> a)" do, and can you remove it?

".map(a -> a)" is just to illustrate the problem.
My actual code contains a process function (instead of .map() in the snippet) which appends a new field containing the watermark to each row.
If there were a way to get the watermark inside a scalar UDF, I wouldn't convert the table to a DataStream and back.

> if I am understanding correctly, you are also using "time1" as the
> rowtime; is your intention to use it later as well?

Yup :-)

> As far as I know, ".returns(sourceTable.getSchema().toRowType());" only
> adds a type information hint about the return type of this operator. It is
> used in cases where Flink cannot determine it automatically [1].

The reason why I specify ".returns(sourceTable.getSchema().toRowType());" is to give a type information hint, as you said.
It is needed later when I make another table, like "Table anotherTable = tEnv.fromDataStream(stream);".
Without the type information hint, I get the error "An input of GenericTypeInfo cannot be converted to Table. Please specify the type of the input with a RowTypeInfo."
That's why I give the type information hint in that way.

Best,

Dongwon
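The round trip being described can be condensed into one sketch (assumed Flink 1.8 APIs; variable names and the registered table name "mysrc" follow the thread, everything else is illustrative). Without the `.returns(...)` hint, the lambda's output type erases to a GenericType, and `fromDataStream()` fails with exactly the error quoted above.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RoundTripSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumes a table source is already registered under "mysrc".
        Table sourceTable = tEnv.scan("mysrc");

        DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
            .map(a -> a)  // placeholder for the real ProcessFunction
            // Type hint: restores a concrete RowTypeInfo on the stream.
            .returns(sourceTable.getSchema().toRowType());

        // Succeeds only because the hint replaced the GenericTypeInfo.
        Table anotherTable = tEnv.fromDataStream(stream);
    }
}
```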
Re: [Table API] ClassCastException when converting a table to DataStream
Hi Dongwon,

Can you provide a bit more information:
- which Flink version are you using?
- what does "sourceTable.getSchema().toRowType()" return?
- what does the line ".map(a -> a)" do, and can you remove it?
- if I am understanding correctly, you are also using "time1" as the rowtime; is your intention to use it later as well?

As far as I know, ".returns(sourceTable.getSchema().toRowType());" only adds a type information hint about the return type of this operator. It is used in cases where Flink cannot determine it automatically [1].

Thanks,
Rong

--
[1] https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
[Table API] ClassCastException when converting a table to DataStream
Hello,

Consider the following snippet:

    Table sourceTable = getKafkaSource0(tEnv);
    DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
        .map(a -> a)
        .returns(sourceTable.getSchema().toRowType());
    stream.print();

where sourceTable.printSchema() shows:

    root
     |-- time1: TimeIndicatorTypeInfo(rowtime)

This program throws the following exception:

    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
        at app.metatron.test.Main2.main(Main2.java:231)
    Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
        at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
        ...

The row serializer seems to deep-copy an instance of java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
Could anybody help me?

Best,

- Dongwon

p.s. Though removing .returns() makes everything okay, I need it because I want to convert the DataStream into another table later.
p.s. The source table is created as follows:

    private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
        ConnectorDescriptor connectorDescriptor = new Kafka()
            .version("universal")
            .topic("mytopic")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "mygroup")
            .startFromEarliest();
        FormatDescriptor formatDescriptor = new Csv()
            .deriveSchema()
            .ignoreParseErrors()
            .fieldDelimiter(',');
        Schema schemaDescriptor = new Schema()
            .field("time1", SQL_TIMESTAMP())
            .rowtime(
                new Rowtime()
                    .timestampsFromField("rowTime")
                    .watermarksPeriodicBounded(100)
            );
        tEnv.connect(connectorDescriptor)
            .withFormat(formatDescriptor)
            .withSchema(schemaDescriptor)
            .inAppendMode()
            .registerTableSource("mysrc");
        return tEnv.scan("mysrc");
    }