Re: [ANNOUNCE] Welcome Stefan Richter as a new committer

2017-02-15 Thread Shaoxuan Wang
Woohoo, just saw this (was travelling).
Congratulations, Stefan! Looking forward to the promising future of the
state backend.

On Mon, Feb 13, 2017 at 5:20 PM, Stefan Richter  wrote:

> Thanks a lot! I feel very happy and will try to help the Flink community as
> well as I can :-)
>
> Best,
> Stefan
>
> > On 10.02.2017 at 11:00, Ufuk Celebi wrote:
> >
> > Hey everyone,
> >
> > I'm very happy to announce that the Flink PMC has accepted Stefan
> > Richter to become a committer of the Apache Flink project.
> >
> > Stefan has been part of the community for almost a year now and worked on
> > major features of the latest 1.2 release, most notably rescaling and
> > backwards compatibility of program state.
> >
> > Please join me in welcoming Stefan. :-)
> >
> > – Ufuk
>
>


Re: rocksdb without checkpointing

2017-02-15 Thread vinay patil
Good to hear that.

Which machine are you running your Flink job on, and what configuration have
you used for RocksDB?

I am currently running on c3.4xlarge instances with the predefined options set
to FLASH_SSD_OPTIMIZED.
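
For reference, a minimal sketch of how that predefined option is applied
(assuming the Flink 1.2 RocksDB backend; the checkpoint URI is illustrative):

import org.apache.flink.contrib.streaming.state.{PredefinedOptions, RocksDBStateBackend}
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Illustrative checkpoint location; point this at your own durable storage.
val backend = new RocksDBStateBackend("s3://my-bucket/flink/checkpoints")
backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
env.setStateBackend(backend)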

Regards,
Vinay Patil

On Thu, Feb 16, 2017 at 10:31 AM, abhishekrs wrote:

> Sorry, that was a red herring. Checkpointing was not getting triggered
> because we never enabled it.
>
> Our application is inherently restartable because we can use our own
> output to rebuild state. All of that is working fine for us - including
> restart semantics - without having to worry about upgrading the Flink
> topology. Once we have something in production, we will be happy to share
> more details in the Flink forums. We are very pleased with Flink so far.
> Some paradigms are messy (the scale of select, for example), but we are very
> pleased overall!
>
>
> On Wed, Feb 15, 2017 at 7:54 PM, vinay patil wrote:
>
>> Hi Abhishek,
>>
>> You can disable checkpointing by simply not calling env.enableCheckpointing().
>>
>> What do you mean by "We are trying to do application level checkpointing"
>>
>> Regards,
>> Vinay Patil
>>
>> On Thu, Feb 16, 2017 at 12:42 AM, abhishekrs wrote:
>>
>> Is it possible to set the state backend to RocksDB without asking it to
>> checkpoint?
>>
>> We are trying to do application-level checkpointing (since it gives us
>> better flexibility to upgrade our Flink pipeline and also restore state in
>> an application-specific, upgrade-friendly way), so we don't really need
>> RocksDB to do any checkpointing. Moreover, we also observed a 20s stall
>> every hour that seems to correlate with RocksDB wanting to checkpoint.
>>
>> Will the following work (effectively disable checkpointing)?
>>
>> new RocksDBStateBackend("file:///dev/null")
>>
>>
>> Or is there a better way?
>>
>> -Abhishek-
>>
>>

Re: Trapping Streaming Errors

2017-02-15 Thread Fabian Hueske
Hi Joe,

you can also insert a MapFunction between the Kafka source and the keyBy to
validate the IDs.
The mapper will be chained and should add only minimal overhead. If you
want to keep the events that were incorrectly deserialized, you can use
split() to route them somewhere else.

Validation in the deserialization code works as well, of course, but would
not allow you to reroute invalid events.
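
A minimal sketch of that pattern (the Event type and validity check are
illustrative; for brevity the validation is folded into the split() selector,
assuming the Flink 1.2 Scala API):

import org.apache.flink.streaming.api.scala._

// Hypothetical event type: the raw payload plus the IDs extracted during
// deserialization; missing IDs mark the event as invalid.
case class Event(raw: String, id1: Option[String], id2: Option[String], id3: Option[String]) {
  def isValid: Boolean = id1.isDefined && id2.isDefined && id3.isDefined
}

def routeInvalid(stream: DataStream[Event]): Unit = {
  // Invalid events never reach keyBy, so "Could not extract key" can no
  // longer crash the job.
  val branches = stream.split(e => if (e.isValid) Seq("valid") else Seq("invalid"))

  branches.select("valid")
    .keyBy(e => (e.id1.get, e.id2.get, e.id3.get)) // safe: all IDs present
    .print() // stand-in for .flatMap(new FlatMapProcessor)

  branches.select("invalid")
    .print() // stand-in for a dead-letter sink
}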

Best, Fabian

2017-02-16 5:03 GMT+01:00 Joe Olson :

> If I am processing a stream in the following manner:
>
> val stream = env.addSource(consumer).name("KafkaStream")
> .keyBy(x => (x.obj.ID1(), x.obj.ID2(), x.obj.ID3()))
> .flatMap(new FlatMapProcessor)
>
> and the IDs bomb out because of deserialization issues, my job crashes
> with a 'Could not extract key' error. How can I trap this cleanly? The only
> thing I can think of is to validate the IDs in the deserialization class
> argument that is used in the KafkaConsumer constructor, and trap any issues
> there. Is that the preferred way? Is there a better way?
>


Log4J

2017-02-15 Thread Chet Masterson
Is there a way to reload a log4j.properties file without stopping and starting the job server?


Trapping Streaming Errors

2017-02-15 Thread Joe Olson
If I am processing a stream in the following manner: 

val stream = env.addSource(consumer).name("KafkaStream") 
.keyBy(x => (x.obj.ID1(), x.obj.ID2(), x.obj.ID3()))
.flatMap(new FlatMapProcessor) 

and the IDs bomb out because of deserialization issues, my job crashes with a 
'Could not extract key' error. How can I trap this cleanly? The only thing I 
can think of is to validate the IDs in the deserialization class argument that 
is used in the KafkaConsumer constructor, and trap any issues there. Is that 
the preferred way? Is there a better way? 
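
A hedged sketch of that deserialization-side approach, assuming a hypothetical
Event type that carries a validity flag instead of throwing on bad input
(Flink 1.2 API):

import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema

// Hypothetical event type; `valid` is set to false instead of letting a
// parse failure propagate and crash key extraction downstream.
case class Event(raw: String, valid: Boolean)

class SafeEventSchema extends AbstractDeserializationSchema[Event] {
  override def deserialize(message: Array[Byte]): Event =
    try {
      val payload = new String(message, "UTF-8")
      // ... real ID parsing/validation would happen here ...
      Event(payload, valid = true)
    } catch {
      case _: Exception => Event("", valid = false) // tag, don't throw
    }
}

Downstream, a simple filter(_.valid) before the keyBy keeps bad records away
from key extraction.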


Re: rocksdb without checkpointing

2017-02-15 Thread vinay patil
Hi Abhishek,

You can disable checkpointing by simply not calling env.enableCheckpointing().
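
For example, a minimal sketch (assuming the Flink 1.2 streaming API; the
checkpoint URI is illustrative and only used if a checkpoint or savepoint is
ever actually taken):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// RocksDB still holds all keyed state locally at runtime.
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"))
// No env.enableCheckpointing(...) call: periodic checkpointing stays disabled.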

What do you mean by "We are trying to do application level checkpointing"

Regards,
Vinay Patil

On Thu, Feb 16, 2017 at 12:42 AM, abhishekrs wrote:

> Is it possible to set the state backend to RocksDB without asking it to
> checkpoint?
>
> We are trying to do application-level checkpointing (since it gives us
> better flexibility to upgrade our Flink pipeline and also restore state in
> an application-specific, upgrade-friendly way), so we don't really need
> RocksDB to do any checkpointing. Moreover, we also observed a 20s stall
> every hour that seems to correlate with RocksDB wanting to checkpoint.
>
> Will the following work (effectively disable checkpointing)?
>
> new RocksDBStateBackend("file:///dev/null")
>
>
> Or is there a better way?
>
> -Abhishek-
>
>

Re: Flink jdbc

2017-02-15 Thread Ted Yu
See the tutorial at the beginning of:

flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java

Looks like plugging in "org.h2.Driver" should do.
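
A hedged sketch of what writing could look like (assuming the Row-based
JDBCOutputFormat in flink-jdbc; the table, rows, and URL are illustrative,
the H2 driver jar must be on the classpath, and the target table must exist):

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

val env = ExecutionEnvironment.getExecutionEnvironment

// Illustrative dataset of (word, count) pairs converted to Rows.
val rows = env.fromElements(("flink", 1), ("h2", 2)).map { t =>
  val row = new Row(2)
  row.setField(0, t._1)
  row.setField(1, t._2)
  row
}

rows.output(JDBCOutputFormat.buildJDBCOutputFormat()
  .setDrivername("org.h2.Driver")
  .setDBUrl("jdbc:h2:mem:flinktest") // illustrative H2 URL
  .setQuery("INSERT INTO words (word, cnt) VALUES (?, ?)")
  .finish())

env.execute("write to H2")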

On Wed, Feb 15, 2017 at 4:59 PM, Punit Tandel wrote:

> Hi All
>
> Does flink jdbc support writing the data into H2 Database?
>
> Thanks
> Punit
>
>


Flink jdbc

2017-02-15 Thread Punit Tandel

Hi All

Does flink jdbc support writing the data into H2 Database?

Thanks
Punit



Flink Job Exception

2017-02-15 Thread Govindarajan Srinivasaraghavan
Hi All,

I'm trying to run a streaming job with Flink 1.2. There are 3 task managers
with 12 task slots. Irrespective of the parallelism I give, the job always
fails with the error below, and I found a JIRA issue corresponding to this
problem. Can I know when this will be resolved, since I'm not able to run any
job in my current environment? Thanks.

https://issues.apache.org/jira/browse/FLINK-5773

java.lang.ClassCastException: Cannot cast scala.util.Failure to org.apache.flink.runtime.messages.Acknowledge
    at java.lang.Class.cast(Class.java:3369)
    at scala.concurrent.Future$$anonfun$mapTo$1.apply(Future.scala:405)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Success.map(Try.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:967)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Reliable Distributed FS support (HCFS)

2017-02-15 Thread Vijay Srinivasaraghavan
Hello,

Regarding the filesystem abstraction support: we are planning to use a
distributed file system that complies with the Hadoop Compatible File System
(HCFS) standard in place of standard HDFS.

According to the documentation
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html),
persistence guarantees are listed as one of the main requirements, and to be
precise this covers both visibility and durability guarantees.

My questions are:
1) Is the file system expected to support "atomic rename" semantics? I believe
the checkpoint mechanism involves renaming files; will that have an impact if
atomic rename is not guaranteed by the underlying file system?
2) How does one certify Flink with an HCFS (in place of standard HDFS) in
terms of the scenarios/use cases that need to be tested? Is there any general
guidance on this?

Thanks,
Vijay

rocksdb without checkpointing

2017-02-15 Thread Abhishek R. Singh
Is it possible to set the state backend to RocksDB without asking it to checkpoint?

We are trying to do application-level checkpointing (since it gives us better
flexibility to upgrade our Flink pipeline and also restore state in an
application-specific, upgrade-friendly way), so we don't really need RocksDB
to do any checkpointing. Moreover, we also observed a 20s stall every hour
that seems to correlate with RocksDB wanting to checkpoint.

Will the following work (effectively disable checkpointing)? 
new RocksDBStateBackend("file:///dev/null") 

Or is there a better way?

-Abhishek-

Re: Checkpointing with RocksDB as statebackend

2017-02-15 Thread Vinay Patil
Hi guys,

Can anyone please help me with this issue?

Regards,
Vinay Patil

On Wed, Feb 15, 2017 at 6:17 PM, Vinay Patil wrote:

> Hi Ted,
>
> I have 3 boxes in my pipeline: the 1st and 2nd boxes contain a source and an
> S3 sink, and the 3rd box is a window operator followed by chained operators
> and an S3 sink.
>
> In the details link section I can see that the S3 sink is taking time
> for the acknowledgement and it is not even reaching the window operator
> chain.
>
> But as shown in the snapshot, checkpoint id 19 did not get any
> acknowledgement. Not sure what is causing the issue.
>
> Regards,
> Vinay Patil
>
> On Wed, Feb 15, 2017 at 5:51 PM, Ted Yu wrote:
>
>> What did the More Details link say ?
>>
>> Thanks
>>
>> > On Feb 15, 2017, at 3:11 AM, vinay patil wrote:
>> >
>> > Hi,
>> >
>> > I have set the checkpointing interval to 6 secs and the minimum pause
>> > between checkpoints to 5 secs. While testing the pipeline I have observed
>> > that some checkpoints take a long time; as you can see in the attached
>> > snapshot, checkpoint id 19 took the maximum time before it failed,
>> > although it had not received any acknowledgements. During these 10
>> > minutes the entire pipeline did not make any progress and no data was
>> > getting processed. (For example: in 13 minutes 20M records were
>> > processed, and when the checkpoint took time there was no progress for
>> > the next 10 minutes.)
>> >
>> > I have even tried to set the max checkpoint timeout to 3 min, but in that
>> > case as well multiple checkpoints failed.
>> >
>> > I have set the RocksDB FLASH_SSD_OPTIMIZED predefined option.
>> > What could be the issue?
>> >
>> > P.S. I am writing to 3 S3 sinks
>> >
>> > checkpointing_issue.PNG
>
>


Re: Checkpointing with RocksDB as statebackend

2017-02-15 Thread vinay patil
Hi Ted,

I have 3 boxes in my pipeline: the 1st and 2nd boxes contain a source and an
S3 sink, and the 3rd box is a window operator followed by chained operators
and an S3 sink.

In the details link section I can see that the S3 sink is taking time
for the acknowledgement and it is not even reaching the window operator
chain.

But as shown in the snapshot, checkpoint id 19 did not get any
acknowledgement. Not sure what is causing the issue.

Regards,
Vinay Patil

On Wed, Feb 15, 2017 at 5:51 PM, Ted Yu wrote:

> What did the More Details link say ?
>
> Thanks
>
> > On Feb 15, 2017, at 3:11 AM, vinay patil wrote:
> >
> > Hi,
> >
> > I have set the checkpointing interval to 6 secs and the minimum pause
> > between checkpoints to 5 secs. While testing the pipeline I have observed
> > that some checkpoints take a long time; as you can see in the attached
> > snapshot, checkpoint id 19 took the maximum time before it failed,
> > although it had not received any acknowledgements. During these 10 minutes
> > the entire pipeline did not make any progress and no data was getting
> > processed. (For example: in 13 minutes 20M records were processed, and
> > when the checkpoint took time there was no progress for the next 10
> > minutes.)
> >
> > I have even tried to set the max checkpoint timeout to 3 min, but in that
> > case as well multiple checkpoints failed.
> >
> > I have set the RocksDB FLASH_SSD_OPTIMIZED predefined option.
> > What could be the issue?
> >
> > P.S. I am writing to 3 S3 sinks
> >
> > checkpointing_issue.PNG





Re: Checkpointing with RocksDB as statebackend

2017-02-15 Thread Ted Yu
What did the More Details link say ?

Thanks 

> On Feb 15, 2017, at 3:11 AM, vinay patil wrote:
> 
> Hi,
>
> I have set the checkpointing interval to 6 secs and the minimum pause
> between checkpoints to 5 secs. While testing the pipeline I have observed
> that some checkpoints take a long time; as you can see in the attached
> snapshot, checkpoint id 19 took the maximum time before it failed, although
> it had not received any acknowledgements. During these 10 minutes the entire
> pipeline did not make any progress and no data was getting processed. (For
> example: in 13 minutes 20M records were processed, and when the checkpoint
> took time there was no progress for the next 10 minutes.)
>
> I have even tried to set the max checkpoint timeout to 3 min, but in that
> case as well multiple checkpoints failed.
>
> I have set the RocksDB FLASH_SSD_OPTIMIZED predefined option.
> What could be the issue?
>
> P.S. I am writing to 3 S3 sinks
> 
> checkpointing_issue.PNG


Checkpointing with RocksDB as statebackend

2017-02-15 Thread vinay patil
Hi,

I have set the checkpointing interval to 6 secs and the minimum pause between
checkpoints to 5 secs. While testing the pipeline I have observed that some
checkpoints take a long time; as you can see in the attached snapshot,
checkpoint id 19 took the maximum time before it failed, although it had not
received any acknowledgements. During these 10 minutes the entire pipeline
did not make any progress and no data was getting processed. (For example: in
13 minutes 20M records were processed, and when the checkpoint took time
there was no progress for the next 10 minutes.)

I have even tried to set the max checkpoint timeout to 3 min, but in that
case as well multiple checkpoints failed.

I have set the RocksDB FLASH_SSD_OPTIMIZED predefined option.
What could be the issue?

P.S. I am writing to 3 S3 sinks 
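
For reference, a minimal sketch of the checkpoint configuration described
above (assuming the Flink 1.2 streaming API):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(6000) // checkpoint interval: 6s
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5000) // minimum pause: 5s
env.getCheckpointConfig.setCheckpointTimeout(3 * 60 * 1000) // fail a checkpoint after 3 min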

checkpointing_issue.PNG





Re: Unable to use Scala's BeanProperty with classes

2017-02-15 Thread Adarsh Jain
Thanks Timo, removing `@BeanProperty` gives a "no getters, no setters" error.




On Wed, Feb 15, 2017 at 3:45 PM, Timo Walther  wrote:

> Forget what I said about omitting `var`, this would remove the field from
> the POJO. I opened a PR for fixing the issue: https://github.com/apache/
> flink/pull/3318
>
> As a workaround: If you just want to have a POJO for the Cassandra Sink
> you don't need to add the `@BeanProperty` annotation. Flink also supports
> Scala-style POJOs. You just have to make sure that your class contains a
> default constructor.
>
> This is a valid POJO:
>
>   class SomeClass(var prop: Int) {
>     def this() = this(0)
>   }
>
> I hope that helps.
>
> Timo
>
>
> On 15/02/17 at 10:48, Chesnay Schepler wrote:
>
> Hello,
>
> There is an open PR about adding support for case classes to the Cassandra
> sinks: https://github.com/apache/flink/pull/2633
>
> You would have to check out the branch and build it yourself. If this works
> for you, it would be great if you could also give some
> feedback either here or in the PR.
>
> Regards,
> Chesnay
>
> On 15.02.2017 10:08, Adarsh Jain wrote:
>
> Thanks Fabian, I need to sink data into Cassandra, and a direct sink with
> case classes is not available (correct me if I am wrong).
>
> If we use Tuple then we are restricted to 22 fields
>
> What do you suggest here?
>
>
>
>
> On Wed, Feb 15, 2017 at 2:32 PM, Fabian Hueske  wrote:
>
>> Hi Adarsh,
>>
>> I think this is the same bug. I'm afraid you have to wait until the
>> problem is fixed.
>> The only workaround would be to use a different data type, for example a
>> case class.
>>
>> Best, Fabian
>>
>> 2017-02-15 6:08 GMT+01:00 Adarsh Jain :
>>
>>> Any help will be highly appreciable, am stuck on this one.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Feb 14, 2017 at 10:47 PM, Nico Kruber 
>>> wrote:
>>>
 Hi Adarsh,
 thanks for reporting this. It should be fixed eventually.

 @Timo: do you have an idea for a work-around or quick-fix?


 Regards
 Nico

 On Tuesday, 14 February 2017 21:11:21 CET Adarsh Jain wrote:
 > I am getting the same problem when trying to do a FlatMap operation on my
 > POJO class.
 >
 > Exception in thread "main" java.lang.IllegalStateException: Detected
 > more than one setter
 >
 >
 >
 > Am using Flink 1.2, the exception is coming when using FlatMap
 >
 > https://issues.apache.org/jira/browse/FLINK-5070


>>>
>>
>
>
>


Re: A way to control redistribution of operator state?

2017-02-15 Thread Stefan Richter
Hi,

I think the clean solution would be using raw keyed state once it becomes
available. In the meantime, your solution could work. However, you should be
aware that your approach does not rely on a contract but on an implementation
detail that *could* change between versions and break your code in subtle ways.

Best,
Stefan

> On 14.02.2017 at 12:19, Dmitry Golubets wrote:
> 
> Hi,
> 
> I was playing with it more today and I think I've found a workaround.
> 
> So what I do:
> 1. I define a constant N logical groups
> 2. I use a consistent hash mapping of data keys to these groups
> 3. I map these groups to partitions using an even distribution (the same
> way Flink distributes state)
> 4. In a stateful function I'm able to calculate which groups are assigned
> to that partition and produce the right number of states for each group
> (empty states too)
> 5. I do manual partitioning before that stateful function using the same
> group calculations
> 
> So far it looks like scaling up and down results in correct behavior.
> Can I rely on Flink distributing state evenly and in the order I return it
> in the list?
> 
> Best regards,
> Dmitry
> 
> On Tue, Feb 14, 2017 at 9:33 AM, Stefan Richter wrote:
> Hi,
> 
> there is something that we call "raw keyed" operator state, which might
> exactly serve your purpose. It is already used internally by Flink's window
> operator, but there currently exists no public API for this feature. The
> way it currently works is that you obtain input and output streams that are
> aware of key-groups being written or read, but the API needs to consider
> the fact that each key-group must be written only once and completed before
> the next key-group can start. This is a bit tricky to expose for
> inheritance hierarchies. My guess is that you can expect this in the next
> version of Flink.
> 
> Best,
> Stefan
> 
>> On 14.02.2017 at 08:31, Tzu-Li (Gordon) Tai wrote:
>> Hi Dmitry,
>> 
>> Technically, from the looks of the internal code around
>> `OperatorStateRepartitioner`, I think it is certainly possible to make it
>> pluggable.
>> Right now it is just hard-coded to use a round-robin repartitioner
>> implementation as the default.
>> 
>> However, I'm not sure of the plans for exposing this to the user and
>> making it configurable.
>> Looping in Stefan (in cc), who mostly worked on this part, to see if he
>> can provide more info.
>> 
>> - Gordon
>> 
>> On February 14, 2017 at 2:30:27 AM, Dmitry Golubets (dgolub...@gmail.com)
>> wrote:
>> 
>>> Hi,
>>> 
>>> It looks impossible to implement a keyed state with operator state now.
>>> 
>>> I know it sounds like "just use a keyed state", but the latter requires
>>> updating it on every value change, as opposed to operator state, and thus
>>> can be expensive (especially if you have to deal with mutable structures
>>> inside, which have to be serialized).
>>> 
>>> The problem is that there is no way to tell Flink how to reassign
>>> savepoint parts between partitions, and thus it is impossible to route
>>> data to the correct partitions.
>>> 
>>> Is there anything I missed, or maybe a plan to implement it in the future?
>>> 
>>> Best regards,
>>> Dmitry
> 
> 
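
A minimal sketch of the key-to-group-to-partition mapping Dmitry describes in
his steps 1-3 above (the group count and hash are illustrative; as Stefan
notes, the way Flink actually redistributes operator state is an
implementation detail, so this only mirrors the even, contiguous assignment
assumed in the thread):

// N logical groups; must stay constant across savepoints and restarts.
val NumGroups = 128

// Step 2: consistent mapping of a data key to one of the N groups.
def groupForKey(key: String): Int = {
  val h = key.hashCode % NumGroups
  if (h < 0) h + NumGroups else h
}

// Step 3: even, contiguous assignment of groups to `parallelism` subtasks.
def subtaskForGroup(group: Int, parallelism: Int): Int =
  group * parallelism / NumGroups

// Step 4: the set of groups a given subtask owns, i.e. the states it must
// produce (empty ones included).
def groupsForSubtask(subtask: Int, parallelism: Int): Seq[Int] =
  (0 until NumGroups).filter(g => subtaskForGroup(g, parallelism) == subtask)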



Re: Unable to use Scala's BeanProperty with classes

2017-02-15 Thread Timo Walther
Forget what I said about omitting `var`, this would remove the field 
from the POJO. I opened a PR for fixing the issue: 
https://github.com/apache/flink/pull/3318


As a workaround: If you just want to have a POJO for the Cassandra Sink 
you don't need to add the `@BeanProperty` annotation. Flink also supports
Scala-style POJOs. You just have to make sure that your class
contains a default constructor.


This is a valid POJO:

  class SomeClass(var prop: Int) {
    def this() = this(0)
  }

I hope that helps.

Timo


On 15/02/17 at 10:48, Chesnay Schepler wrote:

Hello,

There is an open PR about adding support for case classes to the 
Cassandra sinks: https://github.com/apache/flink/pull/2633


You would have to check out the branch and build it yourself. If this
works for you, it would be great if you could also give some
feedback either here or in the PR.

Regards,
Chesnay

On 15.02.2017 10:08, Adarsh Jain wrote:
Thanks Fabian, I need to sink data in Cassandra and direct sink with 
case class is not available (correct me if I am wrong)


If we use Tuple then we are restricted to 22 fields

What do you suggest here?




On Wed, Feb 15, 2017 at 2:32 PM, Fabian Hueske wrote:


Hi Adarsh,

I think this is the same bug. I'm afraid you have to wait until
the problem is fixed.
The only workaround would be to use a different data type, for
example a case class.

Best, Fabian

2017-02-15 6:08 GMT+01:00 Adarsh Jain :

Any help will be highly appreciable, am stuck on this one.






On Tue, Feb 14, 2017 at 10:47 PM, Nico Kruber wrote:

Hi Adarsh,
thanks for reporting this. It should be fixed eventually.

@Timo: do you have an idea for a work-around or quick-fix?


Regards
Nico

On Tuesday, 14 February 2017 21:11:21 CET Adarsh Jain wrote:
> I am getting the same problem when trying to do a FlatMap operation on my
> POJO class.
>
> Exception in thread "main" java.lang.IllegalStateException: Detected
> more than one setter
>
> Am using Flink 1.2; the exception is coming when using FlatMap
>
> https://issues.apache.org/jira/browse/FLINK-5070











Re: Unable to use Scala's BeanProperty with classes

2017-02-15 Thread Chesnay Schepler

Hello,

There is an open PR about adding support for case classes to the 
Cassandra sinks: https://github.com/apache/flink/pull/2633


You would have to check out the branch and build it yourself. If this
works for you, it would be great if you could also give some
feedback either here or in the PR.

Regards,
Chesnay

On 15.02.2017 10:08, Adarsh Jain wrote:
Thanks Fabian, I need to sink data into Cassandra, and a direct sink with
case classes is not available (correct me if I am wrong).


If we use Tuple then we are restricted to 22 fields

What do you suggest here?




On Wed, Feb 15, 2017 at 2:32 PM, Fabian Hueske wrote:


Hi Adarsh,

I think this is the same bug. I'm afraid you have to wait until
the problem is fixed.
The only workaround would be to use a different data type, for
example a case class.

Best, Fabian

2017-02-15 6:08 GMT+01:00 Adarsh Jain:

Any help will be highly appreciable, am stuck on this one.






On Tue, Feb 14, 2017 at 10:47 PM, Nico Kruber wrote:

Hi Adarsh,
thanks for reporting this. It should be fixed eventually.

@Timo: do you have an idea for a work-around or quick-fix?


Regards
Nico

On Tuesday, 14 February 2017 21:11:21 CET Adarsh Jain wrote:
> I am getting the same problem when trying to do a FlatMap operation on my
> POJO class.
>
> Exception in thread "main" java.lang.IllegalStateException: Detected
> more than one setter
>
> Am using Flink 1.2; the exception is coming when using FlatMap
>
> https://issues.apache.org/jira/browse/FLINK-5070









Re: Unable to use Scala's BeanProperty with classes

2017-02-15 Thread Timo Walther

Hi Adarsh,

I looked into your issue. The problem is that `var` generates Scala-style
getters/setters and the annotation generates Java-style getters/setters.
Right now Flink only supports one style in a POJO; I don't know why we have
this restriction. I will work on a fix for that. Is it possible for you to
remove the `var`? According to this Stackoverflow answer it is also
considered bad practice:
http://stackoverflow.com/questions/35038977/is-it-good-practice-to-use-beanproperty-in-scala-instead-of-defining-getter-set
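
To make the clash concrete, a minimal sketch (assuming Flink 1.2's POJO
analysis):

import scala.beans.BeanProperty

// `var prop` generates Scala-style accessors (prop / prop_=), while
// @BeanProperty additionally generates Java-style ones (getProp / setProp).
// Flink's POJO analysis then sees more than one setter for the field and
// fails with "Detected more than one setter".
class SomeClass(@BeanProperty var prop: Int) {
  def this() = this(0)
}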


Regards,
Timo




On 15/02/17 at 10:08, Adarsh Jain wrote:
Thanks Fabian, I need to sink data into Cassandra, and a direct sink with
case classes is not available (correct me if I am wrong).


If we use Tuple then we are restricted to 22 fields

What do you suggest here?




On Wed, Feb 15, 2017 at 2:32 PM, Fabian Hueske wrote:


Hi Adarsh,

I think this is the same bug. I'm afraid you have to wait until
the problem is fixed.
The only workaround would be to use a different data type, for
example a case class.

Best, Fabian

2017-02-15 6:08 GMT+01:00 Adarsh Jain:

Any help will be highly appreciable, am stuck on this one.






On Tue, Feb 14, 2017 at 10:47 PM, Nico Kruber wrote:

Hi Adarsh,
thanks for reporting this. It should be fixed eventually.

@Timo: do you have an idea for a work-around or quick-fix?


Regards
Nico

On Tuesday, 14 February 2017 21:11:21 CET Adarsh Jain wrote:
> I am getting the same problem when trying to do a FlatMap operation on my
> POJO class.
>
> Exception in thread "main" java.lang.IllegalStateException: Detected
> more than one setter
>
> Am using Flink 1.2; the exception is coming when using FlatMap
>
> https://issues.apache.org/jira/browse/FLINK-5070









Re: Unable to use Scala's BeanProperty with classes

2017-02-15 Thread Adarsh Jain
Thanks Fabian, I need to sink data into Cassandra, and a direct sink with case
classes is not available (correct me if I am wrong).

If we use Tuple then we are restricted to 22 fields

What do you suggest here?




On Wed, Feb 15, 2017 at 2:32 PM, Fabian Hueske  wrote:

> Hi Adarsh,
>
> I think this is the same bug. I'm afraid you have to wait until the
> problem is fixed.
> The only workaround would be to use a different data type, for example a
> case class.
>
> Best, Fabian
>
> 2017-02-15 6:08 GMT+01:00 Adarsh Jain :
>
>> Any help will be highly appreciable, am stuck on this one.
>>
>>
>>
>>
>>
>>
>> On Tue, Feb 14, 2017 at 10:47 PM, Nico Kruber 
>> wrote:
>>
>>> Hi Adarsh,
>>> thanks for reporting this. It should be fixed eventually.
>>>
>>> @Timo: do you have an idea for a work-around or quick-fix?
>>>
>>>
>>> Regards
>>> Nico
>>>
>>> On Tuesday, 14 February 2017 21:11:21 CET Adarsh Jain wrote:
>>> > I am getting the same problem when trying to do a FlatMap operation on my
>>> > POJO class.
>>> >
>>> > Exception in thread "main" java.lang.IllegalStateException: Detected
>>> > more than one setter
>>> >
>>> >
>>> >
>>> > Am using Flink 1.2, the exception is coming when using FlatMap
>>> >
>>> > https://issues.apache.org/jira/browse/FLINK-5070
>>>
>>>
>>
>


Re: Unable to use Scala's BeanProperty with classes

2017-02-15 Thread Fabian Hueske
Hi Adarsh,

I think this is the same bug. I'm afraid you have to wait until the problem
is fixed.
The only workaround would be to use a different data type, for example a
case class.
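
For example, a minimal sketch (the field names are illustrative):

// Scala case classes are natively supported by Flink's type system,
// so no getters/setters or @BeanProperty annotations are needed.
case class Sensor(id: String, value: Double)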

Best, Fabian

2017-02-15 6:08 GMT+01:00 Adarsh Jain :

> Any help will be highly appreciable, am stuck on this one.
>
>
>
>
>
>
> On Tue, Feb 14, 2017 at 10:47 PM, Nico Kruber 
> wrote:
>
>> Hi Adarsh,
>> thanks for reporting this. It should be fixed eventually.
>>
>> @Timo: do you have an idea for a work-around or quick-fix?
>>
>>
>> Regards
>> Nico
>>
>> On Tuesday, 14 February 2017 21:11:21 CET Adarsh Jain wrote:
>> > I am getting the same problem when trying to do a FlatMap operation on my
>> > POJO class.
>> >
>> > Exception in thread "main" java.lang.IllegalStateException: Detected
>> > more than one setter
>> >
>> >
>> >
>> > Am using Flink 1.2, the exception is coming when using FlatMap
>> >
>> > https://issues.apache.org/jira/browse/FLINK-5070
>>
>>
>


Re: Watermarks per key

2017-02-15 Thread Fabian Hueske
Hi Jordan,

it is not possible to generate watermarks per key. This feature has been
requested a couple of times, but I think there are no plans to implement it.
As far as I understand, the management of per-key watermarks would be quite
expensive (maintaining several watermarks, purging the watermarks of expired
keys, etc.), but Aljoscha (in CC) can share details about that.

Best,
Fabian

2017-02-15 2:02 GMT+01:00 Jordan Ganoff :

> Hi,
>
> I’m designing a streaming job whose elements need to be windowed by event
> time across a large set of keys. All elements are read from the same
> source. Event time progresses independently across keys. Is it possible to
> assign timestamps, and thus generate independent watermarks, per keyed
> stream, so late arriving elements can be handled per keyed stream?
>
> And in general, what’s the best approach to designing a job that needs to
> process different keyed streams whose event times do not relate to each
> other? My current approach generates timestamps at the source but never
> generates watermarks so no record is ever considered late. This has the
> unfortunate side effect of windows never closing. As a result, each event
> time window relies on a custom trigger which fires and purges the window
> after a given amount of processing time elapses during which no new records
> arrived.
>
> Thanks,
> Jordan
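
A hedged sketch of the inactivity trigger Jordan describes, assuming Flink
1.2's Trigger API (the timer bookkeeping is illustrative): it re-arms a
processing-time timer on every element and fires-and-purges once the timer
expires with no new records having arrived.

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window

// Fires and purges a window once `timeoutMs` of processing time passes
// without any new element arriving for that window.
class InactivityTrigger[T, W <: Window](timeoutMs: Long) extends Trigger[T, W] {

  private val timerState =
    new ValueStateDescriptor[java.lang.Long]("inactivity-timer", classOf[java.lang.Long])

  override def onElement(element: T, timestamp: Long, window: W,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val state = ctx.getPartitionedState(timerState)
    // Re-arm: drop the previous timer so only the latest one can fire.
    Option(state.value()).foreach(t => ctx.deleteProcessingTimeTimer(t))
    val timer = ctx.getCurrentProcessingTime + timeoutMs
    ctx.registerProcessingTimeTimer(timer)
    state.update(timer)
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: W,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE_AND_PURGE // no records arrived within the timeout

  override def onEventTime(time: Long, window: W,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE // no watermarks are generated in this setup

  override def clear(window: W, ctx: Trigger.TriggerContext): Unit = {
    val state = ctx.getPartitionedState(timerState)
    Option(state.value()).foreach(t => ctx.deleteProcessingTimeTimer(t))
    state.clear()
  }
}

It would be applied via .window(...).trigger(new InactivityTrigger(timeoutMs))
on the keyed stream.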


Flink batch processing fault tolerance

2017-02-15 Thread Renjie Liu
Hi, all:
I'm learning flink's doc and curious about the fault tolerance of batch
process jobs. It seems that when one of task execution fails, the whole job
will be restarted, is it true? If so, isn't it impractical to deploy large
flink batch jobs?
-- 
Liu, Renjie
Software Engineer, MVAD