Hi Fabian,

Thanks for the clarification :-)
So I can convert back and forth without worrying about it, as long as I keep
using the Row type during the conversion (even though fields are added).

Best,

Dongwon



On Tue, Jul 23, 2019 at 8:15 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Dongwon,
>
> regarding the question about the conversion: If you keep using the Row
> type and not adding/removing fields, the conversion is pretty much for free
> right now.
> It will be a MapFunction (sometimes not even a function at all) that should
> be chained with the other operators. Hence, it should boil down to a
> function call.
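>
> For illustration, the round trip boils down to something like this (a
> sketch, reusing the names from your earlier snippet):
>
>     Table sourceTable = tEnv.scan("mysrc");
>     // Table -> DataStream: a chained conversion, essentially a function call
>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class);
>     // ... DataStream logic that keeps the Row type ...
>     // DataStream -> Table: again just a chained conversion
>     Table backAgain = tEnv.fromDataStream(stream);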
>
> Best, Fabian
>
> Am Sa., 20. Juli 2019 um 03:58 Uhr schrieb Dongwon Kim <
> eastcirc...@gmail.com>:
>
>> Hi Rong,
>>
>>> I have to dig deeper into the code to reproduce this error. This seems
>>> like a bug to me; I will update once I find anything.
>>
>> Thanks a lot for spending your time on this.
>>
>>> However, from what you explained, if I understand correctly you can do all
>>> of your processing within the Table API scope without converting back and
>>> forth to DataStream.
>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>> function that's simple enough, you can implement it and connect it with the
>>> Table API via a UserDefinedFunction [1].
>>> As the Table API becomes a first-class citizen [2,3,4], this would be a much
>>> cleaner implementation from my perspective.
>>
>> I also agree that a first-class-citizen Table API will make
>> everything not only easier but also a lot cleaner.
>> We do, however, have some corner cases that force us to convert between
>> Table and DataStream.
>> One such case is appending to a Table a column showing the current
>> watermark of each record; there's no other way to do that, as
>> ScalarFunction doesn't give us access to the runtime context information
>> the way ProcessFunction does.
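>>
>> For reference, a simplified sketch of what we do (assuming a DataStream<Row>
>> called stream; the actual code is more involved):
>>
>>     // appends the current watermark as an extra field of each row
>>     DataStream<Row> withWatermark = stream
>>       .process(new ProcessFunction<Row, Row>() {
>>         @Override
>>         public void processElement(Row in, Context ctx, Collector<Row> out) {
>>           Row result = new Row(in.getArity() + 1);
>>           for (int i = 0; i < in.getArity(); i++) {
>>             result.setField(i, in.getField(i));
>>           }
>>           result.setField(in.getArity(), ctx.timerService().currentWatermark());
>>           out.collect(result);
>>         }
>>       });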
>>
>> I have a question regarding the conversion.
>> Do I have to worry about a runtime performance penalty in case I
>> cannot help but convert back and forth to DataStream?
>>
>> Best,
>>
>> Dongwon
>>
>> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <walter...@gmail.com> wrote:
>>
>>> Hi Dongwon,
>>>
>>> I have to dig deeper into the code to reproduce this error. This seems
>>> like a bug to me; I will update once I find anything.
>>>
>>> However, from what you explained, if I understand correctly you can do
>>> all of your processing within the Table API scope without converting back
>>> and forth to DataStream.
>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>> function that's simple enough, you can implement it and connect it with the
>>> Table API via a UserDefinedFunction [1].
>>> As the Table API becomes a first-class citizen [2,3,4], this would be a much
>>> cleaner implementation from my perspective.
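>>>
>>> For example, a trivial scalar UDF standing in for the map placeholder could
>>> look roughly like this (a hypothetical sketch; IdentityFn and someCol are
>>> made-up names):
>>>
>>>     // identity on a single long column, registered with the table env
>>>     public static class IdentityFn extends ScalarFunction {
>>>       public Long eval(Long value) {
>>>         return value;
>>>       }
>>>     }
>>>     tEnv.registerFunction("identityFn", new IdentityFn());
>>>     // usage: sourceTable.select("identityFn(someCol)")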
>>>
>>> --
>>> Rong
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>>> [2]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>>> [3]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>>> [4]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>>
>>>
>>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <eastcirc...@gmail.com>
>>> wrote:
>>>
>>>> Hi Rong,
>>>>
>>>> Thank you for your reply :-)
>>>>
>>>> Which Flink version are you using?
>>>>
>>>> I'm using Flink-1.8.0.
>>>>
>>>> What does "sourceTable.getSchema().toRowType()" return?
>>>>
>>>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>>>
>>>> What does the line *".map(a -> a)"* do, and can you remove it?
>>>>
>>>> *".map(a->a)"* is just to illustrate the problem.
>>>> My actual code contains a process function (instead of .map() in the
>>>> snippet) which appends to each row a new field containing the watermark.
>>>> If there were a way to get the watermark inside a scalar UDF, I wouldn't
>>>> convert the table to a DataStream and back.
>>>>
>>>>> If I understand correctly, you are also using "time1" as the
>>>>> rowtime; is your intention to use it later as well?
>>>>
>>>> yup :-)
>>>>
>>>>> As far as I know, *".returns(sourceTable.getSchema().toRowType());"* only
>>>>> adds a type information hint about the return type of this operator. It is
>>>>> used in cases where Flink cannot determine it automatically [1].
>>>>
>>>> The reason why I specify
>>>> *".returns(sourceTable.getSchema().toRowType());"* is to give a type
>>>> information hint, as you said.
>>>> It is needed later when I make another table like
>>>>    "*Table anotherTable = tEnv.fromDataStream(stream);*".
>>>> Without the type information hint, I get the error
>>>>    "*An input of GenericTypeInfo<Row> cannot be converted to Table.
>>>> Please specify the type of the input with a RowTypeInfo.*"
>>>> That's why I give the type information hint in that way.
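>>>>
>>>> Put together, the pattern I'm after is roughly (a sketch, simplified from
>>>> my actual code):
>>>>
>>>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>>       .map(a -> a)  // placeholder for the actual process function
>>>>       .returns(sourceTable.getSchema().toRowType());  // the type hint
>>>>     Table anotherTable = tEnv.fromDataStream(stream);  // needs RowTypeInfo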
>>>>
>>>> Best,
>>>>
>>>> Dongwon
>>>>
>>>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <walter...@gmail.com> wrote:
>>>>
>>>>> Hi Dongwon,
>>>>>
>>>>> Can you provide a bit more information:
>>>>> Which Flink version are you using?
>>>>> What does "sourceTable.getSchema().toRowType()" return?
>>>>> What does the line *".map(a -> a)"* do, and can you remove it?
>>>>> If I understand correctly, you are also using "time1" as the
>>>>> rowtime; is your intention to use it later as well?
>>>>>
>>>>> As far as I know, *".returns(sourceTable.getSchema().toRowType());"*
>>>>> only adds a type information hint about the return type of this operator.
>>>>> It is used in cases where Flink cannot determine it automatically [1].
>>>>>
>>>>> Thanks,
>>>>> Rong
>>>>>
>>>>> --
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>>>
>>>>>
>>>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Consider the following snippet:
>>>>>>
>>>>>>>     Table sourceTable = getKafkaSource0(tEnv);
>>>>>>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>>>>>       .map(a -> a)
>>>>>>>       .returns(sourceTable.getSchema().toRowType());
>>>>>>>     stream.print();
>>>>>>>
>>>>>> where sourceTable.printSchema() shows:
>>>>>>
>>>>>>> root
>>>>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>>>>
>>>>>>
>>>>>>
>>>>>> This program throws the following exception:
>>>>>>
>>>>>>> Exception in thread "main"
>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>>>> at app.metatron.test.Main2.main(Main2.java:231)
>>>>>>> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot
>>>>>>> be cast to java.lang.Long*
>>>>>>> * at
>>>>>>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
>>>>>>> at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>>>> at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>>>> ...
>>>>>>
>>>>>>
>>>>>> The row serializer seems to try to deep-copy an instance of
>>>>>> java.sql.Timestamp using LongSerializer instead of 
>>>>>> SqlTimestampSerializer.
>>>>>> Could anybody help me?
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> - Dongwon
>>>>>>
>>>>>> p.s. though removing .returns() makes everything okay, I need to keep it
>>>>>> because I want to convert the DataStream<Row> into another table later.
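>>>>>> p.s. one untested idea for a workaround (just an assumption on my part):
>>>>>> build the type hint by hand with a plain SQL timestamp type instead of the
>>>>>> time indicator type, e.g.
>>>>>>
>>>>>>     // hypothetical: Types is org.apache.flink.api.common.typeinfo.Types
>>>>>>     .returns(Types.ROW_NAMED(
>>>>>>         new String[]{"time1"},
>>>>>>         Types.SQL_TIMESTAMP))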
>>>>>> p.s. the source table is created as follows:
>>>>>>
>>>>>> private static final Table getKafkaSource0(StreamTableEnvironment
>>>>>>> tEnv) {
>>>>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>>>>       .version("universal")
>>>>>>>       .topic("mytopic")
>>>>>>>       .property("bootstrap.servers", "localhost:9092")
>>>>>>>       .property("group.id", "mygroup")
>>>>>>>       .startFromEarliest();
>>>>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>>>>       .deriveSchema()
>>>>>>>       .ignoreParseErrors()
>>>>>>>       .fieldDelimiter(',');
>>>>>>>     Schema schemaDescriptor = new Schema()
>>>>>>>       .field("time1", SQL_TIMESTAMP())
>>>>>>>       .rowtime(
>>>>>>>         new Rowtime()
>>>>>>>           .timestampsFromField("rowTime")
>>>>>>>           .watermarksPeriodicBounded(100)
>>>>>>>       );
>>>>>>>     tEnv.connect(connectorDescriptor)
>>>>>>>       .withFormat(formatDescriptor)
>>>>>>>       .withSchema(schemaDescriptor)
>>>>>>>       .inAppendMode()
>>>>>>>       .registerTableSource("mysrc");
>>>>>>>     return tEnv.scan("mysrc");
>>>>>>>   }
>>>>>>
>>>>>>
