Re: Flink Table API and Date fields

2019-07-08 Thread Flavio Pompermaier
I think I could do it for this specific use case, but isn't this a big
limitation of the Table API?
I think that java.util.Date should be a first-class citizen in Flink.

Best,
Flavio

On Mon, Jul 8, 2019 at 4:06 AM JingsongLee  wrote:

> Hi Flavio:
> It looks like you use java.util.Date in your POJO. Currently, the Flink Table API
> does not support BasicTypeInfo.DATE_TYPE_INFO
> because of the limitations of some type checks in the code.
> Can you use java.sql.Date?
>
> Best, JingsongLee
>
> --
> From:Flavio Pompermaier 
> Send Time: 2019-07-05 (Friday) 22:52
> To:user 
> Subject:Flink Table API and Date fields
>
> Hi to all,
> in my use case I have a stream of POJOs with Date fields.
> When I use Table API I get the following error:
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. Type is not supported: Date
> at
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:112)
> at
> org.apache.flink.table.planner.StreamPlanner.toRel(StreamPlanner.scala:148)
> at
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:114)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:268)
> Caused by: org.apache.flink.table.api.TableException: Type is not
> supported: Date
> at
> org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:357)
>
>
> Is there a way to deal with this without converting the Date field to a
> Long one?
>
> Best,
> Flavio
>
>


Re:Cannot catch exception throws by kafka consumer with JSONKeyValueDeserializationSchema

2019-07-08 Thread Haibo Sun
Hi Zhechao,


If you can, please share the full exception stack trace and show where in your code
you are trying to catch the exception (preferably by posting the relevant code
directly). That will help us understand and locate the issue you are encountering.
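
As a general hint (this sketch is not from the original reply; the class name is illustrative, it ignores the message key unlike JSONKeyValueDeserializationSchema, and the DeserializationSchema package differs slightly between Flink versions): a schema that swallows parse errors and returns null lets the Flink Kafka consumer skip malformed records instead of failing, provided your Flink version documents the null-means-skip behavior.

import java.io.IOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

// Tolerant JSON schema: malformed records yield null, which the Kafka
// consumer treats as "skip this message" instead of throwing.
public class TolerantJsonDeserializationSchema implements DeserializationSchema<JsonNode> {

    private transient ObjectMapper mapper;

    @Override
    public JsonNode deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            return mapper.readTree(message);
        } catch (Exception e) {
            // Not valid JSON: skip it (or log it / route it to a dead-letter topic).
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(JsonNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<JsonNode> getProducedType() {
        return TypeExtractor.getForClass(JsonNode.class);
    }
}

You would then pass this schema to the Kafka consumer in place of JSONKeyValueDeserializationSchema.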


Best,
Haibo

At 2019-07-08 14:11:22, "Zhechao Ma"  wrote:

Hello,


I'm using FlinkKafkaConsumer to read messages from a Kafka topic with
JSONKeyValueDeserializationSchema. When the message is JSON formatted,
everything works fine, but it throws a NullPointerException when processing a
message that is not JSON formatted. I try to catch the exception but cannot do so.


Can anyone give out some tips?


flink: 1.5
flink-kafka: 1.5
kafka-clients: 0.10.1.2_2.11

flink-json:


--

Thanks
Zhechao Ma


Re: Watermarks and Kafka

2019-07-08 Thread Juan Gentile
Hello Konstantin,

Thank you for your answer. I'll clarify our problem a bit, as we now have a
clearer understanding of it 😊.
We have 2 Kafka topics from 2 different datacenters (each with its own 
watermarks – We have a watermark message injected in each of them).
We replicate these into a single Kafka topic from which our Flink job consumes. 
In the Flink job, we consume per partition but the watermarks in a partition 
may come from 2 different datacenters, so the watermark could differ. That’s 
why we need to keep a Map in memory for each partition and DC and then emit the 
minimum of both.
Our current workaround is to implement something similar to what you mentioned:
we have an operator that assigns the watermarks and keeps a Map which is stored
in state. But since there is no guarantee that all of these operators will
receive messages from all partitions, we need to drop the partition from the Map
key and just use the DC. Then we use union redistribution and always take the
minimum across all DCs when the job restores. This seems to work if we keep the
same parallelism for the source and the watermark assigner, keep them as close as
possible in the flow, and use operator chaining. But we understand that if we were
to split them or use different parallelism, the watermark assigner would stop
working properly because the partition wouldn't be in the map. That's why we were
looking for a solution that handles the watermarks already in the source operator.
Please let us know your opinion.

Thank you,
Juan G.

From: Konstantin Knauf 
Date: Sunday, July 7, 2019 at 10:14 PM
To: Juan Gentile 
Cc: "user@flink.apache.org" , Olivier Solliec 
, Oleksandr Nitavskyi 
Subject: Re: Watermarks and Kafka

Hi Juan,

I just replied to your other question, but I think, I better get where you are 
coming from now.

Are you aware of per-partition watermarking [1]? You don't need to manage this
map yourself. BUT: this does not solve the problem that this map is not stored
in managed state. Watermarks are generally not part of Flink's state, and it seems
like that is what you are looking for.
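
For reference, a minimal sketch of what [1] looks like in code (not part of the original reply; MyEvent, getEventTime() and isWatermarkMessage() are made-up names): the assigner is set directly on the consumer, so watermarks are tracked per Kafka partition inside the source.

FlinkKafkaConsumer<MyEvent> consumer = new FlinkKafkaConsumer<>(topic, schema, props);
consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyEvent>() {
    @Override
    public long extractTimestamp(MyEvent event, long previousTimestamp) {
        return event.getEventTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent event, long extractedTimestamp) {
        // emit a watermark only for the injected watermark messages
        return event.isWatermarkMessage() ? new Watermark(extractedTimestamp) : null;
    }
});
DataStream<MyEvent> stream = env.addSource(consumer);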

To also answer your questions: you could go for list state with union
redistribution. In this case, every operator will get all entries during recovery,
and you can filter out the ones that are relevant to the current operator by
checking which partitions it is subscribed to after recovery.
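
A minimal sketch of what that could look like (illustrative names, not from the original message): an operator that keeps its per-DC minima in union-redistributed operator state.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class WatermarkTracker<T> extends RichMapFunction<T, T> implements CheckpointedFunction {

    private transient ListState<Tuple2<String, Long>> unionState;   // (dc, lowest watermark)
    private final Map<String, Long> lowestWatermarks = new HashMap<>();

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        unionState = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("dc-watermarks",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
        if (context.isRestored()) {
            // with union redistribution, every subtask sees ALL entries; keep the minimum per DC
            for (Tuple2<String, Long> entry : unionState.get()) {
                lowestWatermarks.merge(entry.f0, entry.f1, Math::min);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        unionState.clear();
        for (Map.Entry<String, Long> e : lowestWatermarks.entrySet()) {
            unionState.add(Tuple2.of(e.getKey(), e.getValue()));
        }
    }

    @Override
    public T map(T value) {
        // update lowestWatermarks from the incoming record / watermark message here
        return value;
    }
}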

Hope this helps.

Cheers,

Konstantin


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

On Wed, Jul 3, 2019 at 6:04 PM Juan Gentile <j.gent...@criteo.com> wrote:
Hello!

We currently have a job which reads from Kafka and uses punctuated watermarks 
based on the messages we read. We currently keep track of the watermarks for 
each partition to emit a consensus watermark, taking the smallest of all 
partitions.
We ran into an issue because we are not storing the state of this map of 
partitions->watermarks when one of the partitions got delayed and the job 
restarted, losing track of that partition and emitting a watermark anyway.
Our idea of a solution involves saving this map of partition -> watermarks into 
the state but we would like to know how Flink behaves when we decrease the 
parallelism so as to make sure that the instance that will read from Kafka also 
will have the state for that particular partition.

To give an example:

Operator 1: (Reads Partition1)
Partition 1: Watermark1 (Map / State)

Operator 2: (Reads Partition2)
Partition 2: Watermark2 (Map / State)

Operator 3: (Reads Partition3)
Partition 3: Watermark3 (Map / State)


After shrinking:

Operator 1: (Reads Partition1)
Partition 1: Watermark1 (Map / State)

Operator 2: (Reads Partition2, Partition3)
Partition 2: Watermark2 (Map / State)
Partition 3: Watermark3 (Map / State)

Or

Operator 1: (Reads Partition1, Partition3) => HERE we would have a problem as 
the state could be loaded on the other operator.
Partition 1: Watermark1 (Map / State)

Operator 2: (Reads Partition2)
Partition 2: Watermark2 (Map / State)
Partition 3: Watermark3 (Map / State)

For this we are using the operator state
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state).

Re: Flink Table API and Date fields

2019-07-08 Thread JingsongLee
The Flink 1.9 Blink runner will support it as a generic type,
but I don't recommend it. After all, there are java.sql.Date and java.time.* in
Java.

Best, JingsongLee


--
From:Flavio Pompermaier 
Send Time: 2019-07-08 (Monday) 15:40
To:JingsongLee 
Cc:user 
Subject:Re: Flink Table API and Date fields

I think I could do it for this specific use case but isn't this a big 
limitation of Table API?
I think that java.util.Date should be a first class citizen in Flink..

Best,
Flavio
On Mon, Jul 8, 2019 at 4:06 AM JingsongLee  wrote:

Hi Flavio:
Looks like you use java.util.Date in your pojo, Now Flink table not support 
BasicTypeInfo.DATE_TYPE_INFO because of the limitations of some judgments in 
the code.
 Can you use java.sql.Date? 

Best, JingsongLee

--
From:Flavio Pompermaier 
Send Time: 2019-07-05 (Friday) 22:52
To:user 
Subject:Flink Table API and Date fields

Hi to all,
in my use case I have a stream of POJOs with Date fields.
When I use Table API I get the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. Type is not supported: Date
 at 
org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:112)
 at org.apache.flink.table.planner.StreamPlanner.toRel(StreamPlanner.scala:148)
 at org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:114)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:268)
Caused by: org.apache.flink.table.api.TableException: Type is not supported: 
Date
 at 
org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:357)


Is there a way to deal with this without converting the Date field to a Long 
one?

Best,
Flavio



Re: Flink Table API and Date fields

2019-07-08 Thread Flavio Pompermaier
Of course there are java.sql.* and java.time.* in Java, but it's also true
that most of the time the POJOs you read come from an external (Maven) lib,
and if such POJOs contain date fields you have to create a local version of
that POJO with the java.util.Date fields replaced by a java.sql.Date
version of them.
Moreover, you also have to create a conversion function from the original
POJO to the Flink-specific one (and this is very annoying, especially
because if the POJO gets modified you have to check that your conversion
function is updated accordingly).

Summarising, it is possible to work around this limitation, but it's very
uncomfortable (IMHO).
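
(For readers following the thread, the boilerplate being described looks roughly like this; ExternalPojo and InternalPojo are hypothetical names.)

import org.apache.flink.api.common.functions.MapFunction;

// ExternalPojo comes from a third-party library and uses java.util.Date;
// InternalPojo mirrors it with java.sql.Date so the Table API accepts it.
public class ToInternalPojo implements MapFunction<ExternalPojo, InternalPojo> {
    @Override
    public InternalPojo map(ExternalPojo in) {
        InternalPojo out = new InternalPojo();
        out.setId(in.getId());
        // every field must be copied by hand and kept in sync with the library POJO
        out.setCreated(new java.sql.Date(in.getCreated().getTime()));
        return out;
    }
}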

On Mon, Jul 8, 2019 at 11:52 AM JingsongLee  wrote:

> Flink 1.9 blink runner will support it as Generic Type,
> But I don't recommend it. After all, there are java.sql.Date and
> java.time.* in Java.
>
> Best, JingsongLee
>
> --
> From:Flavio Pompermaier 
> Send Time: 2019-07-08 (Monday) 15:40
> To:JingsongLee 
> Cc:user 
> Subject:Re: Flink Table API and Date fields
>
> I think I could do it for this specific use case but isn't this a big
> limitation of Table API?
> I think that java.util.Date should be a first class citizen in Flink..
>
> Best,
> Flavio
>
> On Mon, Jul 8, 2019 at 4:06 AM JingsongLee 
> wrote:
> Hi Flavio:
> Looks like you use java.util.Date in your pojo, Now Flink table not
> support BasicTypeInfo.DATE_TYPE_INFO
> because of the limitations of some judgments in the code.
> Can you use java.sql.Date?
>
> Best, JingsongLee
>
> --
> From:Flavio Pompermaier 
> Send Time: 2019-07-05 (Friday) 22:52
> To:user 
> Subject:Flink Table API and Date fields
>
> Hi to all,
> in my use case I have a stream of POJOs with Date fields.
> When I use Table API I get the following error:
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. Type is not supported: Date
> at
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:112)
> at
> org.apache.flink.table.planner.StreamPlanner.toRel(StreamPlanner.scala:148)
> at
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:114)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:268)
> Caused by: org.apache.flink.table.api.TableException: Type is not
> supported: Date
> at
> org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:357)
>
>
> Is there a way to deal with this without converting the Date field to a
> Long one?
>
> Best,
> Flavio
>
>


Table API and ProcessWindowFunction

2019-07-08 Thread Flavio Pompermaier
Hi to all,
from what I understood, a ProcessWindowFunction can only be used in the
Streaming API.
Is there any plan to port it to the Table API as well (in the near future)?
I'd like to do the equivalent of the following with the Table API:

final DataStream events = env.addSource(src);
events.filter(e -> e.getCode() != null)
      .keyBy(event -> Integer.valueOf(event.getCode()))
      .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
      .process(new ProcessWindowFunction<...>() { ... });

Best,
Flavio


Is the provided Serializer/TypeInformation checked "too late"?

2019-07-08 Thread Niels Basjes
Hi,

Context:
I'm looking into making the Google (BigQuery compatible) HyperLogLog++
implementation available in Flink because it is simply an Apache
licensed opensource library
- https://issuetracker.google.com/issues/123269269
- https://issues.apache.org/jira/browse/BEAM-7013
- https://github.com/google/zetasketch

While doing this I noticed that even though I provided an explicit
Kryo Serializer for the core class

i.e. I did 
senv.getConfig().registerTypeWithKryoSerializer(HyperLogLogPlusPlus.class,
HLLSerializer.class);

I still see messages like this when registering a new
UserDefinedFunction (AggregateFunction / ScalarFunction) that has this
class as either input or output:

13:59:57,316 [INFO ] TypeExtractor   : 1815:
class com.google.zetasketch.HyperLogLogPlusPlus does not contain a
getter for field allowedTypes
13:59:57,317 [INFO ] TypeExtractor   : 1818:
class com.google.zetasketch.HyperLogLogPlusPlus does not contain a
setter for field allowedTypes
13:59:57,317 [INFO ] TypeExtractor   : 1857:
Class class com.google.zetasketch.HyperLogLogPlusPlus cannot be used
as a POJO type because not all fields are valid POJO fields, and must
be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance.

So it is complaining about the serialization performance when done in
a different way than was configured.

Then I noticed that I see similar messages in other situations too.

In this code
https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/test/java/nl/basjes/parse/useragent/flink/table/DemonstrationOfTumblingTableSQLFunction.java#L165

I see
13:59:58,478 [INFO ] TypeExtractor   : 1815:
class org.apache.flink.types.Row does not contain a getter for field
fields
13:59:58,478 [INFO ] TypeExtractor   : 1818:
class org.apache.flink.types.Row does not contain a setter for field
fields
13:59:58,479 [INFO ] TypeExtractor   : 1857:
Class class org.apache.flink.types.Row cannot be used as a POJO type
because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.

even though a full TypeInformation instance for that type was provided

TypeInformation<Row> tupleType = new RowTypeInfo(SQL_TIMESTAMP,
STRING, STRING, STRING, STRING, LONG);
DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable, tupleType);

I checked with my debugger, and for both examples mentioned the code IS using
the correct serialization classes when running.

So what is happening here?
Did I forget to do a required call?
So is this a bug?
Is the provided serialization via TypeInformation 'skipped' during
startup and only used during runtime?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: Is the provided Serializer/TypeInformation checked "too late"?

2019-07-08 Thread Timo Walther

Hi Niels,

the type handling evolved over the years and is a bit messed up
across the different layers. You are almost right with your last
assumption, "Is the provided serialization via TypeInformation 'skipped'
during startup and only used during runtime?". The type extraction
returns a Kryo type, and the Kryo type uses the configured default
serializers at runtime. Therefore, the log entry is just an INFO and
not a WARNING. You did everything correctly.


Btw, there is also the possibility to insert a custom type into the type
extraction by using type factories [0].
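
A rough sketch of that approach (the factory body is illustrative; since HyperLogLogPlusPlus is a third-party class you cannot annotate it directly, so the annotation would go on a wrapper type of your own):

import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

@TypeInfo(HllTypeInfoFactory.class)
public class HllWrapper {
    // wraps com.google.zetasketch.HyperLogLogPlusPlus
}

class HllTypeInfoFactory extends TypeInfoFactory<HllWrapper> {
    @Override
    public TypeInformation<HllWrapper> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // In a real implementation, return a TypeInformation whose serializer is
        // your HLLSerializer; a GenericTypeInfo falls back to Kryo (and thus to
        // the registered default serializer).
        return new GenericTypeInfo<>(HllWrapper.class);
    }
}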


Maybe as a side comment: We are aware of these confusions and the Table 
& SQL API will hopefully not use the TypeExtractor anymore in 1.10. This 
is what I am working on at the moment.


Regards,
Timo

[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory


On 08.07.19 at 14:17, Niels Basjes wrote:

Hi,

Context:
I'm looking into making the Google (BigQuery compatible) HyperLogLog++
implementation available in Flink because it is simply an Apache
licensed opensource library
- https://issuetracker.google.com/issues/123269269
- https://issues.apache.org/jira/browse/BEAM-7013
- https://github.com/google/zetasketch

While doing this I noticed that even though I provided an explicit
Kryo Serializer for the core class

i.e. I did 
senv.getConfig().registerTypeWithKryoSerializer(HyperLogLogPlusPlus.class,
HLLSerializer.class);

I still see messages like this when registering a new
UserDefinedFunction (AggregateFunction / ScalarFunction) that has this
class as either input of output:

13:59:57,316 [INFO ] TypeExtractor   : 1815:
class com.google.zetasketch.HyperLogLogPlusPlus does not contain a
getter for field allowedTypes
13:59:57,317 [INFO ] TypeExtractor   : 1818:
class com.google.zetasketch.HyperLogLogPlusPlus does not contain a
setter for field allowedTypes
13:59:57,317 [INFO ] TypeExtractor   : 1857:
Class class com.google.zetasketch.HyperLogLogPlusPlus cannot be used
as a POJO type because not all fields are valid POJO fields, and must
be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance.

So it is complaining about the serialization performance when done in
a different way than was configured.

Then I noticed that I see similar messages in other situations too.

In this code
https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/test/java/nl/basjes/parse/useragent/flink/table/DemonstrationOfTumblingTableSQLFunction.java#L165

I see
13:59:58,478 [INFO ] TypeExtractor   : 1815:
class org.apache.flink.types.Row does not contain a getter for field
fields
13:59:58,478 [INFO ] TypeExtractor   : 1818:
class org.apache.flink.types.Row does not contain a setter for field
fields
13:59:58,479 [INFO ] TypeExtractor   : 1857:
Class class org.apache.flink.types.Row cannot be used as a POJO type
because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.

even though a full TypeInformation instance for that type was provided

TypeInformation tupleType = new RowTypeInfo(SQL_TIMESTAMP,
STRING, STRING, STRING, STRING, LONG);
DataStream resultSet = tableEnv.toAppendStream(resultTable, tupleType);

I checked with my debugger and the code IS using for both mentioned
examples the correct serialization classes when running.

So what is happening here?
Did I forget to do a required call?
So is this a bug?
Is the provided serialization via TypeInformation 'skipped' during
startup and only used during runtime?





Re: Flink Table API and Date fields

2019-07-08 Thread Timo Walther

Hi Flavio,

yes, I agree. This check is a bit confusing. The initial reason for it
was that sql.Time, sql.Date, and sql.Timestamp extend from util.Date as
well. But handling it as a generic type, as Jingsong mentioned, might be
the better option in order to write custom UDFs to handle such fields.
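
For example, a minimal sketch of such a UDF (names are made up; it assumes the field is handled as a generic type and simply converts it for SQL):

import java.sql.Timestamp;
import java.util.Date;

import org.apache.flink.table.functions.ScalarFunction;

// Converts a java.util.Date field to a SQL-compatible java.sql.Timestamp.
public class UtilDateToTimestamp extends ScalarFunction {
    public Timestamp eval(Date date) {
        return date == null ? null : new Timestamp(date.getTime());
    }
}

It would be registered with tableEnv.registerFunction("toTimestamp", new UtilDateToTimestamp()) and applied to the generic field in the query.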


Regards,
Timo

On 08.07.19 at 12:04, Flavio Pompermaier wrote:
Of course there are java.sql.* and java.time.* in Java but it's also 
true that most of the times the POJOs you read come from an external 
(Maven) lib and if such POJOs contain date fields you have to create a 
local version of that POJO having the java.util.Date fields replaced 
by a java.sql.Date version of them.
Moreover you also have to create a conversion function from the 
original POJO to the Flink-specific one source (and this is very 
annoying expecially because if the POJO gets modified you have to 
check that your conversion function is updated accordingly).


Summarising, it is possible to work around this limitation but it's 
very uncomfortable (IMHO)


On Mon, Jul 8, 2019 at 11:52 AM JingsongLee <lzljs3620...@aliyun.com> wrote:


Flink 1.9 blink runner will support it as Generic Type,
But I don't recommend it. After all, there are java.sql.Date and
java.time.* in Java.

Best, JingsongLee

--
From: Flavio Pompermaier <pomperma...@okkam.it>
Send Time: 2019-07-08 (Monday) 15:40
To: JingsongLee <lzljs3620...@aliyun.com>
Cc: user <user@flink.apache.org>
Subject:Re: Flink Table API and Date fields

I think I could do it for this specific use case but isn't
this a big limitation of Table API?
I think that java.util.Date should be a first class citizen in
Flink..

Best,
Flavio

On Mon, Jul 8, 2019 at 4:06 AM JingsongLee <lzljs3620...@aliyun.com> wrote:
Hi Flavio:
Looks like you use java.util.Date in your pojo, Now Flink
table not support BasicTypeInfo.DATE_TYPE_INFO
because of the limitations of some judgments in the code.
Can you use java.sql.Date?

Best, JingsongLee

--
From: Flavio Pompermaier <pomperma...@okkam.it>
Send Time: 2019-07-05 (Friday) 22:52
To: user <user@flink.apache.org>
Subject:Flink Table API and Date fields

Hi to all,
in my use case I have a stream of POJOs with Date fields.
When I use Table API I get the following error:

Exception in thread "main"
org.apache.flink.table.api.ValidationException: SQL validation
failed. Type is not supported: Date
at

org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:112)
at

org.apache.flink.table.planner.StreamPlanner.toRel(StreamPlanner.scala:148)
at

org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:114)
at

org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:268)
Caused by: org.apache.flink.table.api.TableException: Type is
not supported: Date
at

org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:357)


Is there a way to deal with this without converting the Date
field to a Long one?

Best,
Flavio







Re: How are kafka consumer offsets handled if sink fails?

2019-07-08 Thread John Smith
So when we say a sink is at-least-once, it's because internally it's not
checking any kind of state and it sends what it has regardless, correct?
I ask because I will build a sink that calls stored procedures.
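
For illustration (names such as MyRow, JDBC_URL, and the stored procedure are hypothetical, and this is not Flink's JDBCSinkFunction): a plain stored-procedure sink like the sketch below is at-least-once because, after a failure, the source replays everything since the last checkpoint, so rows may be written twice unless the procedure is idempotent (e.g. an upsert keyed on a unique id).

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class StoredProcedureSink extends RichSinkFunction<MyRow> {

    private transient Connection connection;
    private transient CallableStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(JDBC_URL, USER, PASSWORD);
        statement = connection.prepareCall("{call upsert_my_row(?, ?)}");  // hypothetical procedure
    }

    @Override
    public void invoke(MyRow row, Context context) throws Exception {
        statement.setLong(1, row.getId());
        statement.setString(2, row.getPayload());
        statement.execute();  // a failure here fails the job, triggering restart and replay
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}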

On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf, 
wrote:

> Hi John,
>
> in case of a failure (e.g. in the SQL Sink) the Flink Job will be
> restarted from the last checkpoint. This means the offset of all Kafka
> partitions will be reset to that point in the stream along with state of
> all operators. To enable checkpointing you need to call
> StreamExecutionEnvironment#enableCheckpointing(). If you using the
> JDBCSinkFunction (which is an at-least-once sink), the output will be
> duplicated in the case of failures.
>
> To answer your questions:
>
> * For this the FlinkKafkaConsumer handles the offsets manually (no
> auto-commit).
> * No, the Flink Kafka Consumer does only commit offsets back to Kafka on a
> best-effort basis after every checkpoint. Internally Flink "commits" the
> checkpoints as part of its periodic checkpoints.
> * Yes, along with all other events between the last checkpoint and the
> failure.
> * It will continue from the last checkpoint.
>
> Hope this helps.
>
> Cheers,
>
> Konstantin
>
> On Fri, Jul 5, 2019 at 8:37 PM John Smith  wrote:
>
>> Hi using Apache Flink 1.8.0
>>
>> I'm consuming events from Kafka using nothing fancy...
>>
>> Properties props = new Properties();
>> props.setProperty("bootstrap.servers", kafkaAddress);
>> props.setProperty("group.id",kafkaGroup);
>>
>> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>(topic, new 
>> SimpleStringSchema(),props);
>>
>>
>> Do some JSON transforms and then push to my SQL database using JDBC and
>> stored procedure. Let's assume the SQL sink fails.
>>
>> We know that Kafka can either periodically commit offsets or it can be
>> done manually based on consumers logic.
>>
>> - How is the source Kafka consumer offsets handled?
>> - Does the Flink Kafka consumer commit the offset to per event/record?
>> - Will that single event that failed be retried?
>> - So if we had 5 incoming events and say on the 3rd one it failed, will
>> it continue on the 3rd or will the job restart and try those 5 events.
>>
>>
>>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
>
> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2010
>
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>


Re: Table API and ProcessWindowFunction

2019-07-08 Thread Hequn Cheng
Hi Flavio,

Nice to hear your ideas on Table API!

Could you be more specific about your requirements? A detailed scenario
would be quite helpful. For example, do you want to emit multiple records
through the collector, or do you want to use the timer?

BTW, the Table API recently introduced flatAggregate (both non-window
flatAggregate and window flatAggregate), which will be included in the
upcoming 1.9 release. flatAggregate can emit multiple records for a single
group. More details here [1][2].
Hope this can solve your problem.

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions

On Mon, Jul 8, 2019 at 6:27 PM Flavio Pompermaier 
wrote:

> Hi to all,
> from what I understood a ProcessWindowFunction can only be used in the
> Streaming API.
> Is there any plan to port them also in the Table API (in the near future)?
> I'd like to do with Table API the equivalent of:
>
> final DataStream events = env.addSource(src);
> events.filter(e -> e.getCode() != null)
>       .keyBy(event -> Integer.valueOf(event.getCode()))
>       .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>       .process(new ProcessWindowFunction<...>() { ... });
>
> Best,
> Flavio
>


Re: [VOTE] How to Deal with Split/Select in DataStream API

2019-07-08 Thread Aljoscha Krettek
I think this would benefit from a FLIP that neatly sums up the options and
then also gives us a point where we can vote on and ratify a decision.

As a gut feeling, I like option 3) most. Initially I would have preferred
option 1) (because of a sense of API purity), but by now I think it's good that
users have this simpler option.

Aljoscha 

> On 8. Jul 2019, at 06:39, Xingcan Cui  wrote:
> 
> Hi all,
> 
> Thanks for your participation.
> 
> In this thread, we got one +1 for option 1 and option 3, respectively. In the 
> original thread[1], we got two +1 for option 1, one +1 for option 2, and five 
> +1 and one -1 for option 3.
> 
> To summarize,
> 
> Option 1 (port side output to flatMap and deprecate split/select): three +1
> Option 2 (introduce a new split/select and deprecate existing one): one +1
> Option 3 ("correct" the existing split/select): six +1 and one -1
> 
> It seems that most people involved are in favor of "correcting" the existing 
> split/select. However, this will definitely break the API compatibility, in a 
> subtle way.
> 
> IMO, the real behavior of consecutive split/select's has never been
> thoroughly clarified. Even in the community, it is hard to say that we have come to
> a consensus on its real semantics [2-4]. Though the initial design is not
> ambiguous, there's no doubt that its concept has drifted.
> 
> As the split/select is quite an ancient API, I cc'ed this to more members. It 
> couldn't be better if you can share your opinions on this.
> 
> Thanks,
> Xingcan
> 
> [1] 
> https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E
>  
> 
> [2] https://issues.apache.org/jira/browse/FLINK-1772 
> 
> [3] https://issues.apache.org/jira/browse/FLINK-5031 
> 
> [4] https://issues.apache.org/jira/browse/FLINK-11084 
> 
> 
> 
>> On Jul 5, 2019, at 12:04 AM, 杨力 wrote:
>> 
>> I prefer the 1) approach. I used to carry fields, which is needed only for 
>> splitting, in the outputs of flatMap functions. Replacing it with outputTags 
>> would simplify data structures.
>> 
>> Xingcan Cui <xingc...@gmail.com> wrote on Friday, July 5, 2019 at 2:20 AM:
>> Hi folks,
>> 
>> Two weeks ago, I started a thread [1] discussing whether we should discard 
>> the split/select methods (which have been marked as deprecation since v1.7) 
>> in DataStream API. 
>> 
>> The fact is, these methods will cause "unexpected" results when using 
>> consecutively (e.g., ds.split(a).select(b).split(c).select(d)) or 
>> multi-times on the same target (e.g., ds.split(a).select(b), 
>> ds.split(c).select(d)). The reason is that following the initial design, the 
>> new split/select logic will always override the existing one on the same 
>> target operator, rather than append to it. Some users may not be aware of 
>> that, but if you do, a current solution would be to use the more powerful 
>> side output feature [2].
>> 
>> FLINK-11084 added some
>> restrictions to the existing split/select logic and suggests replacing it
>> with side output in the future. However, considering that side output is
>> currently only available in the process function layer and that split/select
>> could have been widely used in many real-world applications, we'd like to
>> start a vote and listen to the community on how to deal with them.
>> 
>> In the discussion thread [1], we proposed three solutions as follows. All of 
>> them are feasible but have different impacts on the public API.
>> 
>> 1) Port the side output feature to DataStream API's flatMap and replace 
>> split/select with it.
>> 
>> 2) Introduce a dedicated function in DataStream API (with the "correct" 
>> behavior but a different name) that can be used to replace the existing 
>> split/select.
>> 
>> 3) Keep split/select but change the behavior/semantic to be "correct".
>> 
>> Note that this is just a vote for gathering information, so feel free to 
>> participate and share your opinions.
>> 
>> The voting time will end on July 7th 17:00 EDT.
>> 
>> Thanks,
>> Xingcan
>> 
>> [1] 
>> https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E
>>  
>> [2] 
>> h

Hive in sql-client

2019-07-08 Thread Yebgenya Lazarkhosrouabadi
Hello,

I'm trying to use Hive tables in the SQL client. How can I do this?
I have downloaded Blink from GitHub to be able to use catalogs in the YAML
file, but I can't run its SQL client using ./sql-client.sh embedded.

Can you please help me?

Regards
Bernadette Lazar

NOTE: This is a confidential message intended only for the addressee. Copying this
message or making it available to third parties is not permitted. If you have
received this message in error, please notify me by e-mail or at the telephone
number given above.


Re: Table API and ProcessWindowFunction

2019-07-08 Thread Flavio Pompermaier
Hi Hequn, thanks for your answer.
What I'm trying to do is to read a stream of events that basically contains
a UserId field and, every X minutes (i.e. using a time window) and for each
distinct UserId key, query 3 different REST services to enrich my POJOs*.
For the moment what I do is use a ProcessWindowFunction after the
.keyBy().window(), as shown in the previous mail example, to contact those 3
services and enrich my object.

However, I don't like this solution because I'd like to use Flink to its
full potential, so I'd like to enrich my object using LATERAL TABLEs or
async I/O.
The main problem I'm facing right now is that I can't find a way to pass
the tumbling window start/end to the LATERAL JOIN table functions (because
this is a parameter of the REST query).
Moreover, I don't know whether this use case is something that the Table API
aims to solve.

* Of course this could kill the REST endpoint if the number of users is
very big. Because of this, I'd like to keep the external state of the source
tables as internal Flink state and then do a JOIN on the UserId. The
problem here is that I need to "materialize" them using Debezium (or
similar) via Kafka and dynamic tables. Is there any example of keeping
multiple tables synced with Flink state through Debezium (without the need
to rewrite all the logic for managing UPDATE/INSERT/DELETE)?

On Mon, Jul 8, 2019 at 3:55 PM Hequn Cheng  wrote:

> Hi Flavio,
>
> Nice to hear your ideas on Table API!
>
> Could you be more specific about your requirements? A detailed scenario
> would be quite helpful. For example, do you want to emit multi records
> through the collector or do you want to use the timer?
>
> BTW, Table API introduces flatAggregate recently(both non-window
> flatAggregate and window flatAggregate) and will be included in the near
> coming release-1.9. The flatAggregate can emit multi records for a single
> group. More details here[1][2].
> Hope this can solve your problem.
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions
>
> On Mon, Jul 8, 2019 at 6:27 PM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> from what I understood a ProcessWindowFunction can only be used in the
>> Streaming API.
>> Is there any plan to port them also in the Table API (in the near future)?
>> I'd like to do with Table API the equivalent of:
>>
>> final DataStream events = env.addSource(src);
>> events.filter(e -> e.getCode() != null)
>>       .keyBy(event -> Integer.valueOf(event.getCode()))
>>       .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>       .process(new ProcessWindowFunction<...>() { ... });
>>
>> Best,
>> Flavio
>>
>


Add Logical filter on a query plan from Flink Table API

2019-07-08 Thread Felipe Gutierrez
Hi,

I am a newbie in Apache Calcite. I am trying to use it with Apache Flink.
To start, I am trying to create a HelloWorld which just adds a logical filter
to my query.
1 - I have my Flink app using Table API [1].
2 - I have created my Calcite filter rule which is applied to my FLink
query if I use CalciteConfig cc = new
CalciteConfigBuilder().addLogicalOptRuleSet(RuleSets.ofList(MyFilterRule.INSTANCE)).build()
[2];
3 - The debug thread only goes to my rule if there is a filter on my query.

I would like to create a logical filter if there is no filter set on the
logical query. How should I implement it?
I see my LogicalFilter being created when I call the "tableEnv.explain()"
method, so I suppose that I can add some logical filters to the plan.
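
For what it's worth, a rule that injects a filter when it matches a bare scan could look roughly like the sketch below (the field name "value" and the threshold are taken from the plan further down; in practice you also need to guard against the rule re-firing on its own output, and depending on the optimization phase you may have to match Flink's FlinkLogical* scan instead of LogicalTableScan):

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.tools.RelBuilder;

public class AddDefaultFilterRule extends RelOptRule {

    public static final AddDefaultFilterRule INSTANCE = new AddDefaultFilterRule();

    private AddDefaultFilterRule() {
        super(operand(LogicalTableScan.class, none()), "AddDefaultFilterRule");
    }

    @Override
    public void onMatch(RelOptRuleCall call) {
        LogicalTableScan scan = call.rel(0);
        RelBuilder b = call.builder();
        // Wrap the scan in a new filter: WHERE value >= 50
        RelNode filteredScan = b.push(scan)
                .filter(b.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
                        b.field("value"), b.literal(50)))
                .build();
        call.transformTo(filteredScan);
    }
}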

== Abstract Syntax Tree ==
LogicalFilter(condition=[>=($6, 50)])
  LogicalTableScan(table=[[TicketsStation01Plat01]])

== Optimized Logical Plan ==
DataStreamCalc(select=[sensorId, sensorType, platformId, platformType,
stationId, timestamp, value, trip, eventTime], where=[>=(value, 50)])
  StreamTableSourceScan(table=[[TicketsStation01Plat01]], fields=[sensorId,
sensorType, platformId, platformType, stationId, timestamp, value, trip,
eventTime], source=[SensorTuples])

== Physical Execution Plan ==


Thanks,
Felipe

[1]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/table/HelloWorldCalcitePlanTableAPI.java#L62
[2]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/calcite/rules/MyFilterRule.java#L14
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


Re: How does Flink recovers uncommited Kafka offset in AsyncIO?

2019-07-08 Thread Fabian Hueske
Hi,

Kafka offsets are only managed by the Flink Kafka Consumer. All following
operators do not care whether the events were read from Kafka, files,
Kinesis or whichever source.
It is the responsibility of the source to include its reading position (in
case of Kafka the partition offsets) in a checkpoint.
The AsyncIO operator will checkpoint all events for which requests are
currently in flight, i.e., it checkpoints its working set that needs to be
recovered (with requests being sent out again) after a failure.
Flink's checkpointing mechanism which is based on checkpoint barriers
ensures that all operator checkpoints (for example Kafka offsets and
AsyncIO requests) are consistent, i.e., every record that was read before
the checkpoint and which was stuck in the AsyncIO operator is in the
AsyncIO state. Every record that was read after the checkpoint (and before
the failure) is read again and not in the AsyncIO state.

Btw, if you don't require ordered output from the AsyncIO operator, I'd
switch to unordered wait. Otherwise, a single blocking call might prevent
subsequent events from being emitted because they are not allowed to overtake
the blocking event.
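
For reference, the switch is a one-liner (MyAsyncFunction and the concrete values below are illustrative):

DataStream<Result> results = AsyncDataStream.unorderedWait(
        input,                    // the input DataStream
        new MyAsyncFunction(),    // your AsyncFunction implementation
        30, TimeUnit.SECONDS,     // timeout per request
        100);                     // capacity: max concurrent in-flight requests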

Best, Fabian


On Mon, Jul 1, 2019 at 21:41, wang xuchen wrote:

> Hi Flink experts,
>
> I am prototyping a real time system that reads from Kafka source with
> Flink and calls out to an external system as part of the event processing.
> One of the most important requirements are read from Kafka should NEVER
> stall, even in face of some async external calls slowness while holding
> certain some kafka offsets. At least once processing is good enough.
>
> Currently, I am using AsyncIO with a thread pool of size 20. My
> understanding is if I use orderedwait with a large 'capacity', consumption
> from Kafka should continue even if some external calls experience slowness
> (holding the offsets) as long as the capacity is not exhausted.
>
> (From my own reading of Flink source code, the capacity of the orderedwait
> function translate to the size of the OrderedStreamElementQueue size.)
>
> However, I expect that while the external calls stuck, stream source
> should keep pumping out from Kafka as long as there is still capacity, but
> offset after the stuck record should NOT be committed back to Kafka and
> (the checkpoint should also stall to accomodate the stalled offests?)
>
> My observation is, if I set the capacity large enough (max_int / 100 for
> instance), the consumption was not stalled (which is good), but the offsets
> were all committed back to Kafka AFTER the stalled records and all
> checkpoint succeeded, no back pressure was incurred.
>
> In this case, if some machines crash, how does Flink recover the stalled
> offsets? Which checkpoint does Flink rollback to?  I understand that
> commiting offset back to Kafka is merely to show progress to external
> monitoring tool, but I hope Flink does book keeping somewhere to journal
> async call xyz is not return and should be retried during recovery.
>
> ==
>
> I`ve done a some more experiments, looks like Flink is able to recover the
> record which I threw completeExceptionly even if I use 'unorderedwait' on
> the async stream.
>
> Which leads to Fabian`s early comments, 'Flink does not rely on Kafka
> consumer offset to recover, committing offset to Kafka is merely to show
> progress to external monitoring tools'.
>
> I couldn`t pinpoint the code that Flink uses the achieve it, maybe
> in-flight async invokations in 'unorderedstreamelementqueue' are part of
> the checkpoint and Flink saves the actual payload for later replay?
>
> Can anyone cast some lights?
>


Re: Error checkpointing to S3 like FS (EMC ECS)

2019-07-08 Thread Fabian Hueske
Hi Vishwas,

Sorry for the late response.
Are you still facing the issue?

I have no experience with EMC ECS, but the exception suggests an issue with
the host name:

1378 Caused by: java.net.UnknownHostException:
aip-featuretoolkit.SU73ECSG1P1d.***.COM
   1379 at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
   1380 at java.net.InetAddress.getAllByName(InetAddress.java:1192)
   1381 at java.net.InetAddress.getAllByName(InetAddress.java:1126)
   1382 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.SystemDefaultDnsResolver.resolve(SystemDefaultDnsResolver.java:27)
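
A possible cause worth checking (an assumption, not confirmed in this thread): the client is resolving the bucket-prefixed host aip-featuretoolkit.SU73ECSG1P1d.***.COM, i.e. virtual-hosted-style addressing. If the S3-compatible store only supports path-style requests, enabling path-style access in flink-conf.yaml may help, since the s3.* keys are forwarded to the underlying s3a configuration (fs.s3a.path.style.access):

s3.path.style.access: true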

Best, Fabian

On Tue, Jun 25, 2019 at 03:26, Vishwas Siravara <vsirav...@gmail.com> wrote:

> Hi,
> I am using Flink version 1.7.2 and I am trying to use the S3-like object
> storage EMC ECS (
> https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm). Not
> all S3 APIs are supported by EMC ECS according to this document. Here
> is my config:
>
> s3.endpoint: SU73ECSG1P1d.***.COM
> s3.access-key: vdna_np_user
> security.ssl.rest.enabled: false
> web.timeout: 1
> s3.secret-key: J***
>
> I can access this bucket from s3cmd client.
>
> I set the state backend from my scala application
> env.setStateBackend(new
> FsStateBackend("s3://aip-featuretoolkit/checkpoints/"))
>
> However when I run my application I get this exception :
>
> ClientException: Unable to execute HTTP request:
> aip-featuretoolkit.SU73ECSG1P1d.VISA.COM: Unable to execute HTTP
> request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM
>1336 at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:255)
>1337 at
>
> org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
>1338 at
>
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
>1339 at
>
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>1340 at
>
> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1173)
>1341 at
>
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1153)
>1342 at
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:296)
>1343 at
>
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
>1344 ... 10 more
>1345 Caused by:
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSClientIOException:
> doesBucketExist on aip-featuretoolkit:
> org.apache.flink.fs.s3base.shaded.co
> m.amazonaws.SdkClientException: Unable to execute HTTP request:
> aip-featuretoolkit.SU73ECSG1P1d.VISA.COM: Unable to execute HTTP
> request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM
>1346 at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
>1347 at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
>1348 at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>1349 at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>1350 at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>1351 at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>1352 at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:372)
>1353 at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:308)
>1354 at
>
> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
>1355 at
>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
>1356 at
> org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
>1357 at
> org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>1358 at
>
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:58)
>1359 at
>
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
>1360 at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:249)
>1361 ... 17 more
>1362 Caused by:
> org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
> Unable to execute HTTP request:
> aip-featuretoolkit.SU73ECSG1P1d.VISA.COM
>1363 at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpCli

Unable to start task manager in debug mode

2019-07-08 Thread Vishwas Siravara
Hi guys,
I am not able to start a stand alone session with one task manager and one
job manager on the same node by adding debug option in flink-conf.yaml
as env.java.opts:
-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005(
https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters)
.
This is what my master and slave files look like
*cat masters *
*localhost:8081*
*[was@sl73rspapd031 conf]$ cat slaves*
*localhost*

The job manager comes up but the task manager does not. From the log file:

ERROR: transport error 202: bind failed: Address already in use
ERROR: JDWP Transport dt_socket failed to initialize, TRANSPORT_INIT(510)
JDWP exit error AGENT_ERROR_TRANSPORT_INIT(197): No transports
initialized [debugInit.c:750]


This is because the job manager binds port 5005 to its process, and the task
manager cannot do this since the port is already associated with the
job manager. How can I start the task manager? Should I comment out
the debug config from the yaml file after the job manager is up and start
the task manager separately? Thanks for your help.


Re:

2019-07-08 Thread Bowen Li
Hi Xuchen,

Every email in our ML asking questions **MUST** have a valid subject, to
facilitate archive search in the future and to save people's time in deciding
whether they can help answer your question just by glancing at the
subject in their email clients.

Though your question itself is well written, I don't think it's acceptable
not to have a well-written subject. Note that this is not brought up
specifically about you as a person, but about a common practice everyone in
the community should follow.

Bowen

On Sun, Jul 7, 2019 at 1:19 PM Konstantin Knauf 
wrote:

> Hi Wang,
>
> you guessed correctly, the events are not replayed from Kafka, but are
> part of the state of the AsyncWaitOperator and the request are resubmitted
> by the AsyncOperator in it's open() method.
>
> Cheers,
>
> Konstantin
>
>
>
> On Mon, Jul 1, 2019 at 9:39 PM wang xuchen  wrote:
>
>> Hi Flink experts,
>>
>> I am prototyping a real time system that reads from Kafka source with
>> Flink and calls out to an external system as part of the event processing.
>> One of the most important requirements are read from Kafka should NEVER
>> stall, even in face of some async external calls slowness while holding
>> certain some kafka offsets. At least once processing is good enough.
>>
>> Currently, I am using AsyncIO with a thread pool of size 20. My
>> understanding is if I use orderedwait with a large 'capacity', consumption
>> from Kafka should continue even if some external calls experience slowness
>> (holding the offsets) as long as the capacity is not exhausted.
>>
>> (From my own reading of Flink source code, the capacity of the
>> orderedwait function translate to the size of the OrderedStreamElementQueue
>> size.)
>>
>> However, I expect that while the external calls stuck, stream source
>> should keep pumping out from Kafka as long as there is still capacity, but
>> offset after the stuck record should NOT be committed back to Kafka and
>> (the checkpoint should also stall to accomodate the stalled offests?)
>>
>> My observation is, if I set the capacity large enough (max_int / 100 for
>> instance), the consumption was not stalled (which is good), but the offsets
>> were all committed back to Kafka AFTER the stalled records and all
>> checkpoint succeeded, no back pressure was incurred.
>>
>> In this case, if some machines crash, how does Flink recover the stalled
>> offsets? Which checkpoint does Flink rollback to?  I understand that
>> commiting offset back to Kafka is merely to show progress to external
>> monitoring tool, but I hope Flink does book keeping somewhere to journal
>> async call xyz is not return and should be retried during recovery.
>>
>> ==
>>
>> I`ve done a some more experiments, looks like Flink is able to recover
>> the record which I threw completeExceptionly even if I use 'unorderedwait'
>> on the async stream.
>>
>> Which leads to Fabian`s early comments, 'Flink does not rely on Kafka
>> consumer offset to recover, committing offset to Kafka is merely to show
>> progress to external monitoring tools'.
>>
>> I couldn`t pinpoint the code that Flink uses the achieve it, maybe
>> in-flight async invokations in 'unorderedstreamelementqueue' are part of
>> the checkpoint and Flink saves the actual payload for later replay?
>>
>> Can anyone cast some lights?
>>
>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
>
> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2010
>
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>


Re: Hive in sql-client

2019-07-08 Thread Bowen Li
Hi Yebgenya,

To use Blink's integration with Hive in the SQL CLI, you can refer to Blink's
documentation at [1], [2], and [3].

Note that Hive integration is actually available in the **Flink master branch**
now and will be released soon as part of Flink 1.9.0. The end-to-end
integration should be feature complete by this week or so. To use Flink's
HiveCatalog and run SQL queries on Hive data, please read and follow the
documentation at [4] and [5]. Early feedback is more than welcome!

[1] https://github.com/apache/flink/blob/blink/docs/dev/table/catalog.md
[2] https://github.com/apache/flink/blob/blink/docs/dev/table/sqlClient.md
[3]
https://github.com/apache/flink/blob/blink/docs/dev/table/hive_compatibility.md

[4]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#catalogs
[5] https://github.com/apache/flink/pull/8976/files  I expect this PR to be
merged very soon



On Mon, Jul 8, 2019 at 7:43 AM Yebgenya Lazarkhosrouabadi <
lazarkhosrouab...@integration-factory.de> wrote:

> Hello,
>
>
>
> I’m trying to use Hive tables in sql-client. How can I do this?
>
> I have downloaded  Blink from Github to be able to use catalogs in the
> YAML file, but I can’t run its sql-client using *./sql-client.sh embedded*
> .
>
>
>
> Can you please help me?
>
>
>
> Regards
>
> Bernadette Lazar
>
>
> NOTE: This is a confidential message intended only for the addressee. Copying
> this message or making it available to third parties is not permitted. If you
> have received this message in error, please notify me by e-mail or at the
> telephone number given above.
>


Apache Flink - Relation between stream time characteristic and timer triggers

2019-07-08 Thread M Singh
Hi,
I have a few questions about the stream time characteristics:

1. If the time characteristic is set to TimeCharacteristic.EventTime, but a
timer in a processor or trigger is set using registerProcessingTimeTimer (or
vice versa), will that timer fire?

2. Once the time characteristic is set on the stream environment and then changed later
in the application, which one is applied, the first one or the last one?

3. If the stream time characteristic is set to IngestionTime, is there
any adverse effect of assigning timestamps using
assignTimestampsAndWatermarks to a stream later in the application?

Thanks

Re: Unable to start task manager in debug mode

2019-07-08 Thread Xintong Song
Hi Vishwas,

The value of `env.java.opts` will be passed as JVM options to both
jobmanager and taskmanager. Thus the same port is set for two processes.

If you need to pass JVM options to jobmanager and taskmanager differently,
you can use `env.java.opts.jobmanager` and `env.java.opts.taskmanager`.
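
For example (the second port is arbitrary; any free port that differs from the jobmanager's will do):

env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006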

Thank you~

Xintong Song



On Tue, Jul 9, 2019 at 4:41 AM Vishwas Siravara  wrote:

> Hi guys,
> I am not able to start a stand alone session with one task manager and one
> job manager on the same node by adding debug option in flink-conf.yaml as 
> env.java.opts:
> -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005(
> https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters)
> .
> This is what my master and slave files look like
> *cat masters *
> *localhost:8081*
> *[was@sl73rspapd031 conf]$ cat slaves*
> *localhost*
>
> The job manager comes up but the task manager does not, from the log file
>
> ERROR: transport error 202: bind failed: Address already in use
> ERROR: JDWP Transport dt_socket failed to initialize, TRANSPORT_INIT(510)
> JDWP exit error AGENT_ERROR_TRANSPORT_INIT(197): No transports initialized 
> [debugInit.c:750]
>
>
> This is because the job manager binds 5005 to its process and the task 
> manager cannot do this since it is already associated with the
>
> job manager, how can I start the task manager ? Should I comment out the 
> debug config from the yaml file after job manager is up and start
>
> the task manger separately ? Thanks for your help.
>
>


Re: Table API and ProcessWindowFunction

2019-07-08 Thread Hequn Cheng
Hi Flavio,

Thanks for your information.

From your description, it seems that you only use the window to get the
start and end time; no aggregation happens. If this is the case,
you can compute the start and end time yourself (the
`TimeWindow.getWindowStartWithOffset()` method shows how to get the window start
for a given timestamp). To be more specific, if you use processing
time, you can get your timestamp with System.currentTimeMillis(), and then
use it to get the window start and end
with `TimeWindow.getWindowStartWithOffset()`. For event time, you can get
the timestamp from the rowtime field.
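
For example (a sketch for a 1-minute tumbling window with no offset; the variable names are made up):

long windowSizeMs = 60_000L;                                  // 1-minute tumbling window
long timestamp = System.currentTimeMillis();                  // or the rowtime value for event time
long windowStart = TimeWindow.getWindowStartWithOffset(timestamp, 0L, windowSizeMs);
long windowEnd = windowStart + windowSizeMs;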

With the start and end time, you can then perform a LATERAL JOIN to enrich
the information. You can add a cache in your table function to avoid
contacting the REST endpoint too frequently.

Best, Hequn


On Mon, Jul 8, 2019 at 10:46 PM Flavio Pompermaier 
wrote:

> Hi Hequn, thanks for your answer.
> What I'm trying to do is to read a stream of events that basically
> contains a UserId field and, every X minutes (i.e. using a Time Window) and
> for each different UserId key, query 3 different REST services to enrich my
> POJOs*.
> For the moment what I do is to use a ProcessWindowFunction after the
> .keyBy().window() as shown in the  previous mail example to contact those 3
> services and enrich my object.
>
> However I don't like this solution because I'd like to use Flink to it's
> full potential so I'd like to enrich my object using LATERAL TABLEs or
> ASYNC IO..
> The main problem I'm facing right now is that  I can't find a way to pass
> the thumbing window start/end to the LATERAL JOIN table functions (because
> this is a parameter of the REST query).
> Moreover I don't know whether this use case is something that Table API
> aims to solve..
>
> * Of course this could kill the REST endpoint if the number of users is
> very big ..because of this I'd like to keep the external state of source
> tables as an internal Flink state and then do a JOIN on the UserId. The
> problem here is that I need to "materialize" them using Debezium (or
> similar) via Kafka and dynamic tables..is there any example of keeping
> multiple tables synched with Flink state through Debezium (without the need
> of rewriting all the logic for managing UPDATE/INSERT/DELETE)?
>
> On Mon, Jul 8, 2019 at 3:55 PM Hequn Cheng  wrote:
>
>> Hi Flavio,
>>
>> Nice to hear your ideas on Table API!
>>
>> Could you be more specific about your requirements? A detailed scenario
>> would be quite helpful. For example, do you want to emit multi records
>> through the collector or do you want to use the timer?
>>
>> BTW, Table API introduces flatAggregate recently(both non-window
>> flatAggregate and window flatAggregate) and will be included in the near
>> coming release-1.9. The flatAggregate can emit multi records for a single
>> group. More details here[1][2].
>> Hope this can solve your problem.
>>
>> Best, Hequn
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions
>>
>> On Mon, Jul 8, 2019 at 6:27 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi to all,
>>> from what I understood a ProcessWindowFunction can only be used in the
>>> Streaming API.
>>> Is there any plan to port them also in the Table API (in the near
>>> future)?
>>> I'd like to do with Table API the equivalent of:
>>>
>>> final DataStream events = env.addSource(src);
>>> events.filter(e -> e.getCode() != null)
>>>       .keyBy(event -> Integer.valueOf(event.getCode()))
>>>       .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>       .process(new ProcessWindowFunction<...>() { ... });
>>>
>>> Best,
>>> Flavio
>>>
>>
>
>


Re: Flink Table API and Date fields

2019-07-08 Thread Rong Rong
Hi Flavio,

Yes, I think the handling of date/time in Flink could be better when
dealing with DATE/TIME types.
Jingsong briefly mentioned several limitations of java.util.Date. Some of
these limitations even affect the correctness of the results (e.g.
Gregorian vs. Julian calendar), and java.sql.Date is broadly used in Flink
at the moment.

I think handling it as a completely different type, either as a generic
type or as another extension of the basic types, will definitely be helpful
here. One important reason is that Flink can then prevent sql.Date
functions from being mistakenly applied to util.Date.
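
In the meantime, the conversion workaround mentioned further down in this
thread boils down to something like the following rough sketch. The names
`Event`, `SqlEvent`, `events` and `tableEnv` are made up / assumed here:

// map the java.util.Date field to java.sql.Date before going to the Table API
DataStream<SqlEvent> converted = events.map(e ->
        new SqlEvent(e.getId(), new java.sql.Date(e.getDate().getTime())));
Table table = tableEnv.fromDataStream(converted);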

--
Rong

On Mon, Jul 8, 2019 at 6:13 AM Timo Walther  wrote:

> Hi Flavio,
>
> yes I agree. This check is a bit confusing. The initial reason for that
> was that sql.Time, sql.Date, and sql.Timestamp extend from util.Date as
> well. But handling it as a generic type, as Jingsong mentioned, might be the
> better option in order to write custom UDFs to handle them.
>
> Regards,
> Timo
>
> On 08.07.19 at 12:04, Flavio Pompermaier wrote:
>
> Of course there are java.sql.* and java.time.* in Java, but it's also true
> that most of the time the POJOs you read come from an external (Maven) lib,
> and if such POJOs contain date fields you have to create a local version of
> that POJO with the java.util.Date fields replaced by a java.sql.Date
> version of them.
> Moreover you also have to create a conversion function from the original
> POJO to the Flink-specific one (and this is very annoying, especially
> because if the POJO gets modified you have to check that your conversion
> function is updated accordingly).
>
> Summarising, it is possible to work around this limitation, but it's very
> uncomfortable (IMHO).
>
> On Mon, Jul 8, 2019 at 11:52 AM JingsongLee 
> wrote:
>
>> Flink 1.9 blink runner will support it as Generic Type,
>> But I don't recommend it. After all, there are java.sql.Date and
>> java.time.* in Java.
>>
>> Best, JingsongLee
>>
>> --
>> From:Flavio Pompermaier 
>> Send Time:2019年7月8日(星期一) 15:40
>> To:JingsongLee 
>> Cc:user 
>> Subject:Re: Flink Table API and Date fields
>>
>> I think I could do it for this specific use case but isn't this a big
>> limitation of Table API?
>> I think that java.util.Date should be a first class citizen in Flink..
>>
>> Best,
>> Flavio
>>
>> On Mon, Jul 8, 2019 at 4:06 AM JingsongLee 
>> wrote:
>> Hi Flavio:
>> Looks like you use java.util.Date in your pojo, Now Flink table not
>> support BasicTypeInfo.DATE_TYPE_INFO
>> because of the limitations of some judgments in the code.
>> Can you use java.sql.Date?
>>
>> Best, JingsongLee
>>
>> --
>> From:Flavio Pompermaier 
>> Send Time:2019年7月5日(星期五) 22:52
>> To:user 
>> Subject:Flink Table API and Date fields
>>
>> Hi to all,
>> in my use case I have a stream of POJOs with Date fields.
>> When I use Table API I get the following error:
>>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: SQL validation failed. Type
>> is not supported: Date
>> at
>> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:112)
>> at
>> org.apache.flink.table.planner.StreamPlanner.toRel(StreamPlanner.scala:148)
>> at
>> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:114)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:268)
>> Caused by: org.apache.flink.table.api.TableException: Type is not
>> supported: Date
>> at
>> org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:357)
>>
>>
>> Is there a way to deal with this without converting the Date field to a
>> Long one?
>>
>> Best,
>> Flavio
>>
>>
>
>
>


Re: How are kafka consumer offsets handled if sink fails?

2019-07-08 Thread Rong Rong
Hi John,

I think what Konstantin is trying to say is: Flink's Kafka consumer does
not start consuming from the Kafka committed offset when the consumer
starts; it actually starts with the offset that was last checkpointed to
the external DFS (i.e. the starting point of the consumer has no relation
to the Kafka committed offset whatsoever - if checkpointing is enabled).

This is to quote:
"*the Flink Kafka Consumer does only commit offsets back to Kafka on a
best-effort basis after every checkpoint. Internally Flink "commits" the
[checkpoints]->[current Kafka offset] as part of its periodic checkpoints.*"

However, if you do not enable checkpointing, I think your consumer will by
default restart from the default Kafka offset (which I think is your
committed group offset).
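
A small sketch of the setup described above (the checkpoint interval and the
start position are only examples; `topic` and `props` are assumed to be
defined as in the original question):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // Kafka offsets are snapshotted with every checkpoint

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
// only consulted when the job starts without a checkpoint/savepoint to restore from
consumer.setStartFromGroupOffsets();

env.addSource(consumer);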

--
Rong


On Mon, Jul 8, 2019 at 6:39 AM John Smith  wrote:

> So when we say a sink is at-least-once, it's because internally it's not
> checking any kind of state and it sends what it has regardless, correct?
> Because I will build a sink that calls stored procedures.
>
> On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf, <
> konstan...@ververica.com> wrote:
>
>> Hi John,
>>
>> in case of a failure (e.g. in the SQL sink) the Flink job will be
>> restarted from the last checkpoint. This means the offsets of all Kafka
>> partitions will be reset to that point in the stream, along with the state
>> of all operators. To enable checkpointing you need to call
>> StreamExecutionEnvironment#enableCheckpointing(). If you are using the
>> JDBCSinkFunction (which is an at-least-once sink), the output will be
>> duplicated in the case of failures.
>>
>> To answer your questions:
>>
>> * For this the FlinkKafkaConsumer handles the offsets manually (no
>> auto-commit).
>> * No, the Flink Kafka Consumer does only commit offsets back to Kafka on
>> a best-effort basis after every checkpoint. Internally Flink "commits" the
>> checkpoints as part of its periodic checkpoints.
>> * Yes, along with all other events between the last checkpoint and the
>> failure.
>> * It will continue from the last checkpoint.
>>
>> Hope this helps.
>>
>> Cheers,
>>
>> Konstantin
>>
>> On Fri, Jul 5, 2019 at 8:37 PM John Smith  wrote:
>>
>>> Hi using Apache Flink 1.8.0
>>>
>>> I'm consuming events from Kafka using nothing fancy...
>>>
>>> Properties props = new Properties();
>>> props.setProperty("bootstrap.servers", kafkaAddress);
>>> props.setProperty("group.id", kafkaGroup);
>>>
>>> FlinkKafkaConsumer<String> consumer =
>>>     new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
>>>
>>>
>>> Do some JSON transforms and then push to my SQL database using JDBC and
>>> stored procedure. Let's assume the SQL sink fails.
>>>
>>> We know that Kafka can either periodically commit offsets or it can be
>>> done manually based on consumers logic.
>>>
>>> - How is the source Kafka consumer offsets handled?
>>> - Does the Flink Kafka consumer commit the offset per event/record?
>>> - Will that single event that failed be retried?
>>> - So if we had 5 incoming events and, say, on the 3rd one it failed, will
>>> it continue on the 3rd or will the job restart and retry those 5 events?
>>>
>>>
>>>
>>
>> --
>>
>> Konstantin Knauf | Solutions Architect
>>
>> +49 160 91394525
>>
>>
>> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2010
>>
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>>
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>
>


Re: Add Logical filter on a query plan from Flink Table API

2019-07-08 Thread Hequn Cheng
Hi Felipe,

> I would like to create a logical filter if there is no filter set on the
logical query. How should I implement it?
Do you mean you want to add a LogicalFilter node even if the query doesn't
contain a filter? Logically, this can be done through a rule. However, it
sounds a little hacky and you have to pay attention to semantic problems.
One thing you have to notice is that you can't change the RowType when you
apply your rules, i.e., for NodeA -> rule -> NodeB, NodeB should have the
same row type as NodeA.
There are a lot of rules in Flink which I think are good examples for you.
You can find these rules in the `FlinkRuleSets` class.
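
As a rough, untested sketch (the class name is made up, and you would still
need extra logic to make sure a filter is only added where none exists yet),
a rule that wraps a bare LogicalTableScan in a LogicalFilter with a constant
TRUE predicate, which keeps the row type unchanged, could look like this. It
can be registered through the CalciteConfigBuilder just like MyFilterRule:

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rex.RexBuilder;

public class AddDefaultFilterRule extends RelOptRule {
  public static final AddDefaultFilterRule INSTANCE = new AddDefaultFilterRule();

  private AddDefaultFilterRule() {
    super(operand(LogicalTableScan.class, none()), "AddDefaultFilterRule");
  }

  @Override
  public void onMatch(RelOptRuleCall call) {
    LogicalTableScan scan = call.rel(0);
    RexBuilder rexBuilder = scan.getCluster().getRexBuilder();
    // a TRUE filter keeps the same row type as the scan, so downstream
    // operators are not affected
    call.transformTo(LogicalFilter.create(scan, rexBuilder.makeLiteral(true)));
  }
}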

> I see my LogicalFilter being created when I call the "tableEnv.explain()"
method. I suppose that I can add some logical filters to the plan.
The `LogicalFilter` and `DataStreamCalc` are not created by your filter
rule. If you remove your filter rule, nothing in the plan changes.

Best, Hequn

On Mon, Jul 8, 2019 at 11:13 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I am a newbie in Apache Calcite. I am trying to use it with Apache Flink.
> To start, I am trying to create a HelloWorld which just adds a logical filter
> to my query.
> 1 - I have my Flink app using Table API [1].
> 2 - I have created my Calcite filter rule, which is applied to my Flink
> query if I use CalciteConfig cc = new
> CalciteConfigBuilder().addLogicalOptRuleSet(RuleSets.ofList(MyFilterRule.INSTANCE)).build()
> [2];
> 3 - The debug thread only goes to my rule if there is a filter on my query.
>
> I would like to create a logical filter if there is no filter set on the
> logical query. How should I implement it?
> I see my LogicalFilter being created when I call the "tableEnv.explain()"
> method. I suppose that I can add some logical filters to the plan.
>
> == Abstract Syntax Tree ==
> LogicalFilter(condition=[>=($6, 50)])
>   LogicalTableScan(table=[[TicketsStation01Plat01]])
>
> == Optimized Logical Plan ==
> DataStreamCalc(select=[sensorId, sensorType, platformId, platformType,
> stationId, timestamp, value, trip, eventTime], where=[>=(value, 50)])
>   StreamTableSourceScan(table=[[TicketsStation01Plat01]],
> fields=[sensorId, sensorType, platformId, platformType, stationId,
> timestamp, value, trip, eventTime], source=[SensorTuples])
>
> == Physical Execution Plan ==
> 
>
> Thanks,
> Felipe
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/table/HelloWorldCalcitePlanTableAPI.java#L62
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/calcite/rules/MyFilterRule.java#L14
> --
> -- Felipe Gutierrez
>
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>


Re: [VOTE] How to Deal with Split/Select in DataStream API

2019-07-08 Thread Xingcan Cui
Hi Aljoscha,

Thanks for your response.

With all this preliminary information collected, I’ll start a formal process.

Thanks to everybody for your attention.

Best,
Xingcan

> On Jul 8, 2019, at 10:17 AM, Aljoscha Krettek  wrote:
> 
> I think this would benefit from a FLIP that neatly sums up the options and
> also gives us a point where we can vote and ratify a decision.
> 
> As a gut feeling, I like Option 3) the most. Initially I would have preferred
> option 1) (because of a sense of API purity), but by now I think it's good
> that users have this simpler option.
> 
> Aljoscha 
> 
>> On 8. Jul 2019, at 06:39, Xingcan Cui wrote:
>> 
>> Hi all,
>> 
>> Thanks for your participation.
>> 
>> In this thread, we got one +1 for option 1 and option 3, respectively. In 
>> the original thread[1], we got two +1 for option 1, one +1 for option 2, and 
>> five +1 and one -1 for option 3.
>> 
>> To summarize,
>> 
>> Option 1 (port side output to flatMap and deprecate split/select): three +1
>> Option 2 (introduce a new split/select and deprecate existing one): one +1
>> Option 3 ("correct" the existing split/select): six +1 and one -1
>> 
>> It seems that most people involved are in favor of "correcting" the existing
>> split/select. However, this will definitely break API compatibility, in a
>> subtle way.
>> 
>> IMO, the real behavior of consecutive split/select's has never been
>> thoroughly clarified. Even in the community, it is hard to say that we have
>> come to a consensus on its real semantics [2-4]. Though the initial design is
>> not ambiguous, there's no doubt that its concept has drifted.
>> 
>> As split/select is quite an ancient API, I cc'ed this to more members.
>> It would be great if you could share your opinions on this.
>> 
>> Thanks,
>> Xingcan
>> 
>> [1] https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E
>> [2] https://issues.apache.org/jira/browse/FLINK-1772
>> [3] https://issues.apache.org/jira/browse/FLINK-5031
>> [4] https://issues.apache.org/jira/browse/FLINK-11084
>> 
>> 
>> 
>>> On Jul 5, 2019, at 12:04 AM, 杨力 wrote:
>>> 
>>> I prefer approach 1). I used to carry fields, which are needed only for
>>> splitting, in the outputs of flatMap functions. Replacing them with
>>> outputTags would simplify the data structures.
>>> 
>>> On Fri, Jul 5, 2019 at 2:20 AM, Xingcan Cui wrote:
>>> Hi folks,
>>> 
>>> Two weeks ago, I started a thread [1] discussing whether we should discard
>>> the split/select methods (which have been marked as deprecated since v1.7)
>>> in the DataStream API.
>>> 
>>> The fact is, these methods will cause "unexpected" results when used
>>> consecutively (e.g., ds.split(a).select(b).split(c).select(d)) or
>>> multiple times on the same target (e.g., ds.split(a).select(b),
>>> ds.split(c).select(d)). The reason is that, following the initial design,
>>> the new split/select logic will always override the existing one on the
>>> same target operator, rather than append to it. Some users may not be aware
>>> of that, but if you are, a current solution would be to use the more
>>> powerful side output feature [2].
>>> 
>>> FLINK-11084 added some restrictions to the existing split/select logic and
>>> suggests replacing it with side output in the future. However, considering
>>> that side output is currently only available in the process function layer
>>> and that split/select may have been widely used in many real-world
>>> applications, we'd like to start a vote and listen to the community on how
>>> to deal with them.
>>> 
>>> In the discussion thread [1], we proposed three solutions as follows. All 
>>> of them are feasible but have different impacts on the public API.
>>> 
>>> 1) Port the side output feature to DataStream API's flatMap and replace 
>>> split/select with it.
>>> 
>>> 2) Introduce a dedicated function in DataStream API (with the "correct" 
>>> behavior but a different name) that can be used to replace the existing 
>>> split/select.
>>> 
>>> 3) Keep split/select but change the behavior/semantic to be "correct".
>>> 
>>> Note that this is just a vote for gathering information, so feel free to 
>>> participate and share your opinions.
>>> 
>>> The voting time will end on July 7th 17:00 EDT.
>>> 
>>> Thanks,
>>> Xingcan
>>> 
>>> [1] 
>>> https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d40

Re: Apache Flink - Relation between stream time characteristic and timer triggers

2019-07-08 Thread Yun Gao
Hi,
For the three questions,
  1. The processing time timer will be triggered. You may think of the
processing time timer as working in parallel with the event time timer. They
are processed separately under the hood, and the processing time timer is
triggered according to wall-clock time.
  2. I'm not completely clear on what "changed later in the application" means.
Do you mean calling `StreamExecutionEnvironment#setStreamTimeCharacteristic`
multiple times? If so, the last call will take effect for all the operators
defined before or after it, since the setting only takes effect in
`StreamExecutionEnvironment#execute`.
  3. `assignTimestampsAndWatermarks` will change the timestamp of the record.
You may think of each record as carrying a timestamp field that is set at
ingestion time; `assignTimestampsAndWatermarks` overwrites the value of this
field, so the downstream operators relying on the timestamp will see the
updated value.
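
To make 1) and 3) concrete, here is a small illustrative sketch. The class and
field names (MyEvent, getEventTime, getKey) and the `source` stream are made
up; even with EventTime set, a processing-time timer registered in a
KeyedProcessFunction still fires by wall-clock time, and
assignTimestampsAndWatermarks simply overwrites the record timestamps:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// assuming a DataStream<MyEvent> named "source" obtained from env.addSource(...)
DataStream<MyEvent> withTimestamps = source.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(MyEvent e) {
            return e.getEventTime(); // overwrites any timestamp assigned at ingestion
        }
    });

withTimestamps
    .keyBy(MyEvent::getKey)
    .process(new KeyedProcessFunction<String, MyEvent, MyEvent>() {
        @Override
        public void processElement(MyEvent value, Context ctx, Collector<MyEvent> out) {
            // fires by wall-clock time, regardless of the event-time setting above
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 10_000);
            out.collect(value);
        }

        @Override
        public void onTimer(long ts, OnTimerContext ctx, Collector<MyEvent> out) {
            // ctx.timeDomain() tells you which kind of timer fired
        }
    });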

Best,
Yun




--
From:M Singh 
Send Time:2019 Jul. 9 (Tue.) 09:42
To:User 
Subject:Apache Flink - Relation between stream time characteristic and timer 
triggers

Hi:

I have a few questions about the stream time characteristics:

1. If the time characteristic is set to TimeCharacteristic.EventTime, but a
timer in a processor or trigger is set using registerProcessingTimeTimer (or
vice versa), then will that timer fire?

2. Once the time characteristic is set on the stream environment and changed
later in the application, which one is applied, the first one or the last one?

3. If the stream time characteristic is set to IngestionTime, then is there
any adverse effect of assigning the timestamp using
assignTimestampsAndWatermarks to a stream later in the application?

Thanks