How are Kafka consumer offsets handled if the sink fails?

2019-07-05 Thread John Smith
Hi, I'm using Apache Flink 1.8.0.

I'm consuming events from Kafka using nothing fancy...

Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id", kafkaGroup);

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);


I do some JSON transforms and then push to my SQL database using JDBC and a
stored procedure. Let's assume the SQL sink fails.

We know that Kafka can either commit offsets periodically or commit them
manually, based on the consumer's logic.

- How are the source Kafka consumer offsets handled?
- Does the Flink Kafka consumer commit the offset per event/record?
- Will that single event that failed be retried?
- If we had 5 incoming events and, say, the 3rd one failed, will it continue
from the 3rd, or will the job restart and retry all 5 events? (See the
sketch below.)
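
A minimal sketch of the mechanics in question, building on the snippet above
(the checkpoint interval is an assumption): with checkpointing enabled, the
Flink Kafka consumer does not commit per record; it tracks offsets in Flink's
checkpointed state, commits them back to Kafka only when a checkpoint
completes, and on failure the job restarts from the last completed checkpoint
and reprocesses every event since then.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // take a checkpoint every 60 s

// Default when checkpointing is enabled: offsets are committed to Kafka on
// completed checkpoints only, and mainly serve monitoring; the restore
// position after a failure comes from the checkpoint itself.
consumer.setCommitOffsetsOnCheckpoints(true);

DataStream<String> stream = env.addSource(consumer);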


FlinkKafkaConsumer - Save watermark assigner in the state

2019-07-05 Thread Juan Gentile
Hello,

We are currently facing an issue where we need to store the instance of the
watermark and timestamp assigner in the state while consuming from Kafka.
For that purpose we took a look at FlinkKafkaConsumerBase and noticed that,
since the methods (snapshotState and initializeState from
CheckpointedFunction) are final, we can't override them.
An alternative could be to wrap the class but we don’t like that solution 
either as it would make it harder to migrate in the future if there are changes 
there.
We would like to know if it’d be possible to store the ‘assigner’ in the state 
of the FlinkKafkaConsumerBase. We think it’s not a big change and we are 
willing to do it if you agree.

Thank you,
Juan G.
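
For context, a minimal sketch (hypothetical logic, not code from this thread)
of the kind of assigner whose internal field is lost on restore, which is what
storing the assigner in state would address:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Elements here are plain epoch-millis timestamps to keep the sketch
// self-contained; maxTimestamp is an ordinary field, so it resets to its
// initial value whenever the job is restored from a checkpoint.
public class StatefulAssigner implements AssignerWithPeriodicWatermarks<Long> {

    private long maxTimestamp = Long.MIN_VALUE; // lost on restore

    @Override
    public long extractTimestamp(Long element, long previousElementTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, element);
        return element;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // 1 s of bounded lateness; null means "no watermark yet"
        return maxTimestamp == Long.MIN_VALUE ? null : new Watermark(maxTimestamp - 1000);
    }
}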


Flink Table API and Date fields

2019-07-05 Thread Flavio Pompermaier
Hi to all,
in my use case I have a stream of POJOs with Date fields.
When I use Table API I get the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException:
SQL validation failed. Type is not supported: Date
  at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:112)
  at org.apache.flink.table.planner.StreamPlanner.toRel(StreamPlanner.scala:148)
  at org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:114)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:268)
Caused by: org.apache.flink.table.api.TableException: Type is not supported: Date
  at org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:357)


Is there a way to deal with this without converting the Date field to a
Long one?

Best,
Flavio
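
One possible workaround, sketched under assumptions not confirmed in the
thread (the failing field is a java.util.Date, and the POJO names below are
hypothetical): the 1.8 Table API validates java.sql.Timestamp and
java.sql.Date fields, so converting the field before registering the table
avoids the error without resorting to Long.

// RawEvent (with a java.util.Date field) and SqlEvent (with an "id" field
// and a java.sql.Timestamp field "ts") are hypothetical POJOs.
DataStream<SqlEvent> converted = rawStream.map(e ->
    new SqlEvent(e.getId(), new java.sql.Timestamp(e.getDate().getTime())));

tableEnv.registerDataStream("events", converted);
Table result = tableEnv.sqlQuery("SELECT id, ts FROM events");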


Re: does mapFunction need to implement CheckpointedFunction?

2019-07-05 Thread Biao Liu
Sorry, I don't get your point. Before answering the question, I guess we
need to make sure what exactly you want.

BTW, have you read the documentation on checkpointing and state? [1] [2]
Maybe it could help.

1.
https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/programming-model.html#checkpoints-for-fault-tolerance
2.
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/

liu ze wrote on Fri, Jul 5, 2019 at 2:37 PM:

> Hi,
>
> I want to update a third-party system in the mapFunction; does mapFunction
> need to implement CheckpointedFunction?
>
> For example, in the mapFunction I want to update MySQL. Do I need to
> implement CheckpointedFunction and manage the state myself?
>
>
> stream = env.addSource(...)
>
> stream.map(
>     // insert/update MySQL here
>     // does a CheckpointedFunction need to be implemented here?
> )
>
> stream.addSink(kafka)
>
>
> good luck!


Question about hot-adding SQL to a running Flink job

2019-07-05 Thread 杨胜松(鼓翅)

Hello!

A quick question: suppose one of my jobs already has three SQL queries running, and I now want to add a fourth one. Do I have to redeploy the topology? Is there any way to add the fourth SQL query without affecting the computation of the existing three?





--
Address: Xixi Campus 3-3-N-89, Yuhang, Hangzhou, Zhejiang



Re: Providing Custom Serializer for Generic Type

2019-07-05 Thread Andrea Spina
Hi Gordon, thank you.
The involved data structure is a complex abstraction owning a schema and
values; it declares private fields which should not be edited directly by
users. I'd say it's really akin to an Avro GenericRecord. How would you
approach the problem if you had to serialize/deserialize an Avro
GenericRecord efficiently? I think it cannot be a POJO, and ser/de using Avro
brings a lot of overhead, as also described at [1].

Thank you very much for your help.

Andrea

[1] -
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html
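
One possible direction, sketched under the assumption that the type really
behaves like an Avro GenericRecord (this is not an answer given in the
thread): Flink 1.8's flink-avro module ships a GenericRecordAvroTypeInfo that
serializes GenericRecords with Avro given an explicit schema, bypassing the
Kryo fallback.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;

// "schemaJson" and "mySource" are assumptions; the key call is returns(...),
// which replaces the GenericTypeInfo fallback for this stream.
Schema schema = new Schema.Parser().parse(schemaJson);

DataStream<GenericRecord> stream = env
    .addSource(mySource)
    .returns(new GenericRecordAvroTypeInfo(schema));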

On Thu, Jul 4, 2019 at 11:23 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Andrea,
>
> Is there a specific reason you want to use a custom TypeInformation /
> TypeSerializer for your type?
> From the description in the original post, this part wasn't clear to me.
>
> If the only reason is that it is generally suggested to avoid generic
> type serialization via Kryo, both for performance and for future
> evolvability, then updating your type to be recognized by
> Flink as one of the supported types [1] would be enough.
> Otherwise, implementing your own type information and serializer is
> usually something only users with very specific use cases need
> to do.
> Since you are also using that type as managed state, for a safer schema
> evolvability story in the future, I would recommend either Avro or POJO, as
> Jingsong Lee already mentioned.
>
> Cheers,
> Gordon
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#flinks-typeinformation-class
>
> On Thu, Jul 4, 2019 at 5:08 PM Andrea Spina wrote:
>
>> Hi JingsongLee, thank you for your answer.
>> Honestly, I wanted to explore that only as a last resort. Anyway, if
>> defining custom serializers and type information involves quite a big
>> effort, I would reconsider my approach.
>>
>> Cheers,
>>
>> On Thu, Jul 4, 2019 at 8:46 AM JingsongLee <lzljs3620...@aliyun.com> wrote:
>>
>>> Hi Andrea:
>>> Why not make your *MyClass* a POJO? [1] If it is a POJO, then Flink
>>> will use PojoTypeInfo and PojoSerializer, which already have a good
>>> implementation.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#rules-for-pojo-types
>>>
>>> Best, JingsongLee
>>>
>>> --
>>> From: Andrea Spina
>>> Send Time: Thu, Jul 4, 2019, 14:37
>>> To: user
>>> Subject: Providing Custom Serializer for Generic Type
>>>
>>> Dear community,
>>> in my job I use a custom event type *MyClass*, a sort of
>>> "generic event" that I handle all along my streaming flow, both as an event
>>> (DataStream[MyClass]) and as managed state.
>>>
>>> I see that Flink warns me about generic serialization of
>>> *MyClass*
>>>  INFO [run-main-0] (TypeExtractor.java:1818) - class
>>> io.radicalbit.MyClass does not contain a setter for field
>>> io$radicalbit$MyClass$$schema
>>>  INFO [run-main-0] (TypeExtractor.java:1857) - Class class
>>> io.radicalbit.MyClass cannot be used as a POJO type because not all fields
>>> are valid POJO fields, and must be processed as GenericType. Please read
>>> the Flink documentation on "Data Types & Serialization" for details of the
>>> effect on performance.
>>>  INFO [run-main-0] (TypeExtractor.java:1818) - class
>>> io.radicalbit.MyClass does not contain a setter for field
>>> io$radicalbit$MyClass$schema
>>>
>>> So I wanted to provide my custom serializer for MyClass, trying first
>>> to register the Java one to check whether the system recognizes it; I
>>> followed [1], but it seems that it is not considered.
>>>
>>> I then read about [2] (the case is very akin to mine) and AFAIU I need to
>>> implement a custom TypeInformation and TypeSerializer for my class, as
>>> suggested in [3], because Flink will ignore my registered serializer as long
>>> as it considers my type *generic*.
>>>
>>> config.registerTypeWithKryoSerializer(classOf[MyClass], 
>>> classOf[RadicalSerde])
>>>
>>>
>>> My question, finally, is: do I need to provide these custom classes? Is
>>> there any practical example of creating custom type information like the
>>> above? I have had a quick preliminary look at it, but it seems that I need
>>> to provide a non-trivial amount of information to the TypeInformation and
>>> TypeSerializer interfaces.
>>>
>>> Thank you for your excellent work and help.
>>>
>>> Cheers.
>>>
>>> [1] -
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
>>> [2] -
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html
>>> [3] -
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory
>>> --
>>> Andrea Spina
>>> Head of R @ Radicalbit Srl
>>> Via Giovanni Battista 

does mapFunction need to implement CheckpointedFunction?

2019-07-05 Thread liu ze
Hi,

I want to update a third-party system in the mapFunction; does mapFunction
need to implement CheckpointedFunction?

For example, in the mapFunction I want to update MySQL. Do I need to
implement CheckpointedFunction and manage the state myself?


stream = env.addSource(...)

stream.map(
    // insert/update MySQL here
    // does a CheckpointedFunction need to be implemented here?
)

stream.addSink(kafka)


good luck!
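
A minimal sketch of the pattern asked about above (class name, state name, and
buffering strategy are assumptions; the actual JDBC calls are omitted): a
MapFunction is not required to implement CheckpointedFunction, but it may do
so when it needs Flink-managed state, e.g. to keep a buffer of writes that
have not yet been flushed to MySQL.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class MySqlUpdatingMapper
        implements MapFunction<String, String>, CheckpointedFunction {

    // Records seen since the last successful checkpoint; restored (and
    // re-writable to MySQL) after a failure.
    private transient ListState<String> checkpointedBuffer;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public String map(String value) {
        buffer.add(value); // the actual MySQL insert/update would go here
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedBuffer.clear();
        checkpointedBuffer.addAll(buffer);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedBuffer = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-writes", String.class));
        buffer.clear();
        for (String record : checkpointedBuffer.get()) {
            buffer.add(record);
        }
    }
}

Note that buffering alone does not make the MySQL updates transactional:
exactly-once side effects additionally require idempotent writes or a
two-phase-commit sink.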