Hi Dongwon,

regarding the question about the conversion: if you keep using the Row type
and do not add or remove fields, the conversion is pretty much for free
right now.
It will be a MapFunction (sometimes not even a function at all) that should
be chained with the other operators. Hence, it should boil down to a
function call.
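
To make that concrete, here is a rough sketch of the round trip (reusing the
tEnv / sourceTable names from your snippet and assuming an append-only table):

    // Table -> DataStream<Row>: only a conversion mapper that gets chained
    // with the downstream operators, i.e. a function call per record.
    DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class);

    // ... your ProcessFunction / map on the Row stream goes here ...

    // DataStream<Row> -> Table: as long as the Row fields stay unchanged,
    // this is essentially free as well.
    Table backToTable = tEnv.fromDataStream(stream);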

Best, Fabian

Am Sa., 20. Juli 2019 um 03:58 Uhr schrieb Dongwon Kim <
eastcirc...@gmail.com>:

> Hi Rong,
>
> I have to dig deeper into the code to reproduce this error. This seems
>> like a bug to me, and I will update once I find anything.
>
> Thanks a lot for spending your time on this.
>
> However, from what you explained, if I understand correctly you can do all
>> of your processing within the Table API scope without converting it back and
>> forth to DataStream.
>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>> function that's simple enough, you can implement it and connect it with the
>> Table API via a UserDefinedFunction [1].
>> With the Table API becoming a first-class citizen [2,3,4], this would be a
>> much cleaner implementation from my perspective.
>
> I also agree with you that a first-class-citizen Table API will make
> everything not only easier but also a lot cleaner.
> We do, however, have some corner cases that force us to convert a Table from
> and to DataStream.
> One such case is appending to a Table a column showing the current watermark
> of each record; there is no way to do that other than with a conversion, as a
> ScalarFunction doesn't give us access to the runtime context the way a
> ProcessFunction does.
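>
> For reference, here is a rough sketch of that corner case (assuming Flink
> 1.8; the surrounding pipeline is omitted):
>
>     import org.apache.flink.streaming.api.functions.ProcessFunction;
>     import org.apache.flink.types.Row;
>     import org.apache.flink.util.Collector;
>
>     // Copies the incoming Row and appends the current watermark as an extra
>     // field. A ScalarFunction cannot do this, as it has no timer service.
>     public class AppendWatermark extends ProcessFunction<Row, Row> {
>       @Override
>       public void processElement(Row in, Context ctx, Collector<Row> out) {
>         Row enriched = new Row(in.getArity() + 1);
>         for (int i = 0; i < in.getArity(); i++) {
>           enriched.setField(i, in.getField(i));
>         }
>         enriched.setField(in.getArity(), ctx.timerService().currentWatermark());
>         out.collect(enriched);
>       }
>     }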
>
> I have a question regarding the conversion.
> Do I have to worry about a runtime performance penalty in cases where I
> cannot help but convert back and forth to DataStream?
>
> Best,
>
> Dongwon
>
> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <walter...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> I have to dig deeper into the code to reproduce this error. This seems
>> like a bug to me, and I will update once I find anything.
>>
>> However, from what you explained, if I understand correctly you can do all
>> of your processing within the Table API scope without converting it back and
>> forth to DataStream.
>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>> function that's simple enough, you can implement it and connect it with the
>> Table API via a UserDefinedFunction [1].
>> With the Table API becoming a first-class citizen [2,3,4], this would be a
>> much cleaner implementation from my perspective.
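>>
>> As a rough illustration, such a scalar UDF could look roughly like the
>> following (the function and field names here are made up):
>>
>>     import org.apache.flink.table.functions.ScalarFunction;
>>
>>     // Hypothetical stand-in for the "map(a -> a)" placeholder.
>>     public class MyTransform extends ScalarFunction {
>>       public String eval(String value) {
>>         // per-field logic goes here
>>         return value;
>>       }
>>     }
>>
>>     // Register it and use it without leaving the Table API:
>>     tEnv.registerFunction("myTransform", new MyTransform());
>>     Table result = sourceTable.select("myTransform(someField)");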
>>
>> --
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>> [4]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>
>>
>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi Rong,
>>>
>>> Thank you for reply :-)
>>>
>>> which Flink version are you using?
>>>
>>> I'm using Flink-1.8.0.
>>>
>>> what is the "sourceTable.getSchema().toRowType()" return?
>>>
>>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>>
>>> what does the line *".map(a -> a)"* do, and can you remove it?
>>>
>>> *".map(a->a)"* is just to illustrate a problem.
>>> My actual code contains a process function (instead of .map() in the
>>> snippet) which appends a new field containing watermark to a row.
>>> If there were ways to get watermark inside a scalar UDF, I wouldn't
>>> convert table to datastream and vice versa.
>>>
>>> if I am understanding correctly, you are also using "time1" as the
>>>> rowtime; is your intention to use it later as well?
>>>
>>> yup :-)
>>>
>>> As far as I know, *".returns(sourceTable.getSchema().toRowType());"* only
>>>> adds a type information hint about the return type of this operator. It is
>>>> used in cases where Flink cannot determine the type automatically [1].
>>>
>>> The reason why I specify
>>> *".returns(sourceTable.getSchema().toRowType());"* is to give a type
>>> information hint, as you said.
>>> That hint is needed later when I create another table like
>>>    *"Table anotherTable = tEnv.fromDataStream(stream);"*.
>>> Without the type information hint, I get the error
>>>    *"An input of GenericTypeInfo<Row> cannot be converted to Table.
>>> Please specify the type of the input with a RowTypeInfo."*
>>> That's why I give the type information hint in that way.
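>>>
>>> In other words, the hint is only there so that the later conversion works,
>>> roughly like this (same pipeline as in my first mail):
>>>
>>>     // Without the .returns(...) hint, the lambda leaves Flink with a
>>>     // GenericTypeInfo<Row>, which fromDataStream() rejects.
>>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>       .map(a -> a)
>>>       .returns(sourceTable.getSchema().toRowType());
>>>     Table anotherTable = tEnv.fromDataStream(stream);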
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <walter...@gmail.com> wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> Can you provide a bit more information:
>>>> which Flink version are you using?
>>>> what does "sourceTable.getSchema().toRowType()" return?
>>>> what does the line *".map(a -> a)"* do, and can you remove it?
>>>> if I am understanding correctly, you are also using "time1" as the
>>>> rowtime; is your intention to use it later as well?
>>>>
>>>> As far as I know, *".returns(sourceTable.getSchema().toRowType());"*
>>>> only adds a type information hint about the return type of this operator.
>>>> It is used in cases where Flink cannot determine the type automatically [1].
>>>>
>>>> Thanks,
>>>> Rong
>>>>
>>>> --
>>>> [1]
>>>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>>
>>>>
>>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Consider the following snippet:
>>>>>
>>>>>>     Table sourceTable = getKafkaSource0(tEnv);
>>>>>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>>>>       .map(a -> a)
>>>>>>       .returns(sourceTable.getSchema().toRowType());
>>>>>>     stream.print();
>>>>>>
>>>>> where sourceTable.printSchema() shows:
>>>>>
>>>>>> root
>>>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>>>
>>>>>
>>>>>
>>>>> This program throws the following exception:
>>>>>
>>>>>> Exception in thread "main"
>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution 
>>>>>> failed.
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>> at
>>>>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>>> at app.metatron.test.Main2.main(Main2.java:231)
>>>>>> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot
>>>>>> be cast to java.lang.Long*
>>>>>> * at
>>>>>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
>>>>>> at
>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>>> at
>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>>> ...
>>>>>
>>>>>
>>>>> The row serializer seems to try to deep-copy an instance of
>>>>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>>>>> Could anybody help me?
>>>>>
>>>>> Best,
>>>>>
>>>>> - Dongwon
>>>>>
>>>>> p.s. though removing .returns() makes everything okay, I need to keep it
>>>>> because I want to convert the DataStream<Row> into another table later.
>>>>> p.s. the source table is created as follows:
>>>>>
>>>>>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>>>       .version("universal")
>>>>>>       .topic("mytopic")
>>>>>>       .property("bootstrap.servers", "localhost:9092")
>>>>>>       .property("group.id", "mygroup")
>>>>>>       .startFromEarliest();
>>>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>>>       .deriveSchema()
>>>>>>       .ignoreParseErrors()
>>>>>>       .fieldDelimiter(',');
>>>>>>     Schema schemaDescriptor = new Schema()
>>>>>>       .field("time1", SQL_TIMESTAMP())
>>>>>>       .rowtime(
>>>>>>         new Rowtime()
>>>>>>           .timestampsFromField("rowTime")
>>>>>>           .watermarksPeriodicBounded(100)
>>>>>>       );
>>>>>>     tEnv.connect(connectorDescriptor)
>>>>>>       .withFormat(formatDescriptor)
>>>>>>       .withSchema(schemaDescriptor)
>>>>>>       .inAppendMode()
>>>>>>       .registerTableSource("mysrc");
>>>>>>     return tEnv.scan("mysrc");
>>>>>>   }
>>>>>
>>>>>
