Re: Exception in Flink 1.3.0

2017-06-12 Thread rhashmi
Any update on when 1.3.1 will be available?

Our current version is 1.2.0, but that has a separate issue (invalid type code: 00).

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/invalid-type-code-00-td13326.html#a13332



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Exception-in-Flink-1-3-0-tp13493p13664.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Use Single Sink For All windows

2017-06-12 Thread rhashmi
Thanks, Aljoscha, for your response.

I will give it a try.
 


1- Flink calls the *invoke* method of SinkFunction to dispatch the aggregated
information. My follow-up question: if the sink receives another update while
the snapshotState method is in progress, we might end up with mixed records.
However, according to the documentation, all updates stop during a checkpoint,
and I assume the sink works the same way.


https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html

*"As soon as the operator receives snapshot barrier n from an incoming
stream, it cannot process any further records from that stream until it has
received the barrier n from the other inputs as well. Otherwise, it would
mix records that belong to snapshot n and with records that belong to
snapshot n+1."*

*"Streams that report barrier n are temporarily set aside. Records that are
received from these streams are not processed, but put into an input
buffer."*


2- The snapshotState method is called when a checkpoint is requested. Is there
an interface that notifies us when a checkpoint completes? I mean, I would add
my flush logic right after the snapshot completes and before Flink resumes the
stream. With this approach, we can ensure that we update state only if the
checkpoint was successful.
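Flink does expose such a hook. A sink can implement `org.apache.flink.runtime.state.CheckpointListener`, whose `notifyCheckpointComplete(long checkpointId)` is called after the checkpoint has been acknowledged by all tasks. However, this call happens after the stream has already resumed, not before it. Records that arrive between the snapshot and the notification must therefore be kept apart from the sealed batch. Regarding point 1, `snapshotState` runs in the task thread between records, so `invoke` does not run concurrently with it.

The sketch below models that bookkeeping in plain Java with no Flink dependency. The class name, the `writer` callback and the method signatures are hypothetical; they only mirror Flink's `invoke`, `snapshotState` and `notifyCheckpointComplete`. A real sink would also keep the pending batches in operator state (for example via `CheckpointedFunction`) so that they survive a failure.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/**
 * Hypothetical buffering sink that writes a batch only after the checkpoint
 * covering it has completed. Method names mirror Flink's SinkFunction#invoke,
 * snapshotState and notifyCheckpointComplete, but nothing here depends on Flink.
 */
class CommitOnCheckpointSink<T> {
    private final List<T> current = new ArrayList<>();              // records since the last barrier
    private final TreeMap<Long, List<T>> pending = new TreeMap<>(); // sealed batches awaiting confirmation
    private final Consumer<List<T>> writer;                         // e.g. a JDBC batch insert (hypothetical)

    CommitOnCheckpointSink(Consumer<List<T>> writer) {
        this.writer = writer;
    }

    /** Buffers a record instead of writing it immediately. */
    void invoke(T value) {
        current.add(value);
    }

    /** Seals the records received before barrier n under checkpoint id n. */
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, new ArrayList<>(current));
        current.clear();
        // A real sink would also put `pending` into operator state here.
    }

    /** Writes every sealed batch whose checkpoint id is <= the completed one, oldest first. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<T>> done = pending.headMap(checkpointId, true);
        Iterator<List<T>> it = done.values().iterator();
        while (it.hasNext()) {
            writer.accept(it.next());
            it.remove(); // removing through the view also removes the entry from `pending`
        }
    }

    /** Number of sealed batches still waiting for a completed checkpoint. */
    int pendingBatches() {
        return pending.size();
    }
}
```

Flushing every batch up to and including the notified ID also covers the case where the notification for an earlier checkpoint never arrives. If the job fails after a batch is written but before the next checkpoint completes, that batch can be written again on recovery, so the writes themselves should still be idempotent.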



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-Single-Sink-For-All-windows-tp13475p13652.html


Deterministic Update

2017-06-07 Thread rhashmi
Is there any way to trigger the sink operator on completion of a
checkpoint?
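Even if the sink writes only after a checkpoint completes, a failure between the write and the next completed checkpoint can replay the same results. A deterministic outcome therefore also depends on idempotent writes.

Below is a minimal sketch, assuming each window result is stored under a primary key of (key, window start). With that key, overwriting on conflict makes a replayed batch harmless, while incrementing counts it twice. The in-memory map stands in for a MySQL table. The table and column names in the comment, and the `WindowResult` shape, are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory stand-in for a MySQL table with PRIMARY KEY (k, window_start).
 * upsert() models: INSERT INTO window_agg (k, window_start, cnt) VALUES (?, ?, ?)
 *                  ON DUPLICATE KEY UPDATE cnt = VALUES(cnt)
 */
class IdempotentWindowTable {

    /** One aggregated window result (hypothetical shape). */
    static final class WindowResult {
        final String key;
        final long windowStart;
        final long count;

        WindowResult(String key, long windowStart, long count) {
            this.key = key;
            this.windowStart = windowStart;
            this.count = count;
        }
    }

    private final Map<String, Long> rows = new HashMap<>();

    private static String primaryKey(WindowResult r) {
        return r.key + "@" + r.windowStart;
    }

    /** Overwrite on conflict: applying the same batch twice leaves the table unchanged. */
    void upsert(List<WindowResult> batch) {
        for (WindowResult r : batch) {
            rows.put(primaryKey(r), r.count);
        }
    }

    /** Add on conflict: a replayed batch is counted twice (shown for contrast). */
    void increment(List<WindowResult> batch) {
        for (WindowResult r : batch) {
            rows.merge(primaryKey(r), r.count, Long::sum);
        }
    }

    /** Returns a copy of the current table contents. */
    Map<String, Long> rows() {
        return new HashMap<>(rows);
    }
}
```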



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Deterministic-Update-tp13580.html


Re: Use Single Sink For All windows

2017-06-06 Thread rhashmi
Because of parallelism, I am seeing DB contention. I am wondering if I can
merge the sinks of multiple windows and insert in a batch.
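One way to cut round trips and per-statement overhead is to write each batch as a single multi-row INSERT. The sketch below only builds the parameterized SQL and splits rows into bounded chunks. The table `window_agg`, its columns and the chunk size are hypothetical, and binding the values (for example with `java.sql.PreparedStatement`) is left out. MySQL Connector/J's `rewriteBatchedStatements=true` connection property is an alternative: it rewrites `addBatch()` batches into multi-row statements. On the Flink side, calling `setParallelism(...)` on the `DataStreamSink` returned by `addSink` lowers the number of concurrent writers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helpers for writing window results with multi-row INSERT statements. */
class MultiRowInsert {

    /** Builds e.g. "INSERT INTO t (a, b) VALUES (?, ?), (?, ?)" for two columns and two rows. */
    static String sql(String table, List<String> columns, int rowCount) {
        if (columns.isEmpty() || rowCount <= 0) {
            throw new IllegalArgumentException("need at least one column and one row");
        }
        String oneRow = "(" + String.join(", ", Collections.nCopies(columns.size(), "?")) + ")";
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES "
                + String.join(", ", Collections.nCopies(rowCount, oneRow));
    }

    /** Splits rows into consecutive chunks of at most maxRows, preserving order. */
    static <T> List<List<T>> chunks(List<T> rows, int maxRows) {
        if (maxRows <= 0) {
            throw new IllegalArgumentException("maxRows must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += maxRows) {
            out.add(new ArrayList<>(rows.subList(i, Math.min(i + maxRows, rows.size()))));
        }
        return out;
    }
}
```

Each chunk would then be executed as one statement built with `sql(table, columns, chunk.size())`, with the chunk's values bound in row order. The table and column names are concatenated into the SQL, so they should come from trusted configuration, never from event data.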



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-Single-Sink-For-All-windows-tp13475p13525.html


Re: Exception in Flink 1.3.0

2017-06-06 Thread rhashmi
Thanks for sharing the ticket reference. Is there any timeline, since this is
a blocker?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Exception-in-Flink-1-3-0-tp13493p13500.html


Re: ProcessFunction broke : Flink 1.3 upgrade

2017-06-06 Thread rhashmi
Yes, I see that. So should I change the code from RichProcessFunction to a
RichFunction-based class? The implementation throws a compile error, as
RichProcessFunction has been removed in 1.3.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ProcessFunction-broke-Flink-1-3-upgrade-tp13492p13498.html


Exception in Flink 1.3.0

2017-06-05 Thread rhashmi
After upgrading, I started getting this exception. Is this a bug?


2017-06-05 23:45:03,423 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - UTCStream -> Sink: UTC sink (2/12) (f78ef7f7368d27f414ebb9d0db7a26c8) switched from RUNNING to FAILED.
java.lang.Exception: Could not perform checkpoint 1 for operator UTCStream -> Sink: UTC sink (2/12).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:551)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not complete snapshot 1 for operator UTCStream -> Sink: UTC sink (2/12).
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
    ... 8 more
Caused by: java.lang.UnsupportedOperationException
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.snapshotConfiguration(TraversableSerializer.scala:155)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.buildConfigSnapshot(PojoSerializer.java:1183)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.snapshotConfiguration(PojoSerializer.java:572)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.snapshotConfiguration(PojoSerializer.java:55)
    at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:77)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:267)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Exception-in-Flink-1-3-0-tp13493.html


ProcessFunction broke : Flink 1.3 upgrade

2017-06-05 Thread rhashmi
With 1.3, what should we use for ProcessFunction? Our code currently uses:


org.apache.flink.streaming.api.functions.RichProcessFunction
org.apache.flink.streaming.api.functions.ProcessFunction.{OnTimerContext, Context}



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ProcessFunction-broke-Flink-1-3-upgrade-tp13492.html


Use Single Sink For All windows

2017-06-04 Thread rhashmi
Is it possible to send all window updates to a single sink (as an aggregated
collection)?

The reason I am asking is that we are using MySQL for the sink. I am wondering
if I can update all of them in a single batch so as to avoid possible
contention.
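In Flink, the outputs of several windowed streams of the same type can be combined with `DataStream#union` and fed into one sink. Inside that sink, results can be buffered and coalesced, so that repeated updates to the same row become a single write.

Below is a minimal sketch in plain Java, assuming each result is identified by a row key. The class, the `maxKeys` threshold and the flush callback are hypothetical. Entries buffered this way are lost on failure unless they are also flushed or checkpointed when a checkpoint is taken.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical buffer for a single sink that receives results from all windows.
 * Results for the same row key are coalesced (latest wins) and written as one batch.
 */
class CoalescingBatchBuffer<K, V> {
    private final Map<K, V> latestByKey = new LinkedHashMap<>(); // keeps first-seen key order
    private final int maxKeys;                                   // flush threshold (hypothetical)
    private final Consumer<List<Map.Entry<K, V>>> flusher;       // e.g. one multi-row INSERT

    CoalescingBatchBuffer(int maxKeys, Consumer<List<Map.Entry<K, V>>> flusher) {
        if (maxKeys <= 0) {
            throw new IllegalArgumentException("maxKeys must be positive");
        }
        this.maxKeys = maxKeys;
        this.flusher = flusher;
    }

    /** Adds a window result; a newer value for an already-buffered key replaces the older one. */
    void add(K key, V value) {
        latestByKey.put(key, value);
        if (latestByKey.size() >= maxKeys) {
            flush();
        }
    }

    /** Hands all buffered rows to the flusher as one batch and clears the buffer. */
    void flush() {
        if (latestByKey.isEmpty()) {
            return;
        }
        List<Map.Entry<K, V>> batch = new ArrayList<>();
        for (Map.Entry<K, V> e : latestByKey.entrySet()) {
            batch.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        latestByKey.clear();
        flusher.accept(batch);
    }
}
```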



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-Single-Sink-For-All-windows-tp13475.html


Re: Checkpoints very slow with high backpressure

2017-06-02 Thread rhashmi
Never mind, I found it. The backpressure was caused by the AWS RDS instance of
MySQL.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-tp12762p13468.html


Re: Checkpoints very slow with high backpressure

2017-06-01 Thread rhashmi
I enabled info logging. It seems to be stuck:


==> /mnt/ephemeral/logs/flink-flink-jobmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:45:18,229 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1496321118221

==> /mnt/ephemeral/logs/flink-flink-taskmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:45:18,237 INFO  org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@79e68dd3 for Async calls on Source: Custom Source (2/12)
2017-06-01 12:45:18,237 INFO  org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@78da1e82 for Async calls on Source: Custom Source (5/12)
2017-06-01 12:45:18,238 INFO  org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@68bff79e for Async calls on Source: Custom Source (8/12)
2017-06-01 12:45:18,238 INFO  org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@600bdc29 for Async calls on Source: Custom Source (11/12)
2017-06-01 12:45:24,853 INFO  com.company.deserializer.EventDeserializer - ==> KafkaConsumertest ::
2017-06-01 12:45:24,853 INFO  com.company.deserializer.EventDeserializer - ==> KafkaConsumertest ::
2017-06-01 12:45:24,853 INFO  com.company.deserializer.EventDeserializer - ==> KafkaConsumertest ::
2017-06-01 12:45:24,854 INFO  com.company.deserializer.EventDeserializer - ==> KafkaConsumertest ::
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 4
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.backend, filesystem
2017-06-01 12:45:24,860 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,860 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,860 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,862 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability, zookeeper
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.zookeeper.quorum, host1:2181,host2:2181
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.zookeeper.storageDir, s3://somelocation/ha-recovery/
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, host
2017-06-01

Re: Checkpoints very slow with high backpressure

2017-06-01 Thread rhashmi
I tried extending the timeout to 1 hour, but no luck. It is still timing out,
and there is no exception in the log file, so I am guessing something is stuck.
I will dig further.

Here are the configuration details:

Standalone cluster, with checkpoints stored in S3.

I have just 217680 messages in 24 partitions.

Any ideas?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-tp12762p13419.html


Re: Checkpoints very slow with high backpressure

2017-05-31 Thread rhashmi
So what is the resolution? Flink is consuming messages from Kafka. Flink went
down about a day ago, so now it has to process 24 hours' worth of events. But I
hit backpressure, and right now checkpoints are timing out. Is there any
recommendation on how to handle this situation?

It seems triggers are also not firing, so no updates are being made to the
downstream database.

Is there a recommended approach to handle backpressure?

Version: Flink 1.2.






--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-tp12762p13411.html


Re: State in Custom Tumble Window Class

2017-05-30 Thread rhashmi
Thanks, Aljoscha Krettek.

So the results will not be deterministic for late events. For idempotent
updates, I would need to derive an additional key based on the current event
time for late events and attach it to the aggregate, which is probably possible
with some function(maxEventTime, actualEventTime). For that, I need maxEventTime
to be stored as part of the state and recovered in case of a runtime failure.

Here is my corner case:
-- Assume the whole Flink runtime crashed (auto commit on), and after recovery
the first event to arrive is from the past (actually late). Without keeping the
max currentTime in state, it may potentially override the previous aggregate.

I was wondering if I can record my last max event time as part of the
checkpoint, or run a query against the sink to find the last processed event
time.

Any recommendation?
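Below is a minimal sketch of the idea above, assuming tumbling event-time windows aligned to epoch 0 and a row key of the form `group|windowStart`. An event whose window starts before the window of the largest event time seen so far is treated as late. It gets a separate row key derived from both times, so it cannot overwrite the on-time aggregate. The class, method names and key format are hypothetical. `maxEventTime` is the value that would have to be checkpointed (for example as operator state) and restored after a failure.

```java
/**
 * Hypothetical key assigner for tumbling event-time windows (aligned to epoch 0).
 * maxEventTime is the piece of state that must be checkpointed and restored.
 */
class LateAwareKeyAssigner {
    private static final long NONE = Long.MIN_VALUE; // no event seen yet

    private final long windowSizeMs;
    private long maxEventTime = NONE;

    LateAwareKeyAssigner(long windowSizeMs) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("windowSizeMs must be positive");
        }
        this.windowSizeMs = windowSizeMs;
    }

    /** Start of the tumbling window that contains ts. */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /**
     * Returns "group|windowStart" for on-time events and
     * "group|windowStart|late@currentWindowStart" for late ones, then advances maxEventTime.
     */
    String assignKey(String group, long eventTime) {
        long start = windowStart(eventTime, windowSizeMs);
        String key = group + "|" + start;
        if (maxEventTime != NONE) {
            long currentStart = windowStart(maxEventTime, windowSizeMs);
            if (start < currentStart) {
                // Late: the event's window is behind the newest window seen so far,
                // so it gets its own row instead of overwriting the on-time aggregate.
                key = key + "|late@" + currentStart;
            }
        }
        maxEventTime = Math.max(maxEventTime, eventTime);
        return key;
    }

    /** Value to store in a checkpoint. */
    long snapshotMaxEventTime() {
        return maxEventTime;
    }

    /** Called on recovery with the value from the last completed checkpoint. */
    void restoreMaxEventTime(long value) {
        maxEventTime = value;
    }
}
```

For example, with one-minute windows, an event at 30 s that arrives after an event at 130 s gets the key `u1|0|late@120000`. An instance restored with the checkpointed maximum assigns that same key again. A freshly started instance with no restored maximum would assign `u1|0`, the on-time key, which is exactly the overwrite described in the corner case.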
 



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-in-Custom-Tumble-Window-Class-tp13177p13387.html


Re: invalid type code: 00

2017-05-26 Thread rhashmi
Which Flink version are you using? 1.2
What is your job doing (e.g. which operators are you using)? A ProcessFunction
that determines whether an event is late and, if so, changes its event time to
the current time; then a window.
Which operator throws this exception? I will have to dig into it further.
Which state backend are you using? MySQL.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/invalid-type-code-00-tp13326p13332.html


invalid type code: 00

2017-05-25 Thread rhashmi
Sporadically, I am seeing this error. Any idea?


java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:664)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:651)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1381)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at java.util.HashMap.readObject(HashMap.java:1404)
    at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.readObject(PojoSerializer.java:130)
    at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:292)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy.read(TypeSerializerSerializationProxy.java:97)
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:88)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:299)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:799)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
    ... 6 more



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/invalid-type-code-00-tp13326.html


Re: State in Custom Tumble Window Class

2017-05-22 Thread rhashmi
Could you elaborate on this? If I set the window time to the maximum, does that
mean my window will stay open indefinitely? Wouldn't this eventually cause a
memory overflow?
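A window's contents are kept until the watermark reaches the window's last timestamp plus the allowed lateness. The sketch below models that cleanup-time computation in plain Java. It roughly mirrors how Flink's `WindowOperator` derives a window's cleanup time (the window's `maxTimestamp()` plus allowed lateness, with an overflow guard), but the class and method names here are hypothetical.

With a window size near `Long.MAX_VALUE`, the cleanup time saturates at `Long.MAX_VALUE`. A realistic watermark never reaches it, so each key's window state is never purged and memory grows with the number of keys. A custom trigger that returns `TriggerResult.FIRE_AND_PURGE` clears the window contents whenever it fires.

```java
/**
 * Hypothetical model of when a tumbling event-time window's state may be dropped:
 * once the watermark reaches the window's last timestamp plus the allowed lateness.
 * Assumes size > 0 and allowedLateness >= 0.
 */
class WindowCleanup {

    /** Start of the tumbling window (offset 0) containing ts. */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Last timestamp that belongs to [start, start + size), saturating on overflow. */
    static long maxTimestamp(long start, long size) {
        long end = start + size;
        return end < start ? Long.MAX_VALUE : end - 1;
    }

    /** Window end plus allowed lateness, saturating at Long.MAX_VALUE instead of wrapping. */
    static long cleanupTime(long maxTimestamp, long allowedLateness) {
        long t = maxTimestamp + allowedLateness;
        return t >= maxTimestamp ? t : Long.MAX_VALUE;
    }

    /** State can be purged once the watermark has reached the cleanup time. */
    static boolean isPurgeable(long watermark, long cleanupTime) {
        return watermark >= cleanupTime;
    }
}
```

For a one-minute window starting at 0 with five seconds of allowed lateness, the cleanup time is 64999. With a window size of `Long.MAX_VALUE` it becomes `Long.MAX_VALUE`, which a watermark derived from real event times will not reach.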




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-in-Custom-Tumble-Window-Class-tp13177p13255.html