Re: Task Manager recovery in Standalone Cluster High Availability mode

2017-02-21 Thread F.Amara
Hi,

Thanks a lot for the reply. I configured a restart strategy as suggested and
now the TM failure scenario is working as expected. Once a TM is killed
another active TM automatically recovers the job.
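For reference, a minimal sketch of such a restart strategy with the Flink Java API (the
attempt count and delay below are just illustrative values, not recommendations):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Without any restart strategy, a killed TaskManager sends the whole job to FAILED.
        // With a fixed-delay strategy, the job is retried and picks up on the remaining TMs.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,           // number of restart attempts
                10000L));    // delay between attempts, in milliseconds

        env.fromElements(1, 2, 3).print();   // placeholder pipeline
        env.execute("job-with-restart-strategy");
    }
}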





Re: Writing Tuple2 to a sink

2017-02-21 Thread 刘彪
Hi,
I think FlinkKafkaProducerBase.java shows a good way to deal with this
situation: there is a KeyedSerializationSchema that the user has to implement.
The KeyedSerializationSchema is used to serialize the data, so the
SinkFunction only needs to understand the type after serialization.
In your case, I think you can add a SerializationSchema to your
SinkFunction and have the user implement it, maybe
named Tuple2SerializationSchema.
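A minimal sketch of that idea (only SerializationSchema and SinkFunction are Flink
interfaces; the Tuple2SerializationSchema and SerializingSink names are hypothetical):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

// Hypothetical schema: turns the Tuple2 into bytes so the sink never needs to know the field types.
public class Tuple2SerializationSchema implements SerializationSchema<Tuple2<String, Integer>> {
    @Override
    public byte[] serialize(Tuple2<String, Integer> element) {
        return (element.f0 + "," + element.f1).getBytes();
    }
}

// Hypothetical sink that only deals with the serialized form.
class SerializingSink<T> implements SinkFunction<T> {
    private final SerializationSchema<T> schema;

    SerializingSink(SerializationSchema<T> schema) {
        this.schema = schema;
    }

    @Override
    public void invoke(T value) throws Exception {
        byte[] payload = schema.serialize(value);
        // write 'payload' to the external system here
    }
}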

2017-02-22 7:17 GMT+08:00 Mohit Anchlia :

> What's the best way to retrieve both the values in Tuple2 inside a custom
> sink given that the type is not known inside the sink function?
>


Re: flink on yarn ha

2017-02-21 Thread lining jing
Hi,
I updated Flink from 1.1.3 to 1.2, but it fails.

This is the JobManager error log:

Failed toString() invocation on an object of type
[org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl]
java.lang.NoSuchMethodError:
org.apache.hadoop.security.proto.SecurityProtos.getDescriptor()Lorg/apache/flink/hadoop/shaded/com/google/protobuf/Descriptors$FileDescriptor;
at
org.apache.hadoop.yarn.proto.YarnProtos.&lt;clinit&gt;(YarnProtos.java:47614)
at
org.apache.hadoop.yarn.proto.YarnProtos$LocalResourceProto.internalGetFieldAccessorTable(YarnProtos.java:11542)
at
org.apache.flink.hadoop.shaded.com.google.protobuf.GeneratedMessage.getAllFieldsMutable(GeneratedMessage.java:105)
at
org.apache.flink.hadoop.shaded.com.google.protobuf.GeneratedMessage.getAllFields(GeneratedMessage.java:153)
at
org.apache.flink.hadoop.shaded.com.google.protobuf.TextFormat$Printer.print(TextFormat.java:272)
at
org.apache.flink.hadoop.shaded.com.google.protobuf.TextFormat$Printer.access$400(TextFormat.java:248)
at
org.apache.flink.hadoop.shaded.com.google.protobuf.TextFormat.shortDebugString(TextFormat.java:88)
at
org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl.toString(LocalResourcePBImpl.java:77)
at
org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:305)
at
org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:277)
at
org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:231)
at
org.slf4j.helpers.MessageFormatter.format(MessageFormatter.java:124)
at org.slf4j.impl.Log4jLoggerAdapter.info
(Log4jLoggerAdapter.java:322)
at
org.apache.flink.yarn.YarnApplicationMasterRunner.createTaskManagerContext(YarnApplicationMasterRunner.java:684)
at
org.apache.flink.yarn.YarnApplicationMasterRunner.runApplicationMaster(YarnApplicationMasterRunner.java:331)
at
org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:203)
at
org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:200)
at
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at
org.apache.flink.yarn.YarnApplicationMasterRunner.run(YarnApplicationMasterRunner.java:200)
at
org.apache.flink.yarn.YarnApplicationMasterRunner.main(YarnApplicationMasterRunner.java:124)

2017-02-22 8:44 GMT+08:00 lining jing :

> Thanks, Stephan !  I will try it!
>
> 2017-02-21 21:42 GMT+08:00 Stephan Ewen :
>
>> Hi!
>>
>> Flink 1.1.4 and Flink 1.2 fixed a bunch of issues with HA, can you try
>> those versions?
>>
>> If these also have issues, could you share the logs of the JobManager?
>>
>> Thanks!
>>
>> On Tue, Feb 21, 2017 at 11:41 AM, lining jing 
>> wrote:
>>
>>> flink version: 1.1.3
>>>
>>> kill jobmanager, the job fail. Ha config did not work.
>>>
>>
>>
>


Re: flink on yarn ha

2017-02-21 Thread lining jing
Thanks, Stephan !  I will try it!

2017-02-21 21:42 GMT+08:00 Stephan Ewen :

> Hi!
>
> Flink 1.1.4 and Flink 1.2 fixed a bunch of issues with HA, can you try
> those versions?
>
> If these also have issues, could you share the logs of the JobManager?
>
> Thanks!
>
> On Tue, Feb 21, 2017 at 11:41 AM, lining jing 
> wrote:
>
>> flink version: 1.1.3
>>
>> kill jobmanager, the job fail. Ha config did not work.
>>
>
>


Writing Tuple2 to a sink

2017-02-21 Thread Mohit Anchlia
What's the best way to retrieve both the values in Tuple2 inside a custom
sink given that the type is not known inside the sink function?


Re: Previously working job fails on Flink 1.2.0

2017-02-21 Thread Steffen Hausmann
Thanks Stefan and Stephan for your comments. I changed the type of the field 
and now the job seems to be running again.

And thanks Robert for filing the Jira!

Cheers,
Steffen


Am 21. Februar 2017 18:36:41 MEZ schrieb Robert Metzger :
>I've filed a JIRA for the problem:
>https://issues.apache.org/jira/browse/FLINK-5874
>
>On Tue, Feb 21, 2017 at 4:09 PM, Stephan Ewen  wrote:
>
>> @Steffen
>>
>> Yes, you can currently not use arrays as keys. There is a check
>missing
>> that gives you a proper error message for that.
>>
>> The double[] is hashed on the sender side before sending it. Java's
>hash
>> over an array does not take its contents into account, but the
>array's
>> memory address, which makes it a non-deterministic hash.
>> When the double is re-hashed on the receiver, you get a different
>hash,
>> which is detected as violating the key groups.
>>
>> In fact, your program was probably behaving wrong before, but now you
>get
>> a message for the error...
>>
>>
>>
>> On Tue, Feb 21, 2017 at 3:14 PM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> if you key is a double[], even if the field is a final double[], it
>is
>>> mutable because the array entries can be mutated and maybe that is
>what
>>> happened? You can check if the following two points are in sync,
>hash-wise:
>>> KeyGroupStreamPartitioner::selectChannels and
>>> AbstractKeyedStateBackend::setCurrentKey. The first method basically
>>> determines to which parallel operator a tuple is routed in a keyed
>stream.
>>> The second is determining the tuple’s key group for the backend.
>Both must
>>> be in sync w.r.t. their result of the key-group that is determined
>for the
>>> tuple. And this assignment is done based on the hash of the key.
>Therefore,
>>> the hash of the tuple’s key should never change and must be
>immutable. If
>>> you can notice a change in hash code, that change is what breaks
>your code.
>>> I am pretty sure that Flink 1.1.x might just silently accept a
>mutation of
>>> the key, but actually this is arguably incorrect.
>>>
>>> Best,
>>> Stefan
>>>
>>> > Am 21.02.2017 um 14:51 schrieb Steffen Hausmann <
>>> stef...@hausmann-family.de>:
>>> >
>>> > Thanks for these pointers, Stefan.
>>> >
>>> > I've started a fresh job and didn't migrate any state from
>previous
>>> execution. Moreover, all the fields of all the events I'm using are
>>> declared final.
>>> >
>>> > I've set a breakpoint to figure out what event is causing the
>problem,
>>> and it turns out that Flink starts processing the incoming events
>for some
>>> time and only when a certain window triggers an exception is thrown.
>The
>>> specific code that causes the exception is as follows:
>>> >
>>> >> DataStream idleDuration = cleanedTrips
>>> >>.keyBy("license")
>>> >>.flatMap(new DetermineIdleDuration())
>>> >>.filter(duration -> duration.avg_idle_duration >= 0 &&
>>> duration.avg_idle_duration <= 240)
>>> >>.keyBy("location")
>>> >>.timeWindow(Time.minutes(10))
>>> >>.apply((Tuple tuple, TimeWindow window,
>Iterable
>>> input, Collector out) -> {
>>> >>double[] location = Iterables.get(input, 0).location;
>>> >>double avgDuration = StreamSupport
>>> >>.stream(input.spliterator(), false)
>>> >>.mapToDouble(idle -> idle.avg_idle_duration)
>>> >>.average()
>>> >>.getAsDouble();
>>> >>
>>> >>out.collect(new IdleDuration(location, avgDuration,
>>> window.maxTimestamp()));
>>> >>});
>>> >
>>> > If the apply statement is removed, there is no exception during
>runtime.
>>> >
>>> > The location field that is referenced by the keyBy statement is
>>> actually a double[]. May this cause the problems I'm experiencing?
>>> >
>>> > You can find some more code for additional context in the attached
>>> document.
>>> >
>>> > Thanks for looking into this!
>>> >
>>> > Steffen
>>> >
>>> >
>>> >
>>> > On 20/02/2017 15:22, Stefan Richter wrote:
>>> >> Hi,
>>> >>
>>> >> Flink 1.2 is partitioning all keys into key-groups, the atomic
>units
>>> for rescaling. This partitioning is done by hash partitioning and is
>also
>>> in sync with the routing of tuples to operator instances (each
>parallel
>>> instance of a keyed operator is responsible for some range of key
>groups).
>>> This exception means that Flink detected a tuple in the state
>backend of a
>>> parallel operator instance that should not be there because, by its
>key
>>> hash, it belongs to a different key-group. Or phrased differently,
>this
>>> tuple belongs to a different parallel operator instance. If this is
>a Flink
>>> bug or user code bug is very hard to tell, the log also does not
>provide
>>> additional insights. I could see this happen in case that your keys
>are
>>> mutable and your code makes some changes to the object that change
>the hash
>>> code. Another question is also: did you migrate your job from Flink 1.1.3
>>> through an old savepoint or did you do a fresh start?

Re: Checkpointing with RocksDB as statebackend

2017-02-21 Thread Ted Yu
Stephan:
The links were in the other email from vinay. 

> On Feb 21, 2017, at 10:46 AM, Stephan Ewen  wrote:
> 
> Hi!
> 
> I cannot find the screenshots you attached.
> The Apache Mailing lists sometimes don't support attachments, can you link to 
> the screenshots some way else?
> 
> Stephan
> 
> 
>> On Mon, Feb 20, 2017 at 8:36 PM, vinay patil  wrote:
>> Hi Stephan,
>> 
>> Just saw your mail while I was explaining the answer to your earlier 
>> questions. I have attached some more screenshots which are taken from the 
>> latest run today.
>> Yes, I will try to set it to a higher value and check if performance improves.
>> 
>> Let me know your thoughts
>> 
>> Regards,
>> Vinay Patil
>> 
>>> On Tue, Feb 21, 2017 at 12:51 AM, Stephan Ewen [via Apache Flink User 
>>> Mailing List archive.] <[hidden email]> wrote:
>>> @Vinay!
>>> 
>>> Just saw the screenshot you attached to the first mail. The checkpoint that 
>>> failed came after one that had an incredibly heavy alignment phase (14 GB).
>>> I think that working that off threw the next checkpoint because the workers 
>>> were still working off the alignment backlog.
>>> 
>>> I think you can for now fix this by setting the minimum pause between 
>>> checkpoints a bit higher (it is probably set a bit too small for the state 
>>> of your application).
>>> 
>>> Also, can you describe what your sources are (Kafka / Kinesis or file 
>>> system)?
>>> 
>>> BTW: We are currently working on
>>>   - incremental RocksDB checkpoints
>>>   - the network stack to allow in the future for a new way of doing the 
>>> alignment
>>> 
>>> Both of that should help that the program is more resilient to these 
>>> situations.
>>> 
>>> Best,
>>> Stephan
>>> 
>>> 
>>> 
 On Mon, Feb 20, 2017 at 7:51 PM, Stephan Ewen <[hidden email]> wrote:
 Hi Vinay!
 
 Can you start by giving us a bit of an environment spec?
 
   - What Flink version are you using?
   - What is your rough topology (what operations does the program use)
   - Where is the state (windows, keyBy)?
   - What is the rough size of your checkpoints and where does the time go? 
 Can you attach a screenshot from 
 https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/checkpoint_monitoring.html
   - What is the size of the JVM?
 
 Those things would be helpful to know...
 
 Best,
 Stephan
 
 
> On Mon, Feb 20, 2017 at 7:04 PM, vinay patil <[hidden email]> wrote:
> Hi Xiaogang,
> 
> Thank you for your inputs.
> 
> Yes, I have already tried setting MaxBackgroundFlushes and
> MaxBackgroundCompactions to a higher value (tried 2, 4, 8), but am still not
> getting the expected results.
> 
> System.getProperty("java.io.tmpdir") points to /tmp, but I could not find the
> RocksDB logs there. Can you please let me know where I can find them?
> 
> Regards,
> Vinay Patil
> 
>> On Mon, Feb 20, 2017 at 7:32 AM, xiaogang.sxg [via Apache Flink User 
>> Mailing List archive.] <[hidden email]> wrote:
>> Hi Vinay
>> 
>> Can you provide the LOG file from RocksDB? It helps a lot to figure out
>> the problems because it records the options and the events that happened
>> during the execution. Unless configured otherwise, it should be located at
>> the path set in System.getProperty("java.io.tmpdir").
>> 
>> Typically, a large amount of memory is consumed by RocksDB to store 
>> necessary indices. To avoid the unlimited growth in the memory 
>> consumption, you can put these indices into block cache (set 
>> CacheIndexAndFilterBlock to true) and properly set the block cache size.
>> 
>> You can also increase the number of background threads to improve the
>> performance of flushes and compactions (via MaxBackgroundFlushes and
>> MaxBackgroundCompactions).
>> 
>> In YARN clusters, task managers will be killed if their memory 
>> utilization exceeds the allocation size. Currently Flink does not count 
>> the memory used by RocksDB in the allocation. We are working on 
>> fine-grained resource allocation (see FLINK-5131). It may help to avoid 
>> such problems.
>> 
>> I hope this information helps you.
>> 
>> Regards,
>> Xiaogang
>> 
>> 
>> --
>> From: Vinay Patil <[hidden email]>
>> Sent: Friday, February 17, 2017, 21:19
>> To: user <[hidden email]>
>> Subject: Re: Checkpointing with RocksDB as statebackend
>> 
>> Hi Guys,
>> 
>> There seems to be some issue with RocksDB memory utilization.
>> 
>> Within a few minutes of the job running, the physical memory usage increases
>> by 4-5 GB and keeps on increasing.
>> I have tried different options for Max Buffer Size (30 MB, 64 MB, 128 MB,
>> 512 MB) and Min Buffer to Merge as 2, but the physical memory keeps on
>> increasing.
>> 
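For reference, a rough sketch of how the RocksDB options mentioned above (background
flushes/compactions, caching index and filter blocks) can be wired into the RocksDB
state backend with the Flink 1.2-style OptionsFactory. The checkpoint URI, thread counts
and cache size are placeholders, and exact RocksDB option names can differ between
RocksDB versions:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class RocksDBTuningSketch {
    public static RocksDBStateBackend createBackend() throws Exception {
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");

        // Start from a predefined profile and tweak individual options on top of it.
        backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);

        backend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                // More background threads for flushes and compactions, as suggested above.
                return currentOptions
                        .setMaxBackgroundFlushes(4)
                        .setMaxBackgroundCompactions(4);
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                // Put index/filter blocks into a bounded block cache so the memory used
                // for RocksDB indices cannot grow without limit.
                BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
                        .setCacheIndexAndFilterBlocks(true)
                        .setBlockCacheSize(256 * 1024 * 1024L); // 256 MB, placeholder
                return currentOptions.setTableFormatConfig(tableConfig);
            }
        });
        return backend;
    }
}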

Re: Arrays values in keyBy

2017-02-21 Thread Robert Metzger
I've filed a JIRA for this issue:
https://issues.apache.org/jira/browse/FLINK-5874

On Wed, Jul 20, 2016 at 4:32 PM, Stephan Ewen  wrote:

> I thing we can simply add this behavior when we use the TypeComparator in
> the keyBy() function. It can implement the hashCode() as a deepHashCode on
> array types.
>
> On Mon, Jun 13, 2016 at 12:30 PM, Ufuk Celebi  wrote:
>
>> Would make sense to update the Javadocs for the next release.
>>
>> On Mon, Jun 13, 2016 at 11:19 AM, Aljoscha Krettek 
>> wrote:
>> > Yes, this is correct. Right now we're basically using .hashCode()
>> for
>> > keying. (Which can be problematic in some cases.)
>> >
>> > Beam, for example, clearly specifies that the encoded form of a value
>> should
>> > be used for all comparisons/hashing. This is more well defined but can
>> lead
>> > to slow performance in some cases.
>> >
>> > On Sat, 11 Jun 2016 at 00:04 Elias Levy 
>> wrote:
>> >>
>> >> It would be useful if the documentation warned what type of equality it
>> >> expected of values used as keys in keyBy.  I just got bit in the ass by
>> >> converting a field from a string to a byte array.  All of the sudden
>> the
>> >> windows were no longer aggregating.  So it seems Flink is not doing a
>> deep
>> >> compare of arrays when comparing keys.
>>
>
>
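Until keyBy() does a deep hash of arrays, one hedged workaround for the double[]-as-key
issue discussed above is to key on a deterministic, immutable representation of the
array contents instead of the array itself (a sketch; the Event type is illustrative):

import java.util.Arrays;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class ArrayKeyWorkaround {

    // Illustrative event type with a double[] field used as the key.
    public static class Event {
        public double[] location;
    }

    public static KeyedStream<Event, String> keyByLocation(DataStream<Event> events) {
        // Arrays.toString() depends on the array *contents*, unlike double[].hashCode(),
        // which is identity-based and therefore differs between sender and receiver.
        return events.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event e) {
                return Arrays.toString(e.location);
            }
        });
    }
}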


Re: Previously working job fails on Flink 1.2.0

2017-02-21 Thread Robert Metzger
I've filed a JIRA for the problem:
https://issues.apache.org/jira/browse/FLINK-5874

On Tue, Feb 21, 2017 at 4:09 PM, Stephan Ewen  wrote:

> @Steffen
>
> Yes, you can currently not use arrays as keys. There is a check missing
> that gives you a proper error message for that.
>
> The double[] is hashed on the sender side before sending it. Java's hash
> over an array does not take its contents into account, but the array's
> memory address, which makes it a non-deterministic hash.
> When the double is re-hashed on the receiver, you get a different hash,
> which is detected as violating the key groups.
>
> In fact, your program was probably behaving wrong before, but now you get
> a message for the error...
>
>
>
> On Tue, Feb 21, 2017 at 3:14 PM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> if you key is a double[], even if the field is a final double[], it is
>> mutable because the array entries can be mutated and maybe that is what
>> happened? You can check if the following two points are in sync, hash-wise:
>> KeyGroupStreamPartitioner::selectChannels and
>> AbstractKeyedStateBackend::setCurrentKey. The first method basically
>> determines to which parallel operator a tuple is routed in a keyed stream.
>> The second is determining the tuple’s key group for the backend. Both must
>> be in sync w.r.t. their result of the key-group that is determined for the
>> tuple. And this assignment is done based on the hash of the key. Therefore,
>> the hash of the tuple’s key should never change and must be immutable. If
>> you can notice a change in hash code, that change is what breaks your code.
>> I am pretty sure that Flink 1.1.x might just silently accept a mutation of
>> the key, but actually this is arguably incorrect.
>>
>> Best,
>> Stefan
>>
>> > Am 21.02.2017 um 14:51 schrieb Steffen Hausmann <
>> stef...@hausmann-family.de>:
>> >
>> > Thanks for these pointers, Stefan.
>> >
>> > I've started a fresh job and didn't migrate any state from previous
>> execution. Moreover, all the fields of all the events I'm using are
>> declared final.
>> >
>> > I've set a breakpoint to figure out what event is causing the problem,
>> and it turns out that Flink starts processing the incoming events for some
>> time and only when a certain window triggers an exception is thrown. The
>> specific code that causes the exception is as follows:
>> >
>> >> DataStream idleDuration = cleanedTrips
>> >>.keyBy("license")
>> >>.flatMap(new DetermineIdleDuration())
>> >>.filter(duration -> duration.avg_idle_duration >= 0 &&
>> duration.avg_idle_duration <= 240)
>> >>.keyBy("location")
>> >>.timeWindow(Time.minutes(10))
>> >>.apply((Tuple tuple, TimeWindow window, Iterable
>> input, Collector out) -> {
>> >>double[] location = Iterables.get(input, 0).location;
>> >>double avgDuration = StreamSupport
>> >>.stream(input.spliterator(), false)
>> >>.mapToDouble(idle -> idle.avg_idle_duration)
>> >>.average()
>> >>.getAsDouble();
>> >>
>> >>out.collect(new IdleDuration(location, avgDuration,
>> window.maxTimestamp()));
>> >>});
>> >
>> > If the apply statement is removed, there is no exception during runtime.
>> >
>> > The location field that is referenced by the keyBy statement is
>> actually a double[]. May this cause the problems I'm experiencing?
>> >
>> > You can find some more code for additional context in the attached
>> document.
>> >
>> > Thanks for looking into this!
>> >
>> > Steffen
>> >
>> >
>> >
>> > On 20/02/2017 15:22, Stefan Richter wrote:
>> >> Hi,
>> >>
>> >> Flink 1.2 is partitioning all keys into key-groups, the atomic units
>> for rescaling. This partitioning is done by hash partitioning and is also
>> in sync with the routing of tuples to operator instances (each parallel
>> instance of a keyed operator is responsible for some range of key groups).
>> This exception means that Flink detected a tuple in the state backend of a
>> parallel operator instance that should not be there because, by its key
>> hash, it belongs to a different key-group. Or phrased differently, this
>> tuple belongs to a different parallel operator instance. If this is a Flink
>> bug or user code bug is very hard to tell, the log also does not provide
>> additional insights. I could see this happen in case that your keys are
>> mutable and your code makes some changes to the object that change the hash
>> code. Another question is also: did you migrate your job from Flink 1.1.3
>> through an old savepoint or did you do a fresh start. Other than that, I
>> can recommend to check your code for mutating of keys. If this fails
>> deterministically, you could also try to set a breakpoint for the line of
>> the exception and take a look if the key that is about to be inserted is
>> somehow special.
>> >>
>> >> Best,
>> >> Stefan
>> >>
>> 

Re: How to achieve exactly once on node failure using Kafka

2017-02-21 Thread Y. Sakamoto

Thank you for your reply.

As I understand it, Map/Filter functions operate with "at least once" semantics when a
failure occurs, so I need to write the code such that the record is saved (overwritten)
in Elasticsearch under the same ID even if duplicate data arrives. Is that correct?
(Sorry, I cannot understand how to "write changes to Flink's state to Elastic".)

Regards,
Yuichiro


On 2017/02/21 3:56, Stephan Ewen wrote:

Hi!

Exactly-once end-to-end requires sinks that support that kind of behavior 
(typically some form of transactions support).

Kafka currently does not have the mechanisms in place to support exactly-once 
sinks, but the Kafka project is working on that feature.
For Elasticsearch, it is also not simply possible (because of missing transactions), but
you can use Flink's state as the "authoritative" state (it is exactly once) and then
write changes to Flink's state to Elastic. That way the writes to Elasticsearch
become "idempotent", which means duplicates simply make no additional changes.


Hope that helps!

Stephan




On Mon, Feb 20, 2017 at 5:53 PM, Y. Sakamoto > wrote:

Hi,
I'm using Flink 1.2.0 and am trying to do "exactly once" data transfer
from Kafka to Elasticsearch, but I cannot.
(Scala 2.11, Kafka 0.10, without YARN)

There are 2 Flink TaskManager nodes, and while processing
with parallelism 2, I shut down one of them (simulating a node failure).

Using flink-connector-kafka, I wrote following code:

   StreamExecutionEnvironment env = StreamExecutionEnvironment
 .getExecutionEnvironment();
   env.enableCheckpointing(1000L);
   env.setParallelism(2);

   Properties kafkaProp = new Properties();
   kafkaProp.setProperty("bootstrap.servers", "192.168.97.42:9092 
");
   kafkaProp.setProperty("zookeeper.connect", "192.168.97.42:2181 
");
   kafkaProp.setProperty("group.id ", "id");

   DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
 "topic", new SimpleStringSchema(), kafkaProp));

I found duplicated data transfer in the map function.
Data from the checkpoint before the node failure seems to be duplicated.

Is there any way to achieve "exactly once" on failure?


Thanks.
Yuichiro





--
☆ ─── ─ ─ - -
   Yuichiro SAKAMOTO
 ks...@muc.biglobe.ne.jp
 phonypian...@gmail.com
 http://phonypianist.sakura.ne.jp



Re: Flink checkpointing gets stuck

2017-02-21 Thread vinay patil
Hi Shai,

I checked online that Azure DS5_v2 uses SSDs for storage, so why don't you try
the FLASH_SSD_OPTIMIZED predefined option?

In my case as well, the stream was getting stuck for a few minutes; my
checkpoint duration is 6 seconds and the minimum pause between checkpoints is
5 seconds.

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/large_state_tuning.html

I think that if writes to RocksDB are blocked, the stream can block for a
certain interval:
https://github.com/facebook/rocksdb/wiki/Write-Stalls

First try the FLASH_SSD_OPTIMIZED option, and don't give unnecessarily high
heap memory to the TM, as RocksDB also uses physical memory outside the heap.
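A small sketch of how the checkpoint interval, the minimum pause between checkpoints and
the RocksDB predefined option fit together (the values and the checkpoint URI are
placeholders, not recommendations):

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 seconds, but never start a new checkpoint sooner than
        // 5 seconds after the previous one finished; give up on a checkpoint after 10 minutes.
        env.enableCheckpointing(10000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);

        // RocksDB backend tuned for SSD-backed local storage.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
        backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
        env.setStateBackend(backend);

        // ... define sources, operators and sinks here, then env.execute(...) ...
    }
}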




Regards,
Vinay Patil

On Tue, Feb 21, 2017 at 8:03 PM, Shai Kaplan [via Apache Flink User Mailing
List archive.]  wrote:

> Hi Vinay.
>
>
>
> I couldn't understand from the thread, what configuration solved your
> problem?
>
>
>
> I'm using the default predefined option. Perhaps it's not the best
> configuration for my setting (I'm using Azure DS5_v2 machines), I honestly
> haven't given much thought to that particular detail, but I think it should
> only affect the performance, not make the job totally stuck.
>
>
>
> Thanks.
>
>
>
> *From:* vinay patil [mailto:[hidden email]
> ]
> *Sent:* Tuesday, February 21, 2017 3:58 PM
> *To:* [hidden email] 
> *Subject:* Re: Flink checkpointing gets stuck
>
>
>
> Hi Shai,
>
> I was facing similar issue , however now the stream is not stuck in
> between.
>
> you can refer this thread for the configurations I have done :
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-
> Checkpointing-with-RocksDB-as-statebackend-td11752.html
> 
>
> What is the configuration on which you running the job ?
> What is the RocksDB predefined option you are using ?
>
>
> Regards,
>
> Vinay Patil
>
>
>
> On Tue, Feb 21, 2017 at 7:13 PM, Shai Kaplan [via Apache Flink User
> Mailing List archive.] <[hidden email]
> > wrote:
>
> Hi.
>
> I'm running a Flink 1.2 job with a 10 seconds checkpoint interval. After
> some running time (minutes-hours) Flink fails to save checkpoints, and
> stops processing records (I'm not sure if the checkpointing failure is the
> cause of the problem or just a symptom).
>
> After several checkpoints that take some seconds each, they start failing
> due to 30 minutes timeout.
>
> When I restart one of the Task Manager services (just to get the job
> restarted), the job is recovered from the last successful checkpoint (the
> state size continues to grow, so it's probably not the reason for the
> failure), advances somewhat, saves some more checkpoints, and then enters
> the failing state again.
>
> One of the times it happened, the first failed checkpoint failed due to
> "Checkpoint Coordinator is suspending.", so it might be an indicator for
> the cause of the problem, but looking into Flink's code I can't see how a
> running job could get to this state.
>
> I am using RocksDB for state, and the state is saved to Azure Blob Store,
> using the NativeAzureFileSystem HDFS connector over the wasbs protocol.
>
> Any ideas? Possibly a bug in Flink or RocksDB?
>
>
> --
>
> *If you reply to this email, your message will be added to the discussion
> below:*
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-
> checkpointing-gets-stuck-tp11776.html
> 
>
> To start a new topic under Apache Flink User Mailing List archive., email 
> [hidden
> email] 
> To unsubscribe from Apache Flink User Mailing List archive., click here.
> NAML
> 

Re: Flink checkpointing gets stuck

2017-02-21 Thread Ufuk Celebi
Hey Shai!

Thanks for reporting this.

It's hard to tell what causes this from your email, but could you
check the checkpoint interface
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/checkpoint_monitoring.html)
and report how much progress the checkpoints make before timing out?

The "Checkpoint Coordinator is suspending" message indicates that the
job failed and the checkpoint coordinator is shut down because of
that. Can you check the TaskManager and JobManager logs if other
errors are reported? Feel free to share them. Then I could help with
going over them.

– Ufuk


On Tue, Feb 21, 2017 at 2:47 PM, Shai Kaplan  wrote:
> Hi.
>
> I'm running a Flink 1.2 job with a 10 seconds checkpoint interval. After
> some running time (minutes-hours) Flink fails to save checkpoints, and stops
> processing records (I'm not sure if the checkpointing failure is the cause
> of the problem or just a symptom).
>
> After several checkpoints that take some seconds each, they start failing
> due to 30 minutes timeout.
>
> When I restart one of the Task Manager services (just to get the job
> restarted), the job is recovered from the last successful checkpoint (the
> state size continues to grow, so it's probably not the reason for the
> failure), advances somewhat, saves some more checkpoints, and then enters
> the failing state again.
>
> One of the times it happened, the first failed checkpoint failed due to
> "Checkpoint Coordinator is suspending.", so it might be an indicator for the
> cause of the problem, but looking into Flink's code I can't see how a
> running job could get to this state.
>
> I am using RocksDB for state, and the state is saved to Azure Blob Store,
> using the NativeAzureFileSystem HDFS connector over the wasbs protocol.
>
> Any ideas? Possibly a bug in Flink or RocksDB?


Re: Flink streaming. Broadcast reference data map across nodes

2017-02-21 Thread Ufuk Celebi
On Tue, Feb 21, 2017 at 2:35 PM, Vadim Vararu  wrote:
> Basically, i have a big dictionary of reference data that has to be
> accessible from all the nodes (in order to do some joins of log line with
> reference line).

If the dictionary is small, you can make it part of the closures that
are sent to the task managers. Just make it part of your function.

If it is large, I'm not sure what the best way to do it is right
now. I've CC'd Aljoscha who can probably help...
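For the small-dictionary case, a minimal sketch of what "make it part of the closure"
looks like (the dictionary contents and the comma-separated log format are just
assumptions for the example):

import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;

// The map is serialized together with the function and shipped to every task manager.
public class EnrichWithDictionary implements MapFunction<String, String> {

    private final Map<String, String> referenceData;

    public EnrichWithDictionary(Map<String, String> referenceData) {
        this.referenceData = referenceData;
    }

    @Override
    public String map(String logLine) {
        String key = logLine.split(",")[0];               // assumption: first field is the join key
        String reference = referenceData.get(key);
        return logLine + "," + (reference != null ? reference : "unknown");
    }
}

// Usage sketch:
//   Map<String, String> dict = new HashMap<>();   // fill with the reference data
//   DataStream<String> enriched = logLines.map(new EnrichWithDictionary(dict));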


Re: Task Manager recovery in Standalone Cluster High Availability mode

2017-02-21 Thread Ufuk Celebi
Hey! Did you configure a restart strategy?
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/restart_strategies.html

Keep in mind that in standalone mode, a TM process that has exited
won't be automatically restarted, though.

On Tue, Feb 21, 2017 at 10:00 AM, F.Amara  wrote:
> Hi,
>
> I'm working with Apache Flink 1.1.2 and testing on High Availability mode.
> In the case of Task Manager failures they say a standby TM will recover the
> work of the failed TM. In my case, I have 4 TM's running in parallel and
> when a TM is killed the state goes to Cancelling and then to Failed rather
> than Restarting and the work is not recovered.
>
> Is there a specific way to create standby TM's and a specific reason for
> jobs not being recovered?
>
>
>
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Task-Manager-recovery-in-Standalone-Cluster-High-Availability-mode-tp11767.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.


RE: Flink checkpointing gets stuck

2017-02-21 Thread Shai Kaplan
Hi Vinay.

I couldn't understand from the thread which configuration solved your problem.

I'm using the default predefined option. Perhaps it's not the best
configuration for my setting (I'm using Azure DS5_v2 machines); I honestly
haven't given much thought to that particular detail, but I think it should
only affect performance, not make the job totally stuck.

Thanks.

From: vinay patil [mailto:vinay18.pa...@gmail.com]
Sent: Tuesday, February 21, 2017 3:58 PM
To: user@flink.apache.org
Subject: Re: Flink checkpointing gets stuck

Hi Shai,

I was facing similar issue , however now the stream is not stuck in between.
you can refer this thread for the configurations I have done : 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-td11752.html

What is the configuration on which you running the job ?
What is the RocksDB predefined option you are using ?


Regards,
Vinay Patil

On Tue, Feb 21, 2017 at 7:13 PM, Shai Kaplan [via Apache Flink User Mailing 
List archive.] <[hidden email]> 
wrote:
Hi.
I'm running a Flink 1.2 job with a 10 seconds checkpoint interval. After some 
running time (minutes-hours) Flink fails to save checkpoints, and stops 
processing records (I'm not sure if the checkpointing failure is the cause of 
the problem or just a symptom).
After several checkpoints that take some seconds each, they start failing due 
to 30 minutes timeout.
When I restart one of the Task Manager services (just to get the job 
restarted), the job is recovered from the last successful checkpoint (the state 
size continues to grow, so it's probably not the reason for the failure), 
advances somewhat, saves some more checkpoints, and then enters the failing 
state again.
One of the times it happened, the first failed checkpoint failed due to 
"Checkpoint Coordinator is suspending.", so it might be an indicator for the 
cause of the problem, but looking into Flink's code I can't see how a running 
job could get to this state.
I am using RocksDB for state, and the state is saved to Azure Blob Store, using 
the NativeAzureFileSystem HDFS connector over the wasbs protocol.
Any ideas? Possibly a bug in Flink or RocksDB?







Re: Previously working job fails on Flink 1.2.0

2017-02-21 Thread Stefan Richter
Hi,

if your key is a double[], even if the field is a final double[], it is mutable
because the array entries can be mutated, and maybe that is what happened? You
can check if the following two points are in sync, hash-wise:
KeyGroupStreamPartitioner::selectChannels and 
AbstractKeyedStateBackend::setCurrentKey. The first method basically determines 
to which parallel operator a tuple is routed in a keyed stream. The second is 
determining the tuple’s key group for the backend. Both must be in sync w.r.t. 
their result of the key-group that is determined for the tuple. And this 
assignment is done based on the hash of the key. Therefore, the hash of the 
tuple’s key should never change and must be immutable. If you can notice a 
change in hash code, that change is what breaks your code. I am pretty sure 
that Flink 1.1.x might just silently accept a mutation of the key, but actually 
this is arguably incorrect.

Best,
Stefan

> Am 21.02.2017 um 14:51 schrieb Steffen Hausmann :
> 
> Thanks for these pointers, Stefan.
> 
> I've started a fresh job and didn't migrate any state from previous 
> execution. Moreover, all the fields of all the events I'm using are declared 
> final.
> 
> I've set a breakpoint to figure out what event is causing the problem, and it 
> turns out that Flink starts processing the incoming events for some time and 
> only when a certain window triggers an exception is thrown. The specific code 
> that causes the exception is as follows:
> 
>> DataStream idleDuration = cleanedTrips
>>.keyBy("license")
>>.flatMap(new DetermineIdleDuration())
>>.filter(duration -> duration.avg_idle_duration >= 0 && 
>> duration.avg_idle_duration <= 240)
>>.keyBy("location")
>>.timeWindow(Time.minutes(10))
>>.apply((Tuple tuple, TimeWindow window, Iterable input, 
>> Collector out) -> {
>>double[] location = Iterables.get(input, 0).location;
>>double avgDuration = StreamSupport
>>.stream(input.spliterator(), false)
>>.mapToDouble(idle -> idle.avg_idle_duration)
>>.average()
>>.getAsDouble();
>> 
>>out.collect(new IdleDuration(location, avgDuration, 
>> window.maxTimestamp()));
>>});
> 
> If the apply statement is removed, there is no exception during runtime.
> 
> The location field that is referenced by the keyBy statement is actually a 
> double[]. May this cause the problems I'm experiencing?
> 
> You can find some more code for additional context in the attached document.
> 
> Thanks for looking into this!
> 
> Steffen
> 
> 
> 
> On 20/02/2017 15:22, Stefan Richter wrote:
>> Hi,
>> 
>> Flink 1.2 is partitioning all keys into key-groups, the atomic units for 
>> rescaling. This partitioning is done by hash partitioning and is also in 
>> sync with the routing of tuples to operator instances (each parallel 
>> instance of a keyed operator is responsible for some range of key groups). 
>> This exception means that Flink detected a tuple in the state backend of a 
>> parallel operator instance that should not be there because, by its key 
>> hash, it belongs to a different key-group. Or phrased differently, this 
>> tuple belongs to a different parallel operator instance. If this is a Flink 
>> bug or user code bug is very hard to tell, the log also does not provide 
>> additional insights. I could see this happen in case that your keys are 
>> mutable and your code makes some changes to the object that change the hash 
>> code. Another question is also: did you migrate your job from Flink 1.1.3 
>> through an old savepoint or did you do a fresh start. Other than that, I can 
>> recommend to check your code for mutating of keys. If this fails 
>> deterministically, you could also try to set a breakpoint for the line of 
>> the exception and take a look if the key that is about to be inserted is 
>> somehow special.
>> 
>> Best,
>> Stefan
>> 
>> 
>>> Am 20.02.2017 um 14:32 schrieb Steffen Hausmann 
>>> :
>>> 
>>> Hi there,
>>> 
>>> I’m having problems running a job on Flink 1.2.0 that successfully executes 
>>> on Flink 1.1.3. The job is supposed to read events from a Kinesis stream 
>>> and to send outputs to Elasticsearch and it actually initiates successfully 
>>> on a Flink 1.2.0 cluster running on YARN, but as soon as I start to ingest 
>>> events into the Kinesis stream, the job fails (see the attachment for more 
>>> information):
>>> 
>>> java.lang.RuntimeException: Unexpected key group index. This indicates a 
>>> bug.
>>> 
>>> at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>>> 
>>> at 
>>> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>>> 
>>> at 
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>>> 
>>> at 
>>> 

Re: Flink checkpointing gets stuck

2017-02-21 Thread vinay patil
Hi Shai,

I was facing similar issue , however now the stream is not stuck in between.

you can refer this thread for the configurations I have done :
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-td11752.html

What is the configuration on which you running the job ?
What is the RocksDB predefined option you are using ?



Regards,
Vinay Patil

On Tue, Feb 21, 2017 at 7:13 PM, Shai Kaplan [via Apache Flink User Mailing
List archive.]  wrote:

> Hi.
>
> I'm running a Flink 1.2 job with a 10 seconds checkpoint interval. After
> some running time (minutes-hours) Flink fails to save checkpoints, and
> stops processing records (I'm not sure if the checkpointing failure is the
> cause of the problem or just a symptom).
>
> After several checkpoints that take some seconds each, they start failing
> due to 30 minutes timeout.
>
> When I restart one of the Task Manager services (just to get the job
> restarted), the job is recovered from the last successful checkpoint (the
> state size continues to grow, so it's probably not the reason for the
> failure), advances somewhat, saves some more checkpoints, and then enters
> the failing state again.
>
> One of the times it happened, the first failed checkpoint failed due to
> "Checkpoint Coordinator is suspending.", so it might be an indicator for
> the cause of the problem, but looking into Flink's code I can't see how a
> running job could get to this state.
>
> I am using RocksDB for state, and the state is saved to Azure Blob Store,
> using the NativeAzureFileSystem HDFS connector over the wasbs protocol.
>
> Any ideas? Possibly a bug in Flink or RocksDB?
>
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-
> checkpointing-gets-stuck-tp11776.html
> To start a new topic under Apache Flink User Mailing List archive., email
> ml-node+s2336050n1...@n4.nabble.com
> To unsubscribe from Apache Flink User Mailing List archive., click here
> 
> .
> NAML
> 
>





Re: Previously working job fails on Flink 1.2.0

2017-02-21 Thread Steffen Hausmann

Thanks for these pointers, Stefan.

I've started a fresh job and didn't migrate any state from previous 
execution. Moreover, all the fields of all the events I'm using are 
declared final.


I've set a breakpoint to figure out what event is causing the problem, 
and it turns out that Flink starts processing the incoming events for 
some time and only when a certain window triggers an exception is 
thrown. The specific code that causes the exception is as follows:



DataStream idleDuration = cleanedTrips
.keyBy("license")
.flatMap(new DetermineIdleDuration())
.filter(duration -> duration.avg_idle_duration >= 0 && 
duration.avg_idle_duration <= 240)
.keyBy("location")
.timeWindow(Time.minutes(10))
.apply((Tuple tuple, TimeWindow window, Iterable input, 
Collector out) -> {
double[] location = Iterables.get(input, 0).location;
double avgDuration = StreamSupport
.stream(input.spliterator(), false)
.mapToDouble(idle -> idle.avg_idle_duration)
.average()
.getAsDouble();

out.collect(new IdleDuration(location, avgDuration, 
window.maxTimestamp()));
});


If the apply statement is removed, there is no exception during runtime.

The location field that is referenced by the keyBy statement is actually 
a double[]. May this cause the problems I'm experiencing?


You can find some more code for additional context in the attached document.

Thanks for looking into this!

Steffen



On 20/02/2017 15:22, Stefan Richter wrote:

Hi,

Flink 1.2 is partitioning all keys into key-groups, the atomic units for 
rescaling. This partitioning is done by hash partitioning and is also in sync 
with the routing of tuples to operator instances (each parallel instance of a 
keyed operator is responsible for some range of key groups). This exception 
means that Flink detected a tuple in the state backend of a parallel operator 
instance that should not be there because, by its key hash, it belongs to a 
different key-group. Or phrased differently, this tuple belongs to a different 
parallel operator instance. If this is a Flink bug or user code bug is very 
hard to tell, the log also does not provide additional insights. I could see 
this happen in case that your keys are mutable and your code makes some changes 
to the object that change the hash code. Another question is also: did you 
migrate your job from Flink 1.1.3 through an old savepoint or did you do a 
fresh start. Other than that, I can recommend to check your code for mutating 
of keys. If this fails deterministically, you could also try to set a 
breakpoint for the line of the exception and take a look if the key that is 
about to be inserted is somehow special.

Best,
Stefan



Am 20.02.2017 um 14:32 schrieb Steffen Hausmann :

Hi there,

I’m having problems running a job on Flink 1.2.0 that successfully executes on 
Flink 1.1.3. The job is supposed to read events from a Kinesis stream and to 
send outputs to Elasticsearch and it actually initiates successfully on a Flink 
1.2.0 cluster running on YARN, but as soon as I start to ingest events into the 
Kinesis stream, the job fails (see the attachment for more information):

java.lang.RuntimeException: Unexpected key group index. This indicates a bug.

at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)

at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)

at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)

at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)

at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

at java.lang.Thread.run(Thread.java:745)

Any ideas what’s going wrong here? The job executes successfully when it’s 
compiled against the Flink 1.1.3 artifacts and run on a Flink 1.1.3 cluster. 
Does this indicate a bug in my code or is this rather a bug in Flink? How can I 
further debug this?

Any guidance is highly appreciated.

Thanks,

Steffen




DataStream idleDuration = cleanedTrips
.keyBy("license")
.flatMap(new DetermineIdleDuration())
.filter(duration -> duration.avg_idle_duration >= 0 && 
duration.avg_idle_duration <= 240)
.keyBy("location")
.timeWindow(Time.minutes(10))
.apply((Tuple tuple, TimeWindow window, Iterable input, 
Collector out) -> {
double[] location = Iterables.get(input, 0).location;
double avgDuration = StreamSupport
.stream(input.spliterator(), false)
.mapToDouble(idle -> idle.avg_idle_duration)

Flink checkpointing gets stuck

2017-02-21 Thread Shai Kaplan
Hi.
I'm running a Flink 1.2 job with a 10 seconds checkpoint interval. After some 
running time (minutes-hours) Flink fails to save checkpoints, and stops 
processing records (I'm not sure if the checkpointing failure is the cause of 
the problem or just a symptom).
After several checkpoints that take some seconds each, they start failing due 
to 30 minutes timeout.
When I restart one of the Task Manager services (just to get the job 
restarted), the job is recovered from the last successful checkpoint (the state 
size continues to grow, so it's probably not the reason for the failure), 
advances somewhat, saves some more checkpoints, and then enters the failing 
state again.
One of the times it happened, the first failed checkpoint failed due to 
"Checkpoint Coordinator is suspending.", so it might be an indicator for the 
cause of the problem, but looking into Flink's code I can't see how a running 
job could get to this state.
I am using RocksDB for state, and the state is saved to Azure Blob Store, using 
the NativeAzureFileSystem HDFS connector over the wasbs protocol.
Any ideas? Possibly a bug in Flink or RocksDB?


Re: flink on yarn ha

2017-02-21 Thread Stephan Ewen
Hi!

Flink 1.1.4 and Flink 1.2 fixed a bunch of issues with HA, can you try
those versions?

If these also have issues, could you share the logs of the JobManager?

Thanks!

On Tue, Feb 21, 2017 at 11:41 AM, lining jing  wrote:

> flink version: 1.1.3
>
> kill jobmanager, the job fail. Ha config did not work.
>


Flink streaming. Broadcast reference data map across nodes

2017-02-21 Thread Vadim Vararu

Hi all,


I would like to do something similar to Spark's broadcast mechanism.

Basically, i have a big dictionary of reference data that has to be 
accessible from all the nodes (in order to do some joins of log line 
with reference line).


I did not find yet a way to do it.


Any ideas?



Re: Cannot run using a savepoint with the same jar

2017-02-21 Thread Aljoscha Krettek
Hi Rami,
could you maybe provide your code? You could also send it to me directly if
you don't want to share with the community.

It might be that there is something in the way the pipeline is setup that
causes the (generated) operator UIDs to not be deterministic.

Best,
Aljoscha
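Regarding the "Cannot assign user-specified hash to intermediate node in chain" error
mentioned further down in this thread, a hedged sketch of the usual workaround is to
start a new chain at the operator before assigning it an explicit uid (the uid strings
here are made up):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env
                .fromElements("a", "b", "c")
                .uid("source-id");                 // stable ID that survives job graph changes

        DataStream<String> mapped = source
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .startNewChain()                   // break the chain so the uid can be assigned here
                .uid("upper-case-map");

        mapped.print();
        env.execute("uid-example");
    }
}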

On Sat, 7 Jan 2017 at 12:36 Rami Al-Isawi  wrote:

> Hi Stephan,
>
> I have not changed the parallelism, the names, or anything else in my program.
> It is the exact same jar file, unmodified.
>
> I have tried uid. but I faced this "UnsupportedOperationException: Cannot
> assign user-specified hash to intermediate node in chain. This will be
> supported in future versions of Flink. As a work around start new chain at
> task Map."
>
> Any clues how to carry on? I am just trying to avoid the painful process
> of dismantling the code and test so I come closer to the cause.
>
> I just think that if I am providing the same exact jar, nothing should
> break.
>
> Regards,
> -Rami
>
> On 4 Jan 2017, at 11:19, Stephan Ewen  wrote:
>
> Hi!
>
> Did you change the parallelism in your program, or do the names of some
> functions change each time you call the program?
>
> Can you try what happens when you give explicit IDs to operators via the
> '.uid(...)' method?
>
> Stephan
>
>
> On Tue, Jan 3, 2017 at 11:44 PM, Al-Isawi Rami 
> wrote:
>
> Hi,
>
> I have a flink job that I can trigger a save point for with no problem.
> However, If I cancel the job then try to run it with the save point, I get
> the following exception. Any ideas how can I debug or fix it? I am using
> the exact same jar so I did not modify the program in any manner. Using
> Flink version 1.1.4
>
>
> Caused by: java.lang.IllegalStateException: Failed to rollback to
> savepoint jobmanager://savepoints/1. Cannot map savepoint state for
> operator 1692abfa98b4a67c1b7dfc17f79d35d7 to the new program, because the
> operator is not available in the new program. If you want to allow this,
> you can set the --allowNonRestoredState option on the CLI.
> at
> org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:257)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:1020)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1336)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1326)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1326)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Disclaimer: This message and any attachments thereto are intended solely
> for the addressed recipient(s) and may contain confidential information. If
> you are not the intended recipient, please notify the sender by reply
> e-mail and delete the e-mail (including any attachments thereto) without
> producing, distributing or retaining any copies thereof. Any review,
> dissemination or other use of, or taking of any action in reliance upon,
> this information by persons or entities other than the intended
> recipient(s) is prohibited. Thank you.
>
>
>
> Disclaimer: This message and any attachments thereto are intended solely
> for the addressed recipient(s) and may contain confidential information. If
> you are not the intended recipient, please notify the sender by reply
> e-mail and delete the e-mail (including any attachments thereto) without
> producing, distributing or retaining any copies thereof. Any review,
> dissemination or other use of, or taking of any action in reliance upon,
> this information by persons or entities other than the intended
> recipient(s) is prohibited. Thank you.
>


Re: state size in relation to cluster size and processing speed

2017-02-21 Thread Aljoscha Krettek
Hi Seth,
sorry for taking so long to get back to you on this. I think the watermark
thing might have been misleading on my part; I don't even know anymore what I
was thinking back then.

Were you able to confirm that the results were in fact correct for the runs
with the different parallelism? I know the results are not the same because
you process different amounts of data, but still the correctness of the
result can be confirmed.

Best,
Aljoscha

On Fri, 16 Dec 2016 at 21:01 Seth Wiesman  wrote:

Hi,



I’ve noticed something peculiar about the relationship between state size
and cluster size and was wondering if anyone here knows of the reason. I am
running a job with 1 hour tumbling event time windows which have an allowed
lateness of 7 days. When I run on a 20-node cluster with FsState I can
process approximately 1.5 days’ worth of data in an hour with the most
recent checkpoint being ~20gb.  Now if I run the same job with the same
configurations on a 40-node cluster I can process 2 days’ worth of data in
20 min (expected) but the state size is only ~8gb. Because allowed lateness
is 7 days no windows should be purged yet and I would expect the larger
cluster which has processed more data to have a larger state. Is there some
why a slower running job or a smaller cluster would require more state?



This is more of a curiosity than an issue. Thanks in advance for any
insights you may have.



Seth Wiesman


Re: Splitting a stream based on validation

2017-02-21 Thread Aljoscha Krettek
I think you basically need something like this:

DataStream input = ...
DataStream withErrors = input.filter(new MyErrorFilter());
DataStream withoutErrors = input.filter(new MyWithoutErrorFilter());

withErrors.addSink(...)
withoutErrors.addSink(...)

Does that help?
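If you'd rather traverse the input only once, the split/select API that exists in Flink
1.1/1.2 is an alternative. A Java sketch in the same spirit as the snippet above (Event,
isValid() and the two sinks are placeholders):

SplitStream<Event> split = input.split(new OutputSelector<Event>() {
    @Override
    public Iterable<String> select(Event value) {
        // route each record to exactly one named side
        return Collections.singletonList(value.isValid() ? "ok" : "error");
    }
});

DataStream<Event> withoutErrors = split.select("ok");
DataStream<Event> withErrors = split.select("error");

withoutErrors.addSink(new MyGoodSink());
withErrors.addSink(new MyErrorSink());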

On Mon, 20 Feb 2017 at 13:44 Chet Masterson 
wrote:

>
> A while back on the mailing list, there was a discussion on validating a
> stream, and splitting the stream into two sinks, depending on how the
> validation went:
>
> (operator generating errors)
> --> (filter) --> stream without errors --> sink
> --> (filter) --> error stream  --> sink
>
> Is there an example of this implemented in (scala) code anywhere? I'm not
> sure how to code this up. Do I embed the error sink in the filter? The
> compiler hated everything I tried.
>


flink on yarn ha

2017-02-21 Thread lining jing
Flink version: 1.1.3

When I kill the JobManager, the job fails. The HA config did not work.


Re: Apache Flink and Elasticsearch send Json Object instead of string

2017-02-21 Thread Fábio Dias
Hi,
thanks for the reply.

Isn't there another way to do that?
Using REST you can send JSON like this:

curl -XPOST 'localhost:9200/customer/external?pretty' -H
'Content-Type: application/json' -d'
{
 "name": "Jane Doe"
}
'

In my case I have json like this:

{
  "filters" : {
"id" : 1,
"name": "abc"
}
}

How can I handle cases like this? Isn't there a way to send the whole JSON
element and have it indexed like in the REST request?

Thanks.
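One way that appears to work (an untested sketch): parse the JSON string into a Map
before handing it to the IndexRequest, since Elasticsearch indexes a nested Map as an
object rather than as one long string. The Jackson usage below is an assumption about
how the parsing could be done, not something the Flink connector requires:

import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class JsonIndexRequests {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static IndexRequest createIndexRequest(String element) throws Exception {
        // Parse the JSON string into a nested Map so Elasticsearch keeps its structure,
        // e.g. {"filters": {"id": 1, "name": "abc"}} stays a nested object.
        Map<String, Object> json =
                MAPPER.readValue(element, new TypeReference<Map<String, Object>>() {});

        return Requests.indexRequest()
                .index("logs")
                .type("object")
                .source(json);
    }
}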

Tzu-Li (Gordon) Tai  wrote on Tuesday, 21/02/2017
at 07:54:

> Hi,
>
> I’ll use your code to explain.
>
> public IndexRequest createIndexRequest(String element){
>
> HashMap esJson = new HashMap<>();
>
> esJson.put("data", element);
>
> What you should do here is parse the field values from `element`, and
> simply treat them as key-value pairs of the `esJson` map.
>
> So, the `esJson` should be prepared by doing:
>
> esJson.put(“id”, 6);
>
> esJson.put(“name”, “A green door”);
>
> esJson.put(“price”, 12.5);
>
> etc.
>
>
> Cheers,
>
> Gordon
>
>
> On February 21, 2017 at 12:41:40 AM, Fábio Dias (fabiodio...@gmail.com)
> wrote:
>
> Hi,
>
> I'm using Flink and Elasticsearch and I want to receive a JSON object in
> Elasticsearch ({"id":1, "name":"X"}, etc.). I already have a string with
> this information, but I don't want to save it as a string.
>
> I receive this:
>
> {
>   "_index": "logs",
>   "_type": "object",
>   "_id": "AVpcARfkfYWqSubr0ZvK",
>   "_score": 1,
>   "_source": {
> "data": "{\"id\":6,\"name\":\"A green
> door\",\"price\":12.5,\"tags\":[\"home\",\"green\"]}"
>   }
> }
>
> And I want to receive this:
>
> {
> "_index": "logs",
> "_type": "external",
> "_id": "AVpcARfkfYWqSubr0ZvK",
> "_score": 1,
> "_source": {
> "data": {
> "id":6,
> "name":"A green door",
> "price":12.5,
> "tags":
> ["home","green"]
> }
> }
> }
>
> my java code:
>
> try {
> ArrayList transports = new ArrayList<>();
> transports.add(new InetSocketAddress("127.0.0.1", 9300));
>
> ElasticsearchSinkFunction indexLog = new
> ElasticsearchSinkFunction() {
>
> private static final long serialVersionUID = 8802869701292023100L;
>
> public IndexRequest createIndexRequest(String element){
>
> HashMap esJson = new HashMap<>();
>
> esJson.put("data", element);
>
>
>
> return Requests
> .indexRequest()
> .index("logs")
> .type("object")
> .source(esJson);
> }
> @Override
> public void process(String element, RuntimeContext ctx,
> RequestIndexer indexer) {
> indexer.add(createIndexRequest(element));
> }
> };
>
> ElasticsearchSink esSink = new
> ElasticsearchSink(config, transports, indexLog);
> input.addSink(esSink);
> }
> catch (Exception e) {
> System.out.println(e);
> }
>
>
> Do I need to treat every entry as a map? Can I just send an object with
> key-value pairs?
>
> Thanks.
>
>


Task Manager recovery in Standalone Cluster High Availability mode

2017-02-21 Thread F.Amara
Hi,

I'm working with Apache Flink 1.1.2 and testing on High Availability mode.
In the case of Task Manager failures they say a standby TM will recover the
work of the failed TM. In my case, I have 4 TM's running in parallel and
when a TM is killed the state goes to Cancelling and then to Failed rather
than Restarting and the work is not recovered. 

Is there a specific way to create standby TM's and a specific reason for
jobs not being recovered? 





Re: Transfer information from one window to the next

2017-02-21 Thread Sonex
Hi and thank you for your response,

Is it possible to give me a simple example? How can I put the variable into
state and then access that state in the next apply function?

I am new to Flink.

Thank you.
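For what it's worth, a simple sketch of that idea in Java (Flink 1.2-style API; keying
by String and summing Long values is just an example): keep the value in keyed state
inside a RichWindowFunction, so the next apply() call for the same key can read what the
previous window stored.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CarryOverWindowFunction
        extends RichWindowFunction<Long, String, String, TimeWindow> {

    // Keyed state: one value per key, shared across successive window firings for that key.
    private transient ValueState<Long> lastWindowSum;

    @Override
    public void open(Configuration parameters) {
        lastWindowSum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastWindowSum", Long.class, 0L));
    }

    @Override
    public void apply(String key, TimeWindow window,
                      Iterable<Long> input, Collector<String> out) throws Exception {
        long sum = 0L;
        for (Long value : input) {
            sum += value;
        }

        Long previous = lastWindowSum.value();   // whatever the previous window stored for this key
        out.collect(key + ": current=" + sum + ", previous=" + previous);

        lastWindowSum.update(sum);               // visible to the next apply() for the same key
    }
}

Used after keyBy(...).timeWindow(...), each firing of apply() for a given key then sees
the value that the previous firing for that key left behind.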


