Hi Fabian,

Thanks for the clarification :-)
I can convert back and forth without worrying about it, as I keep using the Row type during the conversion (even though fields are added).
Best,

Dongwon

On Tue, Jul 23, 2019 at 8:15 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Dongwon,
>
> Regarding the question about the conversion: if you keep using the Row
> type and don't add or remove fields, the conversion is pretty much free
> right now.
> It will be a MapFunction (sometimes not even a function at all) that
> should be chained with the other operators. Hence, it should boil down
> to a function call.
>
> Best, Fabian
>
> On Sat, Jul 20, 2019 at 03:58, Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hi Rong,
>>
>>> I have to dig deeper into the code to reproduce this error. This seems
>>> to be a bug to me and I will update once I find anything.
>>
>> Thanks a lot for spending your time on this.
>>
>>> However, from what you explained, if I understand correctly you can do
>>> all of your processing within the Table API scope without converting it
>>> back and forth to DataStream.
>>> E.g., if your "map(a -> a)" placeholder represents some sort of map
>>> function that's simple enough, you can implement it and connect with
>>> the Table API via a UserDefinedFunction [1].
>>> As the Table API becomes a first-class citizen [2,3,4], this would be a
>>> much cleaner implementation from my perspective.
>>
>> I also agree with you that a first-class-citizen Table API will make
>> everything not only easier but also a lot cleaner.
>> We do, however, have some corner cases that force us to convert a Table
>> from and to a DataStream.
>> One such case is appending to a Table a column showing the current
>> watermark of each record; there's no other way to do that, because a
>> ScalarFunction doesn't give us access to the runtime context the way a
>> ProcessFunction does.
>>
>> I have a question regarding the conversion.
>> Do I have to worry about a runtime performance penalty in case I cannot
>> help but convert back and forth to DataStream?
>>
>> Best,
>>
>> Dongwon
>>
>> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <walter...@gmail.com> wrote:
>>
>>> Hi Dongwon,
>>>
>>> I have to dig deeper into the code to reproduce this error. This seems
>>> to be a bug to me and I will update once I find anything.
>>>
>>> However, from what you explained, if I understand correctly you can do
>>> all of your processing within the Table API scope without converting it
>>> back and forth to DataStream.
>>> E.g., if your "map(a -> a)" placeholder represents some sort of map
>>> function that's simple enough, you can implement it and connect with
>>> the Table API via a UserDefinedFunction [1].
>>> As the Table API becomes a first-class citizen [2,3,4], this would be a
>>> much cleaner implementation from my perspective.
>>>
>>> --
>>> Rong
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>>> [2]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>>> [3]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>>> [4]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>>
>>>
>>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>
>>>> Hi Rong,
>>>>
>>>> Thank you for the reply :-)
>>>>
>>>>> which Flink version are you using?
>>>>
>>>> I'm using Flink 1.8.0.
>>>>
>>>>> what does "sourceTable.getSchema().toRowType()" return?
>>>>
>>>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>>>
>>>>> what does the line ".map(a -> a)" do and can you remove it?
>>>>
>>>> ".map(a -> a)" is just there to illustrate the problem.
>>>> My actual code contains a process function (instead of .map() in the
>>>> snippet) which appends to each row a new field containing the watermark.
>>>> If there were a way to get the watermark inside a scalar UDF, I
>>>> wouldn't convert the table to a datastream and vice versa.
>>>>
>>>>> if I am understanding correctly, you are also using "time1" as the
>>>>> rowtime; is it your intention to use it later as well?
>>>>
>>>> yup :-)
>>>>
>>>>> As far as I know, ".returns(sourceTable.getSchema().toRowType());"
>>>>> only adds a type information hint about the return type of this
>>>>> operator. It is used in cases where Flink cannot determine it
>>>>> automatically [1].
>>>>
>>>> The reason why I specify
>>>> ".returns(sourceTable.getSchema().toRowType());" is to give a type
>>>> information hint, as you said.
>>>> It is needed later when I create another table, like
>>>> "Table anotherTable = tEnv.fromDataStream(stream);".
>>>> Without the type information hint, I get the error
>>>> "An input of GenericTypeInfo<Row> cannot be converted to Table.
>>>> Please specify the type of the input with a RowTypeInfo."
>>>> That's why I give a type information hint in that way.
>>>>
>>>> Best,
>>>>
>>>> Dongwon
>>>>
>>>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <walter...@gmail.com> wrote:
>>>>
>>>>> Hi Dongwon,
>>>>>
>>>>> Can you provide a bit more information:
>>>>> which Flink version are you using?
>>>>> what does "sourceTable.getSchema().toRowType()" return?
>>>>> what does the line ".map(a -> a)" do and can you remove it?
>>>>> if I am understanding correctly, you are also using "time1" as the
>>>>> rowtime; is it your intention to use it later as well?
>>>>>
>>>>> As far as I know, ".returns(sourceTable.getSchema().toRowType());"
>>>>> only adds a type information hint about the return type of this
>>>>> operator. It is used in cases where Flink cannot determine it
>>>>> automatically [1].
>>>>>
>>>>> Thanks,
>>>>> Rong
>>>>>
>>>>> --
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>>>
>>>>>
>>>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Consider the following snippet:
>>>>>>
>>>>>>> Table sourceTable = getKafkaSource0(tEnv);
>>>>>>> DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>>>>>     .map(a -> a)
>>>>>>>     .returns(sourceTable.getSchema().toRowType());
>>>>>>> stream.print();
>>>>>>
>>>>>> where sourceTable.printSchema() shows:
>>>>>>
>>>>>>> root
>>>>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>>>>
>>>>>> This program throws the following exception:
>>>>>>
>>>>>>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>>>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>>>>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>>>>     at app.metatron.test.Main2.main(Main2.java:231)
>>>>>>> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
>>>>>>>     at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>>>>     ...
>>>>>>
>>>>>> The row serializer seems to try to deep-copy an instance of
>>>>>> java.sql.Timestamp using a LongSerializer instead of a
>>>>>> SqlTimestampSerializer.
>>>>>> Could anybody help me?
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> - Dongwon
>>>>>>
>>>>>> p.s. Though removing .returns() makes everything okay, I need it
>>>>>> because I want to convert the DataStream<Row> into another table later.
>>>>>> p.s. The source table is created as follows:
>>>>>>
>>>>>>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>>>>         .version("universal")
>>>>>>>         .topic("mytopic")
>>>>>>>         .property("bootstrap.servers", "localhost:9092")
>>>>>>>         .property("group.id", "mygroup")
>>>>>>>         .startFromEarliest();
>>>>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>>>>         .deriveSchema()
>>>>>>>         .ignoreParseErrors()
>>>>>>>         .fieldDelimiter(',');
>>>>>>>     Schema schemaDescriptor = new Schema()
>>>>>>>         .field("time1", SQL_TIMESTAMP())
>>>>>>>         .rowtime(
>>>>>>>             new Rowtime()
>>>>>>>                 .timestampsFromField("rowTime")
>>>>>>>                 .watermarksPeriodicBounded(100)
>>>>>>>         );
>>>>>>>     tEnv.connect(connectorDescriptor)
>>>>>>>         .withFormat(formatDescriptor)
>>>>>>>         .withSchema(schemaDescriptor)
>>>>>>>         .inAppendMode()
>>>>>>>         .registerTableSource("mysrc");
>>>>>>>     return tEnv.scan("mysrc");
>>>>>>> }
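
[Editor's note] For readers who land on this thread with the same stack trace: the `ClassCastException` arises because a rowtime indicator column is tracked internally as a `long` (epoch millis), while `toAppendStream()` materializes it as `java.sql.Timestamp`; reusing the original schema's row type in `.returns()` then makes a Long-based serializer copy a Timestamp. The following is a minimal, Flink-free sketch of that mismatch; `RowtimeCastDemo` and `copyAsLong` are hypothetical stand-ins, not Flink APIs.

```java
import java.sql.Timestamp;

// Hypothetical stand-in for what LongSerializer.copy() effectively does
// when the type information still says the rowtime field is a long.
public class RowtimeCastDemo {

    public static long copyAsLong(Object field) {
        return (Long) field;  // throws ClassCastException for a Timestamp
    }

    public static void main(String[] args) {
        // Inside the Table runtime the rowtime column is a long, so this works:
        System.out.println(copyAsLong(1563321600000L));

        // After toAppendStream() the field is a java.sql.Timestamp; a hint
        // that still declares it as a rowtime long makes the copy fail:
        try {
            copyAsLong(new Timestamp(1563321600000L));
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the reported trace");
        }
    }
}
```

One workaround (hedged, not verified against every version): instead of reusing `sourceTable.getSchema().toRowType()` verbatim, build the `RowTypeInfo` for the hint with `Types.SQL_TIMESTAMP` in place of the `TimeIndicatorTypeInfo` field, so the serializer matches the materialized Timestamp values.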
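[Editor's note] The corner case Dongwon mentions earlier (appending the current watermark as an extra column) needs a ProcessFunction because its Context exposes `ctx.timerService().currentWatermark()`, which a ScalarFunction cannot reach. The per-record transformation can be sketched in plain Java as below; `AppendWatermarkSketch` and `appendWatermark` are hypothetical names, and in a real job this logic would sit inside a `ProcessFunction<Row, Row>` with the watermark read from the context.

```java
import java.util.Arrays;

public class AppendWatermarkSketch {

    // Widen the row by one trailing field holding the current watermark,
    // mirroring what the ProcessFunction does for each record.
    public static Object[] appendWatermark(Object[] row, long currentWatermark) {
        Object[] out = Arrays.copyOf(row, row.length + 1);
        out[row.length] = currentWatermark;
        return out;
    }

    public static void main(String[] args) {
        Object[] row = {"key1", 42L};
        Object[] widened = appendWatermark(row, 1563321599999L);
        System.out.println(Arrays.toString(widened));
    }
}
```

Because the field count changes, this is exactly the case Fabian's "pretty much free" remark excludes: the conversion back to a Table must build a new Row type with the extra column.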