Re: Exception from in-progress implementation of Python API bulk iterations

2016-10-17 Thread Geoffrey Mon
Your solution works well, many thanks. This solves the exception that I
described previously.

However, in a different part of the script I run into another problem
with reusing data sets. For example, given the script at
https://gist.github.com/GEOFBOT/d670f567f8c886572c8715a6058f8b34, I get an
exception about a data type not having a corresponding deserializer for a
broadcast variable. However, no data sets in my plan are broadcast. I
noticed that in the operations diagram generated by Flink, the data set 'S'
is created twice, once for each time it is used in the plan, even though I
intended to reuse the same data set. One of the creations involves a
broadcast. Presumably this is related to an optimization built into Flink?

I appreciate any insights or help you may have about this problem.

Thanks,
Geoffrey

On Fri, Oct 14, 2016 at 7:24 AM Chesnay Schepler  wrote:

In this branch: https://github.com/zentol/flink/tree/new-iterations you
can find a more fine-grained fix for chaining with iterations.
Relevant commit: ac2305d9589a5c6ab9e94d04c870fba52716d695

On 13.10.2016 23:11, Chesnay Schepler wrote:
> The chaining code is definitely related; I also have a pretty clear
> idea how to fix it.
>
> The odd thing is that the Java API doesn't catch this type mismatch;
> the data types are known when the plan is generated. This kind of
> error shouldn't even happen.
>
> On 13.10.2016 21:15, Geoffrey Mon wrote:
>> Thank you very much. Disabling chaining with the Python API allows my
>> actual script to run properly. The division by zero must be an issue
>> with the job that I posted on gist.
>>
>> Does that mean that the issue must be in the chaining part of the API?
>> Chaining, as I understand it, is an important optimization that matters
>> for the performance comparison I wish to make in my project.
>>
>> Cheers,
>> Geoffrey
>>
>> On Thu, Oct 13, 2016 at 9:11 AM Chesnay Schepler 
>> wrote:
>>
>>> A temporary workaround appears to be disabling chaining, which you can
>>> do by commenting out L215 "self._find_chains()" in Environment.py.
>>> Note that you then run into a division by zero error, but I can't tell
>>> whether that is a problem of the job or not.
>>>
>>> On 13.10.2016 13:41, Chesnay Schepler wrote:
 Hey Geoffrey,

 I was able to reproduce the error and will look into it in more detail
 tomorrow.

 Regards,
 Chesnay

 On 12.10.2016 23:09, Geoffrey Mon wrote:
> Hello,
>
> Has anyone had a chance to look into this? I am currently working
> on the
> problem but I have minimal understanding of how the internal Flink
> Python
> API works; any expertise would be greatly appreciated.
>
> Thank you very much!
>
> Geoffrey
>
> On Sat, Oct 8, 2016 at 1:27 PM Geoffrey Mon 
> wrote:
>
>> Hi Chesnay,
>>
>> Heh, I have discovered that if I do not restart Flink after
>> running my
>> original problematic script, then similar issues will manifest
>> themselves
>> in other otherwise working scripts. I haven't been able to
>> completely
>> narrow down the problem, but I promise this new script will have a
>> ClassCastException that is completely reproducible. :)
>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>
>> Thanks,
>> Geoffrey
>>
>> On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler
>> 
>> wrote:
>>
>> Hello Geoffrey,
>>
>> this one works for me as well :D
>>
>> Regards,
>> Chesnay
>>
>> On 28.09.2016 05:38, Geoffrey Mon wrote:
>>> Hello Chesnay,
>>>
>>> Thank you for your help. After receiving your message I recompiled my
>>> version of Flink completely, and both the NullPointerException listed
>>> in the TODO and the ClassCastException with the join operation went
>>> away. Previously, to save time, I had only been recompiling the modules
>>> of Flink that had been changed, using "mvn clean install -pl :module",
>>> and apparently that may have been causing some of my issues.
>>>
>>> Now, the problem is clearer: when a specific group reduce function in
>>> my research project plan file is used within an iteration, I get a
>>> ClassCastException:
>>> Caused by: java.lang.ClassCastException:
>>> org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
>>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
>>> at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
>>> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>> at org.apache.flink.python.api.streaming.data.P

[jira] [Created] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext

2016-10-17 Thread Ted Yu (JIRA)
Ted Yu created FLINK-4848:
-

 Summary: keystoreFilePath should be checked against null in 
SSLUtils#createSSLServerContext
 Key: FLINK-4848
 URL: https://issues.apache.org/jira/browse/FLINK-4848
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
  String keystoreFilePath = sslConfig.getString(
ConfigConstants.SECURITY_SSL_KEYSTORE,
null);
...
  try {
keyStoreFile = new FileInputStream(new File(keystoreFilePath));
{code}
If keystoreFilePath is null, the File constructor will throw a NullPointerException.
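
A sketch of the kind of guard that could fix this (the helper shape and exception type are assumptions, not the actual patch):

{code}
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

// Fail fast with a descriptive error when the keystore path is missing,
// instead of letting new File(null) throw a bare NullPointerException.
static FileInputStream openKeystore(String keystoreFilePath) throws IOException {
	if (keystoreFilePath == null) {
		throw new IllegalArgumentException(
			"SSL keystore file path is not configured (security.ssl.keystore)");
	}
	return new FileInputStream(new File(keystoreFilePath));
}
{code}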



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4849) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext

2016-10-17 Thread Ted Yu (JIRA)
Ted Yu created FLINK-4849:
-

 Summary: keystoreFilePath should be checked against null in 
SSLUtils#createSSLServerContext
 Key: FLINK-4849
 URL: https://issues.apache.org/jira/browse/FLINK-4849
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
  String keystoreFilePath = sslConfig.getString(
ConfigConstants.SECURITY_SSL_KEYSTORE,
null);
...
  try {
keyStoreFile = new FileInputStream(new File(keystoreFilePath));
{code}
If keystoreFilePath is null, the File constructor will throw a NullPointerException.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink

2016-10-17 Thread Vishnu Viswanath
Hi Aljoscha,

Thanks for the response.

I did think about creating a new class similar to TimestampedValue, as you
suggested, but that class looked almost the same as the current StreamRecord
class (both have a timestamp field and a value field).

Do you think it is fine to have another class for holding a (timestamp,
value) tuple?
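
For reference, a minimal sketch of such a holder class (hypothetical, just to
make the comparison concrete; it carries only the (timestamp, value) pair and
none of StreamRecord's other machinery):

public class TimestampedValue<T> {

    private final T value;
    private final long timestamp;

    public TimestampedValue(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    public T getValue() {
        return value;
    }

    public long getTimestamp() {
        return timestamp;
    }

    // Long.MIN_VALUE marks "no timestamp set", matching the convention
    // discussed for Tuple2.f0 elsewhere in this thread.
    public boolean hasTimestamp() {
        return timestamp > Long.MIN_VALUE;
    }
}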

Regards,
Vishnu

On Mon, Oct 17, 2016 at 4:19 AM, Aljoscha Krettek 
wrote:

> Hi Vishnu,
> what you suggested is spot on! Please go forward with it like this.
>
> One small suggestion would be to change Tuple2 to something like
> TimestampedValue to avoid relying on tuples, because they can be confusing
> for people who write Scala code (they are not Scala tuples). That's not
> strictly necessary, though; you can spin it however you like.
>
> Cheers,
> Aljoscha
>
> On Fri, 7 Oct 2016 at 18:46 Vishnu Viswanath  >
> wrote:
>
> > Hi Radu,
> >
> > Yes we can remove elements randomly using iterator.remove()
> >
> > Regards,
> > Vishnu
> >
> > On Fri, Oct 7, 2016 at 2:57 AM, Radu Tudoran 
> > wrote:
> >
> > > Hi,
> > >
> > > I must apologize that I missed some of the email exchanges on this
> > > thread, and thus my remark/question might have already been settled.
> > >
> > > Does the interface you propose also enable removing elements out of
> > > order? E.g., assuming I have elements 1,2,3,4,5 in the window buffer,
> > > to be able to evict 2 and 4?
> > > We discussed this some email exchanges ago, but as I said I am not
> > > sure if this functionality is captured in this interface. Basically,
> > > will the typical remove() method from Iterators be available?
> > >
> > > Best regards,
> > >
> > >
> > > -Original Message-
> > > From: Vishnu Viswanath [mailto:vishnu.viswanat...@gmail.com]
> > > Sent: Friday, October 07, 2016 8:29 AM
> > > To: Dev
> > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
> > >
> > > Hi Aljoscha,
> > >
> > > To pass the time information to the Evictor while not exposing the
> > > StreamRecord, I suppose we can change the signature of evictBefore and
> > > evictAfter to take Iterable<Tuple2<Long, T>> instead of
> > > Iterable<StreamRecord<T>>:
> > >
> > > void evictBefore(Iterable<Tuple2<Long, T>> elements, int size, W window,
> > > EvictorContext evictorContext);
> > >
> > > The fire() method of EvictingWindowOperator can transform the
> > > Iterable<StreamRecord<T>> to FluentIterable<Tuple2<Long, T>> and pass it
> > > on to the evictor (where f0 will be the timestamp and f1 will be the
> > > value). That way the TimeEvictor will work for EventTime or
> > > IngestionTime as long as the timestamp is set in the StreamRecord. In
> > > case the timestamp is not set, TimeEvictor can detect this by checking
> > > Tuple2.f0 (which will be Long.MIN_VALUE) and ignore the eviction.
> > >
> > > If you think this is fine, I will make the changes and also edit the
> > FLIP.
> > >
> > > Regards,
> > > Vishnu
> > >
> > >
> > > On Wed, Oct 5, 2016 at 9:49 PM, Vishnu Viswanath <
> > > vishnu.viswanat...@gmail.com> wrote:
> > >
> > > > Thank you Aljoscha,
> > > >
> > > > Yes, I agree we don't need ProcessingTimeEvictor.
> > > > I will change the current TimeEvictors to use EventTimeEvictor as
> > > > suggested.
> > > >
> > > > Also, I will figure out a way to pass the timestamp to the Evictor
> > > > interface so that we can avoid exposing StreamRecords.
> > > >
> > > > Regards,
> > > > Vishnu
> > > >
> > > >
> > > >
> > > > On Tue, Sep 20, 2016 at 4:33 AM, Aljoscha Krettek <aljos...@apache.org>
> > > > wrote:
> > > >
> > > >> Hi,
> > > >> now you again see what I mentioned a while back: eviction based on
> > > >> processing time is not really well defined. I think we can
> > > >> completely get rid of "processing time eviction" because it can be
> > > >> replaced by something like this:
> > > >>
> > > >> DataStream<T> input = ...
> > > >> DataStream<T> withTimestamps = input.assignTimestampsAndWatermarks(
> > > >>     new IngestionTimeExtractor<T>());
> > > >> // this will assign the current processing time as timestamp
> > > >> withTimestamps
> > > >>   .keyBy(...)
> > > >>   .window(...)
> > > >>   .evictor(new EventTimeEvictor())
> > > >>   .apply(...)
> > > >>
> > > >> With this, we would just have to find a good way of passing the
> > > >> timestamps in the Evictor interface and a good way of implementing
> > > >> the EvictingWindowOperator.
> > > >>
> > > >> Cheers,
> > > >> Aljoscha
> > > >>
> > > >>
> > > >> On Sun, 18 Sep 2016 at 18:14 Vishnu Viswanath <
> > > >> vishnu.viswanat...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hi Aljoscha,
> > > >> >
> > > >> > A)
> > > >> > I tried the approach where we set the ProcessingTime explicitly by
> > > >> > converting DataStream<T> input to DataStream<Tuple2<Long, T>> using
> > > >> > a map function, and below are my observations:
> > > >> > 1. All the current code which uses TimeEvictor (which will by
> > > >> > default be changed to ProcessingTimeEvictor) will be forced to
> > > >> > implement a mapping Function to agree with the new method signature.
> > > >

Re: Flink Metrics

2016-10-17 Thread amir bahmanyari
Hi colleagues, is there a link that describes Flink Metrics and provides
examples on how to use them? I would really appreciate it. Cheers

  From: Till Rohrmann 
 To: u...@flink.apache.org 
Cc: dev@flink.apache.org
 Sent: Monday, October 17, 2016 12:52 AM
 Subject: Re: Flink Metrics
Hi Govind,

I think the DropwizardMeterWrapper implementation is just a reference
implementation where it was decided to report the one-minute rate. You can
define your own meter class that allows you to configure the rate interval
accordingly.
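
For illustration, a minimal sketch of such a custom meter, assuming Flink's
org.apache.flink.metrics.Meter interface and the Dropwizard meter that
DropwizardMeterWrapper also builds on:

import org.apache.flink.metrics.Meter;

// Reports the five-minute rate instead of the one-minute rate returned by
// DropwizardMeterWrapper. A sketch, not the shipped wrapper.
public class FiveMinuteRateMeter implements Meter {

    private final com.codahale.metrics.Meter meter = new com.codahale.metrics.Meter();

    @Override
    public void markEvent() {
        meter.mark();
    }

    @Override
    public void markEvent(long n) {
        meter.mark(n);
    }

    @Override
    public double getRate() {
        // swap in getOneMinuteRate() or getFifteenMinuteRate() as needed
        return meter.getFiveMinuteRate();
    }

    @Override
    public long getCount() {
        return meter.getCount();
    }
}

It can then be registered like any other metric, e.g.
getRuntimeContext().getMetricGroup().meter("myMeter", new FiveMinuteRateMeter())
inside a rich function.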

Concerning timers, I think nobody has requested this metric so far. If you
want, you can open a JIRA issue and contribute it. The community would
really appreciate that.

Cheers,
Till

On Mon, Oct 17, 2016 at 5:26 AM, Govindarajan Srinivasaraghavan <
govindragh...@gmail.com> wrote:

> Hi,
>
> I am currently using a Flink 1.2 snapshot and instrumenting my pipeline
> with Flink metrics. One small suggestion I have: currently the Meter
> interface only supports getRate(), which is always the one-minute rate.
>
> It would be great if all the rates (1 min, 5 min & 15 min) were exposed,
> to get a better picture in terms of performance.
>
> Also, is there any reason why timers are not part of flink-metrics-core?
>
> Regards,
> Govind
>

Re: [DISCUSS] Schedule and Scope for Flink 1.2

2016-10-17 Thread Stephan Ewen
I think this sounds very reasonable, +1 to the schedule.

I would definitely add
  - FLIP-10 (unify checkpoints and savepoints)
  - FLIP-7 (metrics in web UI)
  - FLIP-12 (async request operators)

Those should all be safe bets, as they are basically done.

Let's see what else is in shape by the feature freeze.



On Mon, Oct 17, 2016 at 2:18 PM, Fabian Hueske  wrote:

> Hi everybody,
>
> Flink 1.1.0 was released in August and I think it is time to kick off a
> discussion about the schedule and scope of Flink 1.2.0.
>
> == Scope
>
> We started to collect features for Flink 1.2.0 in the Flink Release wiki
> page [1].
> I copy the feature list for convenience:
>
>    - Dynamic Scaling / Key Groups (FLINK-3755) (Design Doc)
>    - Rescalable Non-Partitioned State (FLIP-8)
>    - Backwards compatible savepoints (FLINK-4797)
>    - Integrate Flink with Apache Mesos (FLINK-1984) (Design Doc)
>    - Secure Data Access (FLINK-3930) (Design Doc)
>    - Queryable State (FLINK-3779)
>    - Metrics in Webinterface (FLIP-7)
>    - Kafka 0.10 support (FLINK-4035)
>    - Table API: Group Window Aggregates (FLINK-4691, FLIP-11)
>    - Table API: Scalar Functions (FLINK-3097)
>
>
> Is a feature missing or something too ambitious?
>
> == Schedule
>
> In my opinion we should try to release 1.2.0 before Christmas. That means
> we should have an RC1 by end of November / early December and a feature
> freeze two weeks earlier, in mid-November. Does that sound appropriate or should
> we aim for an earlier or later release?
>
> What do others think about the scope and schedule?
> Please discuss.
>
> Best,
> Fabian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/
> Flink+Release+and+Feature+Plan
>


[jira] [Created] (FLINK-4847) Let RpcEndpoint.start/shutDown throw exceptions

2016-10-17 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4847:


 Summary: Let RpcEndpoint.start/shutDown throw exceptions
 Key: FLINK-4847
 URL: https://issues.apache.org/jira/browse/FLINK-4847
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Reporter: Till Rohrmann


The {{RpcEndpoint.start}} and {{RpcEndpoint.shutDown}} methods should be 
allowed to throw exceptions if things go wrong. Otherwise, exceptions will be 
given to a callback which handles them later, even though we know that we can 
fail the components right away (as is the case for the {{TaskExecutor}}, for 
example).
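
A sketch of the proposed signature change (simplified; the real {{RpcEndpoint}} has more members):

{code}
public abstract class RpcEndpoint {

	public void start() throws Exception {
		// implementations may now fail fast during startup
	}

	public void shutDown() throws Exception {
		// likewise, shutdown errors can be thrown directly to the caller
	}
}
{code}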



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4846) FlinkML - Pass "env" as an implicit parameter in MLUtils.readLibSVM

2016-10-17 Thread Thomas FOURNIER (JIRA)
Thomas FOURNIER created FLINK-4846:
--

 Summary: FlinkML - Pass "env" as an implicit parameter in 
MLUtils.readLibSVM
 Key: FLINK-4846
 URL: https://issues.apache.org/jira/browse/FLINK-4846
 Project: Flink
  Issue Type: Improvement
Reporter: Thomas FOURNIER
Priority: Minor


With FlinkML you can import a file via MLUtils.readLibSVM (import 
org.apache.flink.ml.MLUtils)

For example:

val env = ExecutionEnvironment.getExecutionEnvironment
val astroTrain: DataSet[LabeledVector] = MLUtils.readLibSVM(env, 
"src/main/resources/svmguide1")

I'd like to pass "env" as an implicit parameter and use the method as such:

val astroTrain: DataSet[LabeledVector] = 
MLUtils.readLibSVM("src/main/resources/svmguide1")

Is it OK (not a Scala specialist yet :) )?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [FLINK-3035] Redis as State Backend

2016-10-17 Thread Aljoscha Krettek
Hi,
there are two basic ideas for implementing a StateBackend based on Redis:
 1. Rely on Redis to keep the state, use nothing else.
 2. Use Redis to keep the state, and checkpoint it to some distributed file
system (such as HDFS) when a checkpoint is taken

The first idea seems unwise because Redis is not a "strongly consistent
distributed data store", as Elias pointed out on the issue. The second idea
is problematic because there is no easy way to read all the state for a
given Flink operator from a running Redis instance in order to store it in
HDFS. That's what I was getting at in my comment.

Cheers,
Aljoscha

On Fri, 7 Oct 2016 at 17:19 Ovidiu Cristian Marcu <
ovidiu.cristian.ma...@huawei.com> wrote:

> Hi
>
> Can you please expand on the last comment:
>
> "I think, however, that for other reasons we will probably not be able to
> implement this well. The problem is that we have to somehow get at the
> state in redis for checkpointing. And if we use only one Redis instance for
> all states then this will be problematic." - Aljoscha Krettek
>
> Any other update on this issue would help; its status is not clear.
>
> Best,
> Ovidiu
>
>


[jira] [Created] (FLINK-4845) Fix Job Exceptions page

2016-10-17 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4845:
---

 Summary: Fix Job Exceptions page
 Key: FLINK-4845
 URL: https://issues.apache.org/jira/browse/FLINK-4845
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 1.2.0
Reporter: Chesnay Schepler
 Fix For: 1.2.0


The job exceptions page in the WebFrontend is currently broken, displaying 
(null) for every task even if no exception has occurred.

The JobExceptionsHandler checks whether an exception has occurred, and thus 
should be displayed, by checking whether the rootException is null. This worked 
until recently, since we worked directly on Throwables; however, this was 
changed to a stringified version of the exceptions. This had the side effect 
that the exception is now never null but "(null)", leading the handler to 
believe that an exception has occurred.

The checks should be changed to {code}!exception.equals("(null)"){code}
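Sketched as a guard in the handler (variable names follow the description above; the surrounding JSON-writing code is an assumption):

{code}
// rootException is now a String; "(null)" is the stringified form of null
if (rootException != null && !rootException.equals("(null)")) {
	// a real exception occurred and should be displayed
	gen.writeStringField("root-exception", rootException);
}
{code}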



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[DISCUSS] Schedule and Scope for Flink 1.2

2016-10-17 Thread Fabian Hueske
Hi everybody,

Flink 1.1.0 was released in August and I think it is time to kick off a
discussion about the schedule and scope of Flink 1.2.0.

== Scope

We started to collect features for Flink 1.2.0 in the Flink Release wiki
page [1].
I copy the feature list for convenience:

   - Dynamic Scaling / Key Groups (FLINK-3755) (Design Doc)
   - Rescalable Non-Partitioned State (FLIP-8)
   - Backwards compatible savepoints (FLINK-4797)
   - Integrate Flink with Apache Mesos (FLINK-1984) (Design Doc)
   - Secure Data Access (FLINK-3930) (Design Doc)
   - Queryable State (FLINK-3779)
   - Metrics in Webinterface (FLIP-7)
   - Kafka 0.10 support (FLINK-4035)
   - Table API: Group Window Aggregates (FLINK-4691, FLIP-11)
   - Table API: Scalar Functions (FLINK-3097)


Is a feature missing or something too ambitious?

== Schedule

In my opinion we should try to release 1.2.0 before Christmas. That means
we should have an RC1 by end of November / early December and a feature
freeze two weeks earlier, in mid-November. Does that sound appropriate or should
we aim for an earlier or later release?

What do others think about the scope and schedule?
Please discuss.

Best,
Fabian

[1]
https://cwiki.apache.org/confluence/display/FLINK/Flink+Release+and+Feature+Plan


[jira] [Created] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4844:
-

 Summary: Partitionable Raw Keyed/Operator State
 Key: FLINK-4844
 URL: https://issues.apache.org/jira/browse/FLINK-4844
 Project: Flink
  Issue Type: New Feature
Reporter: Stefan Richter
Assignee: Stefan Richter


Partitionable operator and keyed state are currently only available through 
backends. However, the serialization code for many operators is built around 
reading/writing their state to a stream for checkpointing. We want to provide 
partitionable state through streams as well, so that migrating existing 
operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4843) Introduce Test for FsCheckpointStateOutputStream::getPos

2016-10-17 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4843:
-

 Summary: Introduce Test for FsCheckpointStateOutputStream::getPos
 Key: FLINK-4843
 URL: https://issues.apache.org/jira/browse/FLINK-4843
 Project: Flink
  Issue Type: Test
Reporter: Stefan Richter
Assignee: Stefan Richter


Introduce a test for FsCheckpointStateOutputStream::getPos, which is currently 
not covered by the tests.
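
A sketch of what such a test could look like (the stream construction helper is hypothetical):

{code}
import static org.junit.Assert.assertEquals;

@Test
public void testGetPos() throws Exception {
	FsCheckpointStateOutputStream stream = createTestStream(); // hypothetical helper

	// getPos() should advance by exactly one for every byte written
	for (int i = 0; i < 64; i++) {
		assertEquals(i, stream.getPos());
		stream.write(0x42);
	}
}
{code}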



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4842) Introduce test to enforce order of operator / udf lifecycles

2016-10-17 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4842:
-

 Summary: Introduce test to enforce order of operator / udf 
lifecycles 
 Key: FLINK-4842
 URL: https://issues.apache.org/jira/browse/FLINK-4842
 Project: Flink
  Issue Type: Test
Reporter: Stefan Richter
Assignee: Stefan Richter


We should introduce a test that enforces a certain order in which the lifecycle 
methods of operators and UDFs are called, so that this order is not easily 
changed by accident.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4841) Suicide TaskManager if Task doesn't react to cancel signal

2016-10-17 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-4841:
--

 Summary: Suicide TaskManager if Task doesn't react to cancel signal
 Key: FLINK-4841
 URL: https://issues.apache.org/jira/browse/FLINK-4841
 Project: Flink
  Issue Type: Improvement
  Components: TaskManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi


If the Task does not react to the cancel signal, kill the TaskManager process 
via {{System.exit}}. That's better than leaking resources; the TaskManager will 
then be recovered by the resource manager.
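
A minimal sketch of the idea (the watchdog and timeout are assumptions, not existing code):

{code}
// Give the task a grace period to react to cancellation, then terminate
// the whole TaskManager JVM rather than leak its resources.
Thread executingThread = task.getExecutingThread();
executingThread.join(taskCancellationTimeoutMillis);
if (executingThread.isAlive()) {
	LOG.error("Task did not react to the cancel signal; killing the TaskManager process.");
	System.exit(-1);
}
{code}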



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink

2016-10-17 Thread Aljoscha Krettek
Hi Vishnu,
what you suggested is spot on! Please go forward with it like this.

One small suggestion would be to change Tuple2 to something like
TimestampedValue to avoid relying on tuples, because they can be confusing
for people who write Scala code (they are not Scala tuples). That's not
strictly necessary, though; you can spin it however you like.

Cheers,
Aljoscha

On Fri, 7 Oct 2016 at 18:46 Vishnu Viswanath 
wrote:

> Hi Radu,
>
> Yes we can remove elements randomly using iterator.remove()
>
> Regards,
> Vishnu
>
> On Fri, Oct 7, 2016 at 2:57 AM, Radu Tudoran 
> wrote:
>
> > Hi,
> >
> > I must apologize that I missed some of the email exchanges on this
> > thread, and thus my remark/question might have already been settled.
> >
> > Does the interface you propose also enable removing elements out of
> > order? E.g., assuming I have elements 1,2,3,4,5 in the window buffer, to
> > be able to evict 2 and 4?
> > We discussed this some email exchanges ago, but as I said I am not sure
> > if this functionality is captured in this interface. Basically, will
> > the typical remove() method from Iterators be available?
> >
> > Best regards,
> >
> >
> > -Original Message-
> > From: Vishnu Viswanath [mailto:vishnu.viswanat...@gmail.com]
> > Sent: Friday, October 07, 2016 8:29 AM
> > To: Dev
> > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
> >
> > Hi Aljoscha,
> >
> > To pass the time information to the Evictor while not exposing the
> > StreamRecord, I suppose we can change the signature of evictBefore and
> > evictAfter to take Iterable<Tuple2<Long, T>> instead of
> > Iterable<StreamRecord<T>>:
> >
> > void evictBefore(Iterable<Tuple2<Long, T>> elements, int size, W window,
> > EvictorContext evictorContext);
> >
> > The fire() method of EvictingWindowOperator can transform the
> > Iterable<StreamRecord<T>> to FluentIterable<Tuple2<Long, T>> and pass it
> > on to the evictor (where f0 will be the timestamp and f1 will be the
> > value). That way the TimeEvictor will work for EventTime or IngestionTime
> > as long as the timestamp is set in the StreamRecord. In case the
> > timestamp is not set, TimeEvictor can detect this by checking Tuple2.f0
> > (which will be Long.MIN_VALUE) and ignore the eviction.
> >
> > If you think this is fine, I will make the changes and also edit the
> FLIP.
> >
> > Regards,
> > Vishnu
> >
> >
> > On Wed, Oct 5, 2016 at 9:49 PM, Vishnu Viswanath <
> > vishnu.viswanat...@gmail.com> wrote:
> >
> > > Thank you Aljoscha,
> > >
> > > Yes, I agree we don't need ProcessingTimeEvictor.
> > > I will change the current TimeEvictors to use EventTimeEvictor as
> > > suggested.
> > >
> > > Also, I will figure out a way to pass the timestamp to the Evictor
> > > interface so that we can avoid exposing StreamRecords.
> > >
> > > Regards,
> > > Vishnu
> > >
> > >
> > >
> > > On Tue, Sep 20, 2016 at 4:33 AM, Aljoscha Krettek  >
> > > wrote:
> > >
> > >> Hi,
> > >> now you again see what I mentioned a while back: eviction based on
> > >> processing time is not really well defined. I think we can completely
> > >> get rid of "processing time eviction" because it can be replaced by
> > >> something like this:
> > >>
> > >> DataStream<T> input = ...
> > >> DataStream<T> withTimestamps = input.assignTimestampsAndWatermarks(
> > >>     new IngestionTimeExtractor<T>());
> > >> // this will assign the current processing time as timestamp
> > >> withTimestamps
> > >>   .keyBy(...)
> > >>   .window(...)
> > >>   .evictor(new EventTimeEvictor())
> > >>   .apply(...)
> > >>
> > >> With this, we would just have to find a good way of passing the
> > >> timestamps in the Evictor interface and a good way of implementing the
> > >> EvictingWindowOperator.
> > >>
> > >> Cheers,
> > >> Aljoscha
> > >>
> > >>
> > >> On Sun, 18 Sep 2016 at 18:14 Vishnu Viswanath <
> > >> vishnu.viswanat...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi Aljoscha,
> > >> >
> > >> > A)
> > >> > I tried the approach where we set the ProcessingTime explicitly by
> > >> > converting DataStream<T> input to DataStream<Tuple2<Long, T>> using
> > >> > a map function, and below are my observations:
> > >> > 1. All the current code which uses TimeEvictor (which will by
> > >> > default be changed to ProcessingTimeEvictor) will be forced to
> > >> > implement a mapping Function to agree with the new method signature.
> > >> > 2. Even after applying the above mapping function, the timestamp
> > >> > field of the StreamRecord will not be changed, which might be
> > >> > confusing since we now have two timestamps for the record: one set
> > >> > by the mapping function, the other in the StreamRecord.
> > >> > 3. Having a stream of Tuple2<Long, T> makes it confusing to do the
> > >> > keyBy, and the WindowFunction now also has to process
> > >> > Tuple2<Long, T> instead of T.
> > >> > 4. Users might get confused about how to set the ProcessingTime,
> > >> > since ProcessingTime is the time at which the records are processed
> > >> > and users might expect that to be a responsibility of Flink.
> > >> >
> > >> > Ideally, Proc

[jira] [Created] (FLINK-4840) Introduce an OperatorSystemMetricGroup

2016-10-17 Thread zhuhaifeng (JIRA)
zhuhaifeng created FLINK-4840:
-

 Summary: Introduce an OperatorSystemMetricGroup
 Key: FLINK-4840
 URL: https://issues.apache.org/jira/browse/FLINK-4840
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Reporter: zhuhaifeng
Priority: Minor
 Fix For: 1.2.0


This introduces an OperatorSystemMetricGroup, which encapsulates the 
instantiation of the TPS meter and the latency/proc_time_cost histograms. 
Operator-related system metrics are then not instantiated directly by the 
specific operator, but within the OperatorSystemMetricGroup contained in the 
respective OperatorMetricGroup. They are later accessed by the relevant 
components (possibly in different places) instead of being instantiated 
identically with static name constants. Other system-scope metrics (e.g. 
delay/queue_in/queue_out) can be added to the OperatorSystemMetricGroup later.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Flink Metrics

2016-10-17 Thread Till Rohrmann
Hi Govind,

I think the DropwizardMeterWrapper implementation is just a reference
implementation where it was decided to report the one-minute rate. You can
define your own meter class that allows you to configure the rate interval
accordingly.

Concerning timers, I think nobody has requested this metric so far. If you
want, you can open a JIRA issue and contribute it. The community would
really appreciate that.

Cheers,
Till

On Mon, Oct 17, 2016 at 5:26 AM, Govindarajan Srinivasaraghavan <
govindragh...@gmail.com> wrote:

> Hi,
>
> I am currently using a Flink 1.2 snapshot and instrumenting my pipeline
> with Flink metrics. One small suggestion I have: currently the Meter
> interface only supports getRate(), which is always the one-minute rate.
>
> It would be great if all the rates (1 min, 5 min & 15 min) were exposed,
> to get a better picture in terms of performance.
>
> Also, is there any reason why timers are not part of flink-metrics-core?
>
> Regards,
> Govind
>