Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-23 Thread Rong Rong
Hi Dongwon,

Sorry for the late reply. I did some experiments and it seems like you are
right:
Setting the `.returns()` type hint actually alters the underlying type of the
DataStream from a GenericType into a specific RowTypeInfo. Please see the
JIRA ticket [1] for more info.
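
A quick way to see the change (a sketch reusing the sourceTable/tEnv from
your snippet; the printed type names are indicative, not verbatim output):

    // before the hint: the map() lambda erases Row's field types
    SingleOutputStreamOperator<Row> mapped =
        tEnv.toAppendStream(sourceTable, Row.class).map(a -> a);
    System.out.println(mapped.getType());   // GenericType<org.apache.flink.types.Row>

    // after the hint: the stream carries the table's RowTypeInfo again
    DataStream<Row> hinted =
        mapped.returns(sourceTable.getSchema().toRowType());
    System.out.println(hinted.getType());   // Row(time1: TimeIndicatorTypeInfo(rowtime))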

Regarding the approach: yes, I think you cannot access the timer service
from the Table/SQL API at the moment, so that might be the best approach.
And as Fabian suggested, I don't think there's much of a problem as long as
you are not changing the underlying type info of your DataStream. I will
follow up on this in the JIRA ticket.

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-13389

On Tue, Jul 23, 2019 at 6:30 AM Dongwon Kim  wrote:

> Hi Fabian,
>
> Thanks for clarification :-)
> I could convert back and forth without worrying about it as I keep using the
> Row type during the conversion (even though fields are added).
>
> Best,
>
> Dongwon
>
>
>
> On Tue, Jul 23, 2019 at 8:15 PM Fabian Hueske  wrote:
>
>> Hi Dongwon,
>>
>> regarding the question about the conversion: If you keep using the Row
>> type and not adding/removing fields, the conversion is pretty much for free
>> right now.
>> It will be a MapFunction (sometimes even no function at all) that should
>> be chained with the other operators. Hence, it should boil down to a
>> function call.
>>
>> Best, Fabian
>>
>> On Sat., 20 Jul 2019 at 03:58, Dongwon Kim <
>> eastcirc...@gmail.com> wrote:
>>
>>> Hi Rong,
>>>
>>> I have to dig deeper into the code to reproduce this error. This seems
 to be a bug to me and will update once I find anything.
>>>
>>> Thanks a lot for spending your time on this.
>>>
>>> However from what you explained, if I understand correctly you can do
 all of your processing within the TableAPI scope without converting it back
 and forth to DataStream.
 E.g. if your "map(a -> a)" placeholder represents some sort of map
 function that's simple enough, you can implement and connect with the table
 API via UserDefinedFunction[1].
 As TableAPI is becoming a first-class citizen [2,3,4], this would be a
 much cleaner implementation from my perspective.
>>>
>>> I also agree with you that a first-class-citizen Table API will
>>> make everything not only easier but also a lot cleaner.
>>> We do, however, have some corner cases that force us to convert Table from
>>> and to DataStream.
>>> One such case is appending to a Table a column showing the current
>>> watermark of each record; there's no other way to do that, as
>>> ScalarFunction doesn't allow us to get the runtime context information
>>> the way ProcessFunction does.
>>>
>>> I have a question regarding the conversion.
>>> Do I have to worry about a runtime performance penalty in cases where I
>>> cannot help but convert back and forth to DataStream?
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong  wrote:
>>>
 Hi Dongwon,

 I have to dig deeper into the code to reproduce this error. This seems
 to be a bug to me and will update once I find anything.

 However from what you explained, if I understand correctly you can do
 all of your processing within the TableAPI scope without converting it back
 and forth to DataStream.
 E.g. if your "map(a -> a)" placeholder represents some sort of map
 function that's simple enough, you can implement and connect with the table
 API via UserDefinedFunction[1].
 As TableAPI is becoming a first-class citizen [2,3,4], this would be a
 much cleaner implementation from my perspective.

 --
 Rong

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
 [2]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
 [3]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
 [4]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html


 On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim 
 wrote:

> Hi Rong,
>
> Thank you for the reply :-)
>
> which Flink version are you using?
>
> I'm using Flink-1.8.0.
>
> what does "sourceTable.getSchema().toRowType()" return?
>
> Row(time1: TimeIndicatorTypeInfo(rowtime))
>
> what does the line ".map(a -> a)" do, and can you remove it?
>
> ".map(a -> a)" is just to illustrate the problem.
> My actual code contains a process function (instead of the .map() in the
> snippet) which appends a new field containing the watermark to a row.
> If there were a way to get the watermark inside a scalar UDF, I wouldn't
> convert the table to a datastream and vice versa.
>
> if I am understanding correctly, you are also using "time1" as the
>> rowtime; is your intention to use it later as well?

Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-23 Thread Dongwon Kim
Hi Fabian,

Thanks for clarification :-)
I could convert back and forth without worrying about it as I keep using the
Row type during the conversion (even though fields are added).

Best,

Dongwon



On Tue, Jul 23, 2019 at 8:15 PM Fabian Hueske  wrote:

> Hi Dongwon,
>
> regarding the question about the conversion: If you keep using the Row
> type and not adding/removing fields, the conversion is pretty much for free
> right now.
> It will be a MapFunction (sometimes even no function at all) that should
> be chained with the other operators. Hence, it should boil down to a
> function call.
>
> Best, Fabian
>
> On Sat., 20 Jul 2019 at 03:58, Dongwon Kim <
> eastcirc...@gmail.com> wrote:
>
>> Hi Rong,
>>
>> I have to dig deeper into the code to reproduce this error. This seems to
>>> be a bug to me and will update once I find anything.
>>
>> Thanks a lot for spending your time on this.
>>
>> However from what you explained, if I understand correctly you can do all
>>> of your processing within the TableAPI scope without converting it back and
>>> forth to DataStream.
>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>> function that's simple enough, you can implement and connect with the table
>>> API via UserDefinedFunction[1].
>>> As TableAPI is becoming a first-class citizen [2,3,4], this would be a much
>>> cleaner implementation from my perspective.
>>
>> I also agree with you that a first-class-citizen Table API will make
>> everything not only easier but also a lot cleaner.
>> We do, however, have some corner cases that force us to convert Table from
>> and to DataStream.
>> One such case is appending to a Table a column showing the current
>> watermark of each record; there's no other way to do that, as
>> ScalarFunction doesn't allow us to get the runtime context information
>> the way ProcessFunction does.
>>
>> I have a question regarding the conversion.
>> Do I have to worry about a runtime performance penalty in cases where I
>> cannot help but convert back and forth to DataStream?
>>
>> Best,
>>
>> Dongwon
>>
>> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong  wrote:
>>
>>> Hi Dongwon,
>>>
>>> I have to dig deeper into the code to reproduce this error. This seems
>>> to be a bug to me and will update once I find anything.
>>>
>>> However from what you explained, if I understand correctly you can do
>>> all of your processing within the TableAPI scope without converting it back
>>> and forth to DataStream.
>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>> function that's simple enough, you can implement and connect with the table
>>> API via UserDefinedFunction[1].
>>> As TableAPI is becoming a first-class citizen [2,3,4], this would be a much
>>> cleaner implementation from my perspective.
>>>
>>> --
>>> Rong
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>>> [2]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>>> [3]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>>> [4]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>>
>>>
>>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim 
>>> wrote:
>>>
 Hi Rong,

 Thank you for the reply :-)

 which Flink version are you using?

 I'm using Flink-1.8.0.

 what does "sourceTable.getSchema().toRowType()" return?

 Row(time1: TimeIndicatorTypeInfo(rowtime))

 what does the line ".map(a -> a)" do, and can you remove it?

 ".map(a -> a)" is just to illustrate the problem.
 My actual code contains a process function (instead of the .map() in the
 snippet) which appends a new field containing the watermark to a row.
 If there were a way to get the watermark inside a scalar UDF, I wouldn't
 convert the table to a datastream and vice versa.

 if I am understanding correctly, you are also using "time1" as the
> rowtime; is your intention to use it later as well?

 yup :-)

 As far as I know, ".returns(sourceTable.getSchema().toRowType());" only
> adds a type information hint about the return type of this operator. It is
> used in cases where Flink cannot determine it automatically[1].

 The reason why I specify
 ".returns(sourceTable.getSchema().toRowType());" is to give a type
 information hint, as you said.
 That is needed later when I make another table like
   "Table anotherTable = tEnv.fromDataStream(stream);",
 Without the type information hint, I get the error
   "An input of GenericTypeInfo<Row> cannot be converted to Table.
 Please specify the type of the input with a RowTypeInfo."
 That's why I give a type information hint in that way.

 Best,

 Dongwon


Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-23 Thread Fabian Hueske
Hi Dongwon,

regarding the question about the conversion: If you keep using the Row type
and not adding/removing fields, the conversion is pretty much for free
right now.
It will be a MapFunction (sometimes even no function at all) that should
be chained with the other operators. Hence, it should boil down to a
function call.

Best, Fabian

On Sat., 20 Jul 2019 at 03:58, Dongwon Kim <
eastcirc...@gmail.com> wrote:

> Hi Rong,
>
> I have to dig deeper into the code to reproduce this error. This seems to
>> be a bug to me and will update once I find anything.
>
> Thanks a lot for spending your time on this.
>
> However from what you explained, if I understand correctly you can do all
>> of your processing within the TableAPI scope without converting it back and
>> forth to DataStream.
>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>> function that's simple enough, you can implement and connect with the table
>> API via UserDefinedFunction[1].
>> As TableAPI is becoming a first-class citizen [2,3,4], this would be a much
>> cleaner implementation from my perspective.
>
> I also agree with you that a first-class-citizen Table API will make
> everything not only easier but also a lot cleaner.
> We do, however, have some corner cases that force us to convert Table from
> and to DataStream.
> One such case is appending to a Table a column showing the current watermark
> of each record; there's no other way to do that, as ScalarFunction
> doesn't allow us to get the runtime context information the way ProcessFunction
> does.
>
> I have a question regarding the conversion.
> Do I have to worry about a runtime performance penalty in cases where I cannot
> help but convert back and forth to DataStream?
>
> Best,
>
> Dongwon
>
> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong  wrote:
>
>> Hi Dongwon,
>>
>> I have to dig deeper into the code to reproduce this error. This seems to
>> be a bug to me and will update once I find anything.
>>
>> However from what you explained, if I understand correctly you can do all
>> of your processing within the TableAPI scope without converting it back and
>> forth to DataStream.
>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>> function that's simple enough, you can implement and connect with the table
>> API via UserDefinedFunction[1].
>> As TableAPI is becoming a first-class citizen [2,3,4], this would be a much
>> cleaner implementation from my perspective.
>>
>> --
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>> [4]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>
>>
>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim 
>> wrote:
>>
>>> Hi Rong,
>>>
>>> Thank you for the reply :-)
>>>
>>> which Flink version are you using?
>>>
>>> I'm using Flink-1.8.0.
>>>
>>> what does "sourceTable.getSchema().toRowType()" return?
>>>
>>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>>
>>> what does the line ".map(a -> a)" do, and can you remove it?
>>>
>>> ".map(a -> a)" is just to illustrate the problem.
>>> My actual code contains a process function (instead of the .map() in the
>>> snippet) which appends a new field containing the watermark to a row.
>>> If there were a way to get the watermark inside a scalar UDF, I wouldn't
>>> convert the table to a datastream and vice versa.
>>>
>>> if I am understanding correctly, you are also using "time1" as the
 rowtime; is your intention to use it later as well?
>>>
>>> yup :-)
>>>
>>> As far as I know, ".returns(sourceTable.getSchema().toRowType());" only
 adds a type information hint about the return type of this operator. It is
 used in cases where Flink cannot determine it automatically[1].
>>>
>>> The reason why I specify
>>> ".returns(sourceTable.getSchema().toRowType());" is to give a type
>>> information hint, as you said.
>>> That is needed later when I make another table like
>>>    "Table anotherTable = tEnv.fromDataStream(stream);",
>>> Without the type information hint, I get the error
>>>    "An input of GenericTypeInfo<Row> cannot be converted to Table.
>>> Please specify the type of the input with a RowTypeInfo."
>>> That's why I give a type information hint in that way.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong  wrote:
>>>
 Hi Dongwon,

 Can you provide a bit more information:
 which Flink version are you using?
 what does "sourceTable.getSchema().toRowType()" return?
 what does the line ".map(a -> a)" do, and can you remove it?
 if I am understanding correctly, you are also using "time1" as the
 rowtime; is your intention to use it later as well?

Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-19 Thread Dongwon Kim
Hi Rong,

I have to dig deeper into the code to reproduce this error. This seems to
> be a bug to me and will update once I find anything.

Thanks a lot for spending your time on this.

However from what you explained, if I understand correctly you can do all
> of your processing within the TableAPI scope without converting it back and
> forth to DataStream.
> E.g. if your "map(a -> a)" placeholder represents some sort of map
> function that's simple enough, you can implement and connect with the table
> API via UserDefinedFunction[1].
> As TableAPI is becoming a first-class citizen [2,3,4], this would be a much
> cleaner implementation from my perspective.

I also agree with you that a first-class-citizen Table API will make
everything not only easier but also a lot cleaner.
We do, however, have some corner cases that force us to convert Table from and
to DataStream.
One such case is appending to a Table a column showing the current watermark
of each record; there's no other way to do that, as ScalarFunction
doesn't allow us to get the runtime context information the way ProcessFunction
does (a minimal sketch follows below).
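
A minimal sketch of that corner case (the class name and field layout here
are illustrative, not the actual code):

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    // appends the current watermark as an extra trailing field; a
    // ScalarFunction has no access to the timer service, so this has
    // to live on the DataStream side
    public class AppendWatermark extends ProcessFunction<Row, Row> {
        @Override
        public void processElement(Row in, Context ctx, Collector<Row> out) {
            Row result = new Row(in.getArity() + 1);
            for (int i = 0; i < in.getArity(); i++) {
                result.setField(i, in.getField(i));
            }
            result.setField(in.getArity(), ctx.timerService().currentWatermark());
            out.collect(result);
        }
    }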

I have a question regarding the conversion.
Do I have to worry about a runtime performance penalty in cases where I cannot
help but convert back and forth to DataStream?

Best,

Dongwon

On Sat, Jul 20, 2019 at 12:41 AM Rong Rong  wrote:

> Hi Dongwon,
>
> I have to dig deeper into the code to reproduce this error. This seems to
> be a bug to me and will update once I find anything.
>
> However from what you explained, if I understand correctly you can do all
> of your processing within the TableAPI scope without converting it back and
> forth to DataStream.
> E.g. if your "map(a -> a)" placeholder represents some sort of map
> function that's simple enough, you can implement and connect with the table
> API via UserDefinedFunction[1].
> As TableAPI is becoming a first-class citizen [2,3,4], this would be a much
> cleaner implementation from my perspective.
>
> --
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
> [4]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>
>
> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim  wrote:
>
>> Hi Rong,
>>
>> Thank you for the reply :-)
>>
>> which Flink version are you using?
>>
>> I'm using Flink-1.8.0.
>>
>> what does "sourceTable.getSchema().toRowType()" return?
>>
>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>
>> what does the line ".map(a -> a)" do, and can you remove it?
>>
>> ".map(a -> a)" is just to illustrate the problem.
>> My actual code contains a process function (instead of the .map() in the
>> snippet) which appends a new field containing the watermark to a row.
>> If there were a way to get the watermark inside a scalar UDF, I wouldn't
>> convert the table to a datastream and vice versa.
>>
>> if I am understanding correctly, you are also using "time1" as the
>>> rowtime; is your intention to use it later as well?
>>
>> yup :-)
>>
>> As far as I know, ".returns(sourceTable.getSchema().toRowType());" only
>>> adds a type information hint about the return type of this operator. It is
>>> used in cases where Flink cannot determine it automatically[1].
>>
>> The reason why I specify
>> ".returns(sourceTable.getSchema().toRowType());" is to give a type
>> information hint, as you said.
>> That is needed later when I make another table like
>>    "Table anotherTable = tEnv.fromDataStream(stream);",
>> Without the type information hint, I get the error
>>    "An input of GenericTypeInfo<Row> cannot be converted to Table.
>> Please specify the type of the input with a RowTypeInfo."
>> That's why I give a type information hint in that way.
>>
>> Best,
>>
>> Dongwon
>>
>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong  wrote:
>>
>>> Hi Dongwon,
>>>
>>> Can you provide a bit more information:
>>> which Flink version are you using?
>>> what does "sourceTable.getSchema().toRowType()" return?
>>> what does the line ".map(a -> a)" do, and can you remove it?
>>> if I am understanding correctly, you are also using "time1" as the
>>> rowtime; is your intention to use it later as well?
>>>
>>> As far as I know, ".returns(sourceTable.getSchema().toRowType());"
>>> only adds a type information hint about the return type of this operator.
>>> It is used in cases where Flink cannot determine it automatically[1].
>>>
>>> Thanks,
>>> Rong
>>>
>>> --
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>
>>>
>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim 
>>> wrote:
>>>
 

Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-19 Thread Rong Rong
Hi Dongwon,

I have to dig deeper into the code to reproduce this error. This seems to
be a bug to me and will update once I find anything.

However from what you explained, if I understand correctly you can do all
of your processing within the TableAPI scope without converting it back and
forth to DataStream.
E.g. if your "map(a -> a)" placeholder represents some sort of map function
that's simple enough, you can implement and connect with the table API via
UserDefinedFunction[1].
As TableAPI is becoming a first-class citizen [2,3,4], this would be a much
cleaner implementation from my perspective.
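
For example, a minimal sketch of such a UDF (hypothetical names, just to
show the shape of the API):

    import org.apache.flink.table.functions.ScalarFunction;

    // a trivial scalar function standing in for the map(a -> a) placeholder
    public static class MyMapper extends ScalarFunction {
        public String eval(String s) {
            return s == null ? null : s.trim();
        }
    }

    // tEnv.registerFunction("myMapper", new MyMapper());
    // Table result = sourceTable.select("myMapper(someField)");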

--
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html


On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim  wrote:

> Hi Rong,
>
> Thank you for the reply :-)
>
> which Flink version are you using?
>
> I'm using Flink-1.8.0.
>
> what does "sourceTable.getSchema().toRowType()" return?
>
> Row(time1: TimeIndicatorTypeInfo(rowtime))
>
> what does the line ".map(a -> a)" do, and can you remove it?
>
> ".map(a -> a)" is just to illustrate the problem.
> My actual code contains a process function (instead of the .map() in the
> snippet) which appends a new field containing the watermark to a row.
> If there were a way to get the watermark inside a scalar UDF, I wouldn't
> convert the table to a datastream and vice versa.
>
> if I am understanding correctly, you are also using "time1" as the
>> rowtime; is your intention to use it later as well?
>
> yup :-)
>
> As far as I know, ".returns(sourceTable.getSchema().toRowType());" only
>> adds a type information hint about the return type of this operator. It is
>> used in cases where Flink cannot determine it automatically[1].
>
> The reason why I specify
> ".returns(sourceTable.getSchema().toRowType());" is to give a type
> information hint, as you said.
> That is needed later when I make another table like
>    "Table anotherTable = tEnv.fromDataStream(stream);",
> Without the type information hint, I get the error
>    "An input of GenericTypeInfo<Row> cannot be converted to Table.
> Please specify the type of the input with a RowTypeInfo."
> That's why I give a type information hint in that way.
>
> Best,
>
> Dongwon
>
> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong  wrote:
>
>> Hi Dongwon,
>>
>> Can you provide a bit more information:
>> which Flink version are you using?
>> what does "sourceTable.getSchema().toRowType()" return?
>> what does the line ".map(a -> a)" do, and can you remove it?
>> if I am understanding correctly, you are also using "time1" as the
>> rowtime; is your intention to use it later as well?
>>
>> As far as I know, ".returns(sourceTable.getSchema().toRowType());" only
>> adds a type information hint about the return type of this operator. It is
>> used in cases where Flink cannot determine it automatically[1].
>>
>> Thanks,
>> Rong
>>
>> --
>> [1]
>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>
>>
>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim 
>> wrote:
>>
>>> Hello,
>>>
>>> Consider the following snippet:
>>>
 Table sourceTable = getKafkaSource0(tEnv);
 DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
   .map(a -> a)
   .returns(sourceTable.getSchema().toRowType());
 stream.print();

>>> where sourceTable.printSchema() shows:
>>>
 root
  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>
>>>
>>>
>>>  This program returns the following exception:
>>>
 Exception in thread "main"
 org.apache.flink.runtime.client.JobExecutionException: Job execution 
 failed.
 at
 org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
 at
 org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
 at
 org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
 at
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
 at app.metatron.test.Main2.main(Main2.java:231)
 Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
 cast to java.lang.Long
 at
 org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
 at
 org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
 at
 org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
 ...

Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-18 Thread Dongwon Kim
Hi Rong,

Thank you for the reply :-)

which Flink version are you using?

I'm using Flink-1.8.0.

what does "sourceTable.getSchema().toRowType()" return?

Row(time1: TimeIndicatorTypeInfo(rowtime))

what does the line ".map(a -> a)" do, and can you remove it?

".map(a -> a)" is just to illustrate the problem.
My actual code contains a process function (instead of the .map() in the
snippet) which appends a new field containing the watermark to a row.
If there were a way to get the watermark inside a scalar UDF, I wouldn't convert
the table to a datastream and vice versa.

if I am understanding correctly, you are also using "time1" as the rowtime;
> is your intention to use it later as well?

yup :-)

As far as I know, ".returns(sourceTable.getSchema().toRowType());" only
> adds a type information hint about the return type of this operator. It is
> used in cases where Flink cannot determine it automatically[1].

The reason why I specify ".returns(sourceTable.getSchema().toRowType());" is
to give a type information hint, as you said.
That is needed later when I make another table like
   "Table anotherTable = tEnv.fromDataStream(stream);",
Without the type information hint, I get the error
   "An input of GenericTypeInfo<Row> cannot be converted to Table. Please
specify the type of the input with a RowTypeInfo."
That's why I give a type information hint in that way.
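
Condensed, the round trip looks like this (a sketch, assuming the stream
keeps the Row schema of the source table):

    DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
        .map(a -> a)                                    // placeholder step
        .returns(sourceTable.getSchema().toRowType());  // RowTypeInfo hint
    Table anotherTable = tEnv.fromDataStream(stream);   // fails without the hint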

Best,

Dongwon

On Fri, Jul 19, 2019 at 12:39 AM Rong Rong  wrote:

> Hi Dongwon,
>
> Can you provide a bit more information:
> which Flink version are you using?
> what does "sourceTable.getSchema().toRowType()" return?
> what does the line ".map(a -> a)" do, and can you remove it?
> if I am understanding correctly, you are also using "time1" as the
> rowtime; is your intention to use it later as well?
>
> As far as I know, ".returns(sourceTable.getSchema().toRowType());" only
> adds a type information hint about the return type of this operator. It is
> used in cases where Flink cannot determine it automatically[1].
>
> Thanks,
> Rong
>
> --
> [1]
> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>
>
> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim  wrote:
>
>> Hello,
>>
>> Consider the following snippet:
>>
>> Table sourceTable = getKafkaSource0(tEnv);
>> DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>   .map(a -> a)
>>   .returns(sourceTable.getSchema().toRowType());
>> stream.print();
>>>
>> where sourceTable.printSchema() shows:
>>
>>> root
>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>
>>
>>
>>  This program returns the following exception:
>>
>>> Exception in thread "main"
>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>> at
>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>> at
>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>> at
>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>> at app.metatron.test.Main2.main(Main2.java:231)
>>> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
>>> cast to java.lang.Long
>>> at
>>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>> ...
>>
>>
>> The row serializer seems to try to deep-copy an instance of
>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>> Could anybody help me?
>>
>> Best,
>>
>> - Dongwon
>>
>> p.s. though removing .returns() makes everything okay, I need to keep it
>> as I want to convert the DataStream into another table later.
>> p.s. the source table is created as follows:
>>
>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>> ConnectorDescriptor connectorDescriptor = new Kafka()
>>>   .version("universal")
>>>   .topic("mytopic")
>>>   .property("bootstrap.servers", "localhost:9092")
>>>   .property("group.id", "mygroup")
>>>   .startFromEarliest();
>>> FormatDescriptor formatDescriptor = new Csv()
>>>   .deriveSchema()
>>>   .ignoreParseErrors()
>>>   .fieldDelimiter(',');
>>> Schema schemaDescriptor = new Schema()
>>>   .field("time1", SQL_TIMESTAMP())
>>>   .rowtime(
>>> new Rowtime()
>>>   .timestampsFromField("rowTime")
>>>   .watermarksPeriodicBounded(100)
>>>   );
>>> tEnv.connect(connectorDescriptor)
>>>   .withFormat(formatDescriptor)
>>>   .withSchema(schemaDescriptor)
>>>   .inAppendMode()
>>>   .registerTableSource("mysrc");
>>> return tEnv.scan("mysrc");
>>>   }

Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-18 Thread Rong Rong
Hi Dongwon,

Can you provide a bit more information:
which Flink version are you using?
what does "sourceTable.getSchema().toRowType()" return?
what does the line ".map(a -> a)" do, and can you remove it?
if I am understanding correctly, you are also using "time1" as the rowtime;
is your intention to use it later as well?

As far as I know, ".returns(sourceTable.getSchema().toRowType());" only
adds a type information hint about the return type of this operator. It is
used in cases where Flink cannot determine it automatically[1].

Thanks,
Rong

--
[1]
https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351


On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim  wrote:

> Hello,
>
> Consider the following snippet:
>
>> Table sourceTable = getKafkaSource0(tEnv);
>> DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>   .map(a -> a)
>>   .returns(sourceTable.getSchema().toRowType());
>> stream.print();
>>
> where sourceTable.printSchema() shows:
>
>> root
>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>
>
>
>  This program returns the following exception:
>
>> Exception in thread "main"
>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>> at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>> at
>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>> at
>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>> at app.metatron.test.Main2.main(Main2.java:231)
>> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
>> cast to java.lang.Long
>> at
>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>> ...
>
>
> The row serializer seems to try to deep-copy an instance of
> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
> Could anybody help me?
>
> Best,
>
> - Dongwon
>
> p.s. though removing .returns() makes everything okay, I need to keep it
> as I want to convert the DataStream into another table later.
> p.s. the source table is created as follows:
>
> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>> ConnectorDescriptor connectorDescriptor = new Kafka()
>>   .version("universal")
>>   .topic("mytopic")
>>   .property("bootstrap.servers", "localhost:9092")
>>   .property("group.id", "mygroup")
>>   .startFromEarliest();
>> FormatDescriptor formatDescriptor = new Csv()
>>   .deriveSchema()
>>   .ignoreParseErrors()
>>   .fieldDelimiter(',');
>> Schema schemaDescriptor = new Schema()
>>   .field("time1", SQL_TIMESTAMP())
>>   .rowtime(
>> new Rowtime()
>>   .timestampsFromField("rowTime")
>>   .watermarksPeriodicBounded(100)
>>   );
>> tEnv.connect(connectorDescriptor)
>>   .withFormat(formatDescriptor)
>>   .withSchema(schemaDescriptor)
>>   .inAppendMode()
>>   .registerTableSource("mysrc");
>> return tEnv.scan("mysrc");
>>   }
>
>


[Table API] ClassCastException when converting a table to DataStream

2019-07-17 Thread Dongwon Kim
Hello,

Consider the following snippet:

> Table sourceTable = getKafkaSource0(tEnv);
> DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>   .map(a -> a)
>   .returns(sourceTable.getSchema().toRowType());
> stream.print();
>
where sourceTable.printSchema() shows:

> root
>  |-- time1: TimeIndicatorTypeInfo(rowtime)



 This program returns the following exception:

> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
> at app.metatron.test.Main2.main(Main2.java:231)
> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
> cast to java.lang.Long
> at
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
> ...


The row serializer seems to try to deep-copy an instance of
java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
Could anybody help me?
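
A possible workaround, sketched purely as an untested assumption (swap the
rowtime indicator type in the hint for a plain SQL timestamp type, so the row
serializer copies the field with SqlTimestampSerializer instead of the
indicator's internal LongSerializer):

    TableSchema schema = sourceTable.getSchema();
    TypeInformation<?>[] types = schema.getFieldTypes().clone();
    for (int i = 0; i < types.length; i++) {
        if (types[i] instanceof TimeIndicatorTypeInfo) {
            types[i] = Types.SQL_TIMESTAMP;  // org.apache.flink.api.common.typeinfo.Types
        }
    }
    RowTypeInfo hint = new RowTypeInfo(types, schema.getFieldNames());
    // ... then .map(a -> a).returns(hint);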

Best,

- Dongwon

p.s. though removing .returns() makes everything okay, I need to keep it as
I want to convert the DataStream into another table later.
p.s. the source table is created as follows:

private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
> ConnectorDescriptor connectorDescriptor = new Kafka()
>   .version("universal")
>   .topic("mytopic")
>   .property("bootstrap.servers", "localhost:9092")
>   .property("group.id", "mygroup")
>   .startFromEarliest();
> FormatDescriptor formatDescriptor = new Csv()
>   .deriveSchema()
>   .ignoreParseErrors()
>   .fieldDelimiter(',');
> Schema schemaDescriptor = new Schema()
>   .field("time1", SQL_TIMESTAMP())
>   .rowtime(
> new Rowtime()
>   .timestampsFromField("rowTime")
>   .watermarksPeriodicBounded(100)
>   );
> tEnv.connect(connectorDescriptor)
>   .withFormat(formatDescriptor)
>   .withSchema(schemaDescriptor)
>   .inAppendMode()
>   .registerTableSource("mysrc");
> return tEnv.scan("mysrc");
>   }