Hi Dongwon,

regarding the question about the conversion: if you keep using the Row type
and do not add or remove fields, the conversion is essentially free right
now. It will be a MapFunction (sometimes not even a function at all) that
should be chained with the other operators. Hence, it should boil down to a
function call.
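
To make that concrete, here is a minimal sketch of the round trip (untested
on my side; tEnv and sourceTable are the ones from your snippet, and the
per-record step is just a placeholder):

    // Table -> DataStream<Row>: planned as a chained conversion,
    // essentially a function call per record
    DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class);

    // ... your chained per-record logic goes here,
    //     e.g. the watermark-appending ProcessFunction ...

    // DataStream<Row> -> Table: equally cheap as long as the
    // Row layout is unchanged
    Table anotherTable = tEnv.fromDataStream(stream);

As long as everything stays chained, no network shuffle is involved.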
Best,
Fabian

On Sat, Jul 20, 2019 at 03:58, Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Rong,
>
>> I have to dig deeper into the code to reproduce this error. This seems
>> to be a bug to me and will update once I find anything.
>
> Thanks a lot for spending your time on this.
>
>> However, from what you explained, if I understand correctly you can do
>> all of your processing within the Table API scope without converting it
>> back and forth to DataStream.
>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>> function that's simple enough, you can implement it and connect it with
>> the Table API via a UserDefinedFunction [1].
>> As the Table API is becoming a first-class citizen [2,3,4], this would
>> be a much cleaner implementation from my perspective.
>
> I also agree that a first-class-citizen Table API will make everything
> not only easier but also a lot cleaner.
> We do, however, have some corner cases that force us to convert a Table
> from and to a DataStream.
> One such case is appending to a Table a column showing the current
> watermark of each record; there's no other way to do that, as
> ScalarFunction doesn't give us access to the runtime context information
> the way ProcessFunction does.
>
> I have a question regarding the conversion.
> Do I have to worry about a runtime performance penalty in case I cannot
> help but convert back and forth to DataStream?
>
> Best,
>
> Dongwon
>
> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <walter...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> I have to dig deeper into the code to reproduce this error. This seems
>> to be a bug to me and will update once I find anything.
>>
>> However, from what you explained, if I understand correctly you can do
>> all of your processing within the Table API scope without converting it
>> back and forth to DataStream.
>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>> function that's simple enough, you can implement it and connect it with
>> the Table API via a UserDefinedFunction [1].
>> As the Table API is becoming a first-class citizen [2,3,4], this would
>> be a much cleaner implementation from my perspective.
>>
>> --
>> Rong
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>> [3] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>> [4] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>
>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi Rong,
>>>
>>> Thank you for your reply :-)
>>>
>>>> which Flink version are you using?
>>>
>>> I'm using Flink 1.8.0.
>>>
>>>> what does "sourceTable.getSchema().toRowType()" return?
>>>
>>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>>
>>>> what does the line ".map(a -> a)" do, and can you remove it?
>>>
>>> ".map(a -> a)" is just to illustrate the problem.
>>> My actual code contains a process function (instead of .map() in the
>>> snippet) which appends to each row a new field containing the
>>> watermark.
>>> If there were a way to get the watermark inside a scalar UDF, I
>>> wouldn't convert the table to a DataStream and vice versa.
>>>
>>>> if I am understanding correctly, you are also using "time1" as the
>>>> rowtime; is it your intention to use it later as well?
>>>
>>> Yup :-)
>>>
>>>> As far as I know, ".returns(sourceTable.getSchema().toRowType());"
>>>> only adds a type information hint about the return type of this
>>>> operator. It is used in cases where Flink cannot determine it
>>>> automatically [1].
>>>
>>> The reason why I specify
>>> ".returns(sourceTable.getSchema().toRowType());" is to give a type
>>> information hint, as you said.
>>> It is needed later when I make another table, as in
>>> "Table anotherTable = tEnv.fromDataStream(stream);".
>>> Without the type information hint, I get the error
>>> "An input of GenericTypeInfo<Row> cannot be converted to Table.
>>> Please specify the type of the input with a RowTypeInfo."
>>> That's why I give a type information hint in that way.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <walter...@gmail.com>
>>> wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> Can you provide a bit more information:
>>>> which Flink version are you using?
>>>> what does "sourceTable.getSchema().toRowType()" return?
>>>> what does the line ".map(a -> a)" do, and can you remove it?
>>>> if I am understanding correctly, you are also using "time1" as the
>>>> rowtime; is it your intention to use it later as well?
>>>>
>>>> As far as I know, ".returns(sourceTable.getSchema().toRowType());"
>>>> only adds a type information hint about the return type of this
>>>> operator. It is used in cases where Flink cannot determine it
>>>> automatically [1].
>>>>
>>>> Thanks,
>>>> Rong
>>>>
>>>> --
>>>> [1] https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>>
>>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Consider the following snippet:
>>>>>
>>>>>> Table sourceTable = getKafkaSource0(tEnv);
>>>>>> DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>>>>     .map(a -> a)
>>>>>>     .returns(sourceTable.getSchema().toRowType());
>>>>>> stream.print();
>>>>>
>>>>> where sourceTable.printSchema() shows:
>>>>>
>>>>>> root
>>>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>>>
>>>>> This program throws the following exception:
>>>>>
>>>>>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>>>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>>>     at app.metatron.test.Main2.main(Main2.java:231)
>>>>>> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
>>>>>>     at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>>>     ...
>>>>>
>>>>> The row serializer seems to try to deep-copy an instance of
>>>>> java.sql.Timestamp using a LongSerializer instead of a
>>>>> SqlTimestampSerializer.
>>>>>
>>>>> Could anybody help me?
>>>>>
>>>>> Best,
>>>>>
>>>>> - Dongwon
>>>>>
>>>>> p.s. Though removing .returns() makes everything okay, I need it
>>>>> because I want to convert the DataStream<Row> into another table
>>>>> later.
>>>>> p.s. The source table is created as follows:
>>>>>
>>>>>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>>>         .version("universal")
>>>>>>         .topic("mytopic")
>>>>>>         .property("bootstrap.servers", "localhost:9092")
>>>>>>         .property("group.id", "mygroup")
>>>>>>         .startFromEarliest();
>>>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>>>         .deriveSchema()
>>>>>>         .ignoreParseErrors()
>>>>>>         .fieldDelimiter(',');
>>>>>>     Schema schemaDescriptor = new Schema()
>>>>>>         .field("time1", SQL_TIMESTAMP())
>>>>>>         .rowtime(
>>>>>>             new Rowtime()
>>>>>>                 .timestampsFromField("rowTime")
>>>>>>                 .watermarksPeriodicBounded(100)
>>>>>>         );
>>>>>>     tEnv.connect(connectorDescriptor)
>>>>>>         .withFormat(formatDescriptor)
>>>>>>         .withSchema(schemaDescriptor)
>>>>>>         .inAppendMode()
>>>>>>         .registerTableSource("mysrc");
>>>>>>     return tEnv.scan("mysrc");
>>>>>> }
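
PS: Regarding the watermark column mentioned above: a ScalarFunction indeed
has no access to the current watermark, so a ProcessFunction between the
two conversions is a reasonable approach. Below is a rough, untested sketch
of how that step could look; the class name, the appended long column, and
the exact type hint are my assumptions, with "stream" being the
toAppendStream result from the snippet:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    // Appends the operator's current watermark as an extra long field
    // to each row.
    public class AppendWatermark extends ProcessFunction<Row, Row> {
        @Override
        public void processElement(Row in, Context ctx, Collector<Row> out) {
            Row result = new Row(in.getArity() + 1);
            for (int i = 0; i < in.getArity(); i++) {
                result.setField(i, in.getField(i)); // copy existing fields
            }
            // the watermark is only reachable through the runtime context,
            // which is exactly what a ScalarFunction does not expose
            result.setField(in.getArity(),
                ctx.timerService().currentWatermark());
            out.collect(result);
        }
    }

    // usage: an explicit RowTypeInfo instead of the table schema's time
    // indicator type; the indicator's serializer treats the rowtime field
    // as a long, which matches the ClassCastException in the stack trace
    // above (my reading, not verified)
    DataStream<Row> withWatermark = stream
        .process(new AppendWatermark())
        .returns(new RowTypeInfo(Types.SQL_TIMESTAMP, Types.LONG));

Types.SQL_TIMESTAMP for "time1" and Types.LONG for the new column mirror
the single-column schema above; with more columns you would list them all.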