[jira] [Commented] (FLINK-1707) Add an Affinity Propagation Library Method

2016-08-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410727#comment-15410727
 ] 

ASF GitHub Bot commented on FLINK-1707:
---

Github user joseprupi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2053#discussion_r73792319
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java
 ---
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+
+/**
+ * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
+ * Note that this is not the original Affinity Propagation.
+ *
+ * The input is an undirected graph where the vertices are the points to be clustered and the edge weights
+ * are the similarities between these points.
+ *
+ * The output is a DataSet of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters are
+ * the Tuples grouped by f1.
+ *
+ * @see <a href="http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf">Simple Affinity Propagation</a>
+ */
+@SuppressWarnings("serial")
+public class AffinityPropagation implements GraphAlgorithm<Long, NullValue, Double, DataSet<Tuple2<Long, Long>>> {
+
+   private Integer maxIterations;
+   private float damping;
+   private float epsilon;
+
+   /**
+    * Creates a new AffinityPropagation algorithm instance.
+    *
+    * @param maxIterations The maximum number of iterations to run.
+    * @param damping Damping factor.
+    * @param epsilon Epsilon factor. Do not send a message to a neighbor if the new
+    * message has not changed by more than epsilon.
+    */
+   public AffinityPropagation(Integer maxIterations, float damping, float epsilon) {
+           this.maxIterations = maxIterations;
+           this.damping = damping;
+           this.epsilon = epsilon;
+   }
+
+   @Override
+   public DataSet<Tuple2<Long, Long>> run(Graph<Long, NullValue, Double> input) throws Exception {
+
+           // Create E and I AP vertices
+           DataSet<Vertex<Long, APVertexValue>> verticesWithAllInNeighbors =
+                   input.groupReduceOnEdges(new InitAPVertex(), EdgeDirection.IN);
+
+           List<Vertex<Long, APVertexValue>> APvertices = verticesWithAllInNeighbors.collect();
+
+           // Create E and I AP edges. Could this be done with some Gelly functionality?
+           List<Edge<Long, NullValue>> APedges = new ArrayList<>();
+
+           for (int i = 1; i < input.numberOfVertices() + 1; i++) {
+                   for (int j = 1; j < input.numberOfVertices() + 1; j++) {
+                           APedges.add(new Edge<>(i * 10L, j * 10L + 1, NullValue.getInstance()));
+                   }
+           }
+
+           DataSet<Edge<Long, NullValue>> APEdgesDS = input.getContext().fromCollection(APedges);
+           DataSet<Vertex<Long, APVertexValue>> APVerticesDS =

[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization

2016-08-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410707#comment-15410707
 ] 

ASF GitHub Bot commented on FLINK-3874:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2244
  
This PR has not been reviewed in the last 14 days.
Could anyone please review it?


> Add a Kafka TableSink with JSON serialization
> -
>
> Key: FLINK-3874
> URL: https://issues.apache.org/jira/browse/FLINK-3874
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-3414) Add Scala API for CEP's pattern definition

2016-08-06 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410706#comment-15410706
 ] 

Ivan Mushketyk commented on FLINK-3414:
---

Hi [~till.rohrmann]

Do you have any thoughts about this proposal?

> Add Scala API for CEP's pattern definition
> --
>
> Key: FLINK-3414
> URL: https://issues.apache.org/jira/browse/FLINK-3414
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Currently, the CEP library only supports a Java API to specify complex event 
> patterns. In order to make it a bit less verbose for Scala users, it would be 
> nice to also add a Scala API for the CEP library. 
> A Scala API would also allow to pass Scala's anonymous functions as filter 
> conditions or as a select function, for example, or to use partial functions 
> to distinguish between different events.
> Furthermore, the Scala API could be designed to feel a bit more like a DSL:
> {code}
> begin "start" where _.id >= 42 -> "middle_1" as classOf[Subclass] || 
> "middle_2" where _.name equals "foobar" -> "end" where x => x.id <= x.volume
> {code}





[jira] [Commented] (FLINK-4029) Multi-field "sum" function just like "keyBy"

2016-08-06 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410705#comment-15410705
 ] 

Ivan Mushketyk commented on FLINK-4029:
---

Hi,

Can anyone give advice on how this should be implemented?

> Multi-field "sum" function just like "keyBy"
> 
>
> Key: FLINK-4029
> URL: https://issues.apache.org/jira/browse/FLINK-4029
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Rami
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> I can use keyBy as follows:
> stream.keyBy(“pojo.field1”,”pojo.field2”,…)
> Would make sense that I can use sum for example, to do its job for more than 
> one field:
> stream.sum(“pojo.field1”,”pojo.field2”,…)
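Until such a multi-field sum exists, the requested behavior can be sketched with a hand-rolled reduce that sums each named field independently (the POJO and field names here are illustrative, not Flink API):

```java
import java.util.List;

public class MultiFieldSum {

    static class Pojo {
        long field1;
        long field2;
        Pojo(long f1, long f2) { field1 = f1; field2 = f2; }
    }

    // Hand-rolled equivalent of a hypothetical stream.sum("field1", "field2"):
    // fold the elements, summing each requested field independently.
    public static Pojo sumFields(List<Pojo> stream) {
        Pojo acc = new Pojo(0, 0);
        for (Pojo p : stream) {
            acc.field1 += p.field1;
            acc.field2 += p.field2;
        }
        return acc;
    }

    public static void main(String[] args) {
        Pojo total = sumFields(List.of(new Pojo(1, 10), new Pojo(2, 20)));
        System.out.println(total.field1 + " " + total.field2); // 3 30
    }
}
```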





[jira] [Commented] (FLINK-3703) Add sequence matching semantics to discard matched events

2016-08-06 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410704#comment-15410704
 ] 

Ivan Mushketyk commented on FLINK-3703:
---

Hi [~till.rohrmann],

Thank you for your reply.

Just to make sure that I understand your idea.
By default the pattern mentioned in the issue description will generate 
matching sequences: A, B; A, C; B, C;

Do I understand correctly that:
"after first" will generate sequences A, B; and B, C; but won't generate A, C; 
since event A can participate as the first element only once
"after last" will generate sequence A,  B; since every event can only 
participate in the matching once

Also, do I understand correctly that matching semantics should be defined on 
the sequence level and not on a level of a particular pattern?

> Add sequence matching semantics to discard matched events
> -
>
> Key: FLINK-3703
> URL: https://issues.apache.org/jira/browse/FLINK-3703
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> There is no easy way to decide whether events can be part of multiple 
> matching sequences or not. Currently, the default is that an event can 
> participate in multiple matching sequences. E.g. if you have the pattern 
> {{Pattern.begin("a").followedBy("b")}} and the input event stream 
> {{Event("A"), Event("B"), Event("C")}}, then you will generate the following 
> matching sequences: {{Event("A"), Event("B")}}, {{Event("A"), Event("C")}} 
> and {{Event("B"), Event("C")}}. 
> It would be useful to allow the user to define where the matching algorithm 
> should continue after a matching sequence has been found. Possible option 
> values could be 
>  * {{from first}} - continue keeping all events for future matches (that is 
> the current behaviour) 
>  * {{after first}} -  continue after the first element (remove first matching 
> event and continue with the second event)
>  * {{after last}} - continue after the last element (effectively discarding 
> all elements of the matching sequence)
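A small, framework-free simulation of the three options for the pattern `begin("a").followedBy("b")` over the events A, B, C (strategy names follow the issue; the code is a toy illustration, not CEP internals):

```java
import java.util.ArrayList;
import java.util.List;

public class MatchContinuation {

    // Pattern begin("a").followedBy("b"): a match is a pair of events in order.
    // Each strategy decides where scanning resumes after a match is emitted.
    public static List<String> matches(List<String> events, String strategy) {
        List<String> result = new ArrayList<>();
        switch (strategy) {
            case "from first": // every event may start and join many matches
                for (int i = 0; i < events.size(); i++) {
                    for (int j = i + 1; j < events.size(); j++) {
                        result.add(events.get(i) + events.get(j));
                    }
                }
                break;
            case "after first": // drop only the first matched event, keep the rest
                for (int i = 0; i + 1 < events.size(); i++) {
                    result.add(events.get(i) + events.get(i + 1));
                }
                break;
            case "after last": // discard the whole matched sequence
                for (int i = 0; i + 1 < events.size(); i += 2) {
                    result.add(events.get(i) + events.get(i + 1));
                }
                break;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> events = List.of("A", "B", "C");
        System.out.println(matches(events, "from first"));  // [AB, AC, BC]
        System.out.println(matches(events, "after first")); // [AB, BC]
        System.out.println(matches(events, "after last"));  // [AB]
    }
}
```

This reproduces the counts discussed above: "after first" yields A,B and B,C but not A,C, while "after last" yields only A,B.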





[jira] [Commented] (FLINK-3899) Document window processing with Reduce/FoldFunction + WindowFunction

2016-08-06 Thread Daniel Blazevski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410696#comment-15410696
 ] 

Daniel Blazevski commented on FLINK-3899:
-

[~aljoscha]

I see, would it be enough to expand on the example "WindowFunction with 
Incremental Aggregation" here:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html

with specific examples for MyFoldFunction, MyReduceFunction, MyWindowFunction?  

> Document window processing with Reduce/FoldFunction + WindowFunction
> 
>
> Key: FLINK-3899
> URL: https://issues.apache.org/jira/browse/FLINK-3899
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>
> The streaming documentation does not describe how windows can be processed 
> with FoldFunction or ReduceFunction and a subsequent WindowFunction. This 
> combination allows for eager window aggregation (only a single element is 
> kept in the window) and access of the Window object, e.g., to have access to 
> the window's start and end time.
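The point of the Reduce/FoldFunction + WindowFunction combination is that only the running aggregate is stored per window, yet the subsequent window function still sees window metadata. A framework-free sketch of that evaluation order (names are illustrative, not the Flink API):

```java
import java.util.List;
import java.util.function.BinaryOperator;

public class EagerWindowAggregation {

    // Only one element (the running aggregate) is kept per window; the
    // window function then combines it with the window's start/end metadata.
    public static String processWindow(List<Long> elements,
                                       BinaryOperator<Long> reduce,
                                       long windowStart, long windowEnd) {
        Long acc = null;
        for (Long e : elements) {              // incremental (eager) aggregation
            acc = (acc == null) ? e : reduce.apply(acc, e);
        }
        // The "window function" step: sees window start/end plus the single
        // aggregated value, never the full element list.
        return "[" + windowStart + "," + windowEnd + ") sum=" + acc;
    }

    public static void main(String[] args) {
        System.out.println(processWindow(List.of(1L, 2L, 3L), Long::sum, 0L, 10L));
        // [0,10) sum=6
    }
}
```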





[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API

2016-08-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410690#comment-15410690
 ] 

ASF GitHub Bot commented on FLINK-3318:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2339
  
So far I've only implemented one or many and optional quantifiers. I would 
like to implement other quantifiers, but I would like to make sure that I am on 
the right track before I proceed any further.

I will update user documentation when all quantifiers are implemented.


> Add support for quantifiers to CEP's pattern API
> 
>
> Key: FLINK-3318
> URL: https://issues.apache.org/jira/browse/FLINK-3318
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> It would be a good addition to extend the pattern API to support quantifiers 
> known from regular expressions (e.g. Kleene star, ?, +, or count bounds). 
> This would considerably enrich the set of supported patterns.
> Implementing the count bounds could be done by unrolling the pattern state. 
> In order to support the Kleene star operator, the {{NFACompiler}} has to be 
> extended to insert epsilon-transitions between a Kleene star state and the 
> succeeding pattern state. In order to support {{?}}, one could insert two 
> paths from the preceding state, one which accepts the event and another which 
> directly goes into the next pattern state.
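The two-path idea for `?` and the self-loop idea for one-or-many can be sketched with a tiny matcher over single-character patterns; this is a toy illustration of the quantifier semantics, not the NFACompiler:

```java
public class QuantifierMatcher {

    // Matches a sequence of single-char patterns, each optionally carrying a
    // quantifier: "x" (exactly one), "x?" (zero or one), "x+" (one or many).
    // '?' is handled as two paths (skip or consume); '+' as one mandatory
    // consume followed by a greedy self-loop, mirroring an epsilon-style NFA.
    public static boolean matches(String input, String[] pattern) {
        int pos = 0;
        for (String p : pattern) {
            char c = p.charAt(0);
            boolean optional = p.endsWith("?");
            boolean oneOrMany = p.endsWith("+");
            if (pos < input.length() && input.charAt(pos) == c) {
                pos++;
                if (oneOrMany) { // self-loop: keep consuming the same symbol
                    while (pos < input.length() && input.charAt(pos) == c) {
                        pos++;
                    }
                }
            } else if (!optional) {
                return false;    // mandatory symbol missing
            }
        }
        return pos == input.length();
    }

    public static void main(String[] args) {
        System.out.println(matches("abbb", new String[]{"a", "b+"})); // true
        System.out.println(matches("b",    new String[]{"a?", "b"})); // true
        System.out.println(matches("c",    new String[]{"a", "b+"})); // false
    }
}
```

Note the greedy self-loop is a simplification; a real NFA would track both "consume" and "leave the loop" branches.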






[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API

2016-08-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410689#comment-15410689
 ] 

ASF GitHub Bot commented on FLINK-3318:
---

GitHub user mushketyk opened a pull request:

https://github.com/apache/flink/pull/2339

[FLINK-3318] Implement one or many and optional quantifiers

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following checklist into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mushketyk/flink cep-operators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2339.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2339


commit 17400369ec79ffd1d153973faa8259550a607a9d
Author: Ivan Mushketyk 
Date:   2016-08-05T20:05:39Z

[FLINK-3318] Implement one or many and optional quantifiers

commit cdd8196cf6b32c287ced3627c5207d4c479a85a8
Author: Ivan Mushketyk 
Date:   2016-08-06T17:38:14Z

[FLINK-3318] Add scala support for pattern quantifiers

commit 08fc02befd74bc861a86c5191c91ebb30e865f56
Author: Ivan Mushketyk 
Date:   2016-08-06T17:41:48Z

[FLINK-3318] Minor refactoring




> Add support for quantifiers to CEP's pattern API
> 
>
> Key: FLINK-3318
> URL: https://issues.apache.org/jira/browse/FLINK-3318
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> It would be a good addition to extend the pattern API to support quantifiers 
> known from regular expressions (e.g. Kleene star, ?, +, or count bounds). 
> This would considerably enrich the set of supported patterns.
> Implementing the count bounds could be done by unrolling the pattern state. 
> In order to support the Kleene star operator, the {{NFACompiler}} has to be 
> extended to insert epsilon-transitions between a Kleene star state and the 
> succeeding pattern state. In order to support {{?}}, one could insert two 
> paths from the preceding state, one which accepts the event and another which 
> directly goes into the next pattern state.






[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-08-06 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410688#comment-15410688
 ] 

Ivan Mushketyk commented on FLINK-3950:
---

In Dropwizard Metrics, Meter is implemented as a separate metric type and does 
not seem to use Counter or Gauge instances.
Since you probably already put some thought into this, do you think there is 
any benefit in implementing a "Meter view"?
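For reference, a "Meter view" in the sense discussed here could be a rate derived from a backing counter plus elapsed time, with no per-event state of its own. A minimal sketch of that idea (an assumption about the proposal, not Flink's or Dropwizard's API):

```java
public class MeterView {

    private long count;            // backing counter
    private final long startNanos; // when measurement began

    public MeterView(long startNanos) {
        this.startNanos = startNanos;
    }

    public void markEvent() {
        count++;
    }

    // Events per second, derived purely from the counter and the clock.
    public double getRate(long nowNanos) {
        double seconds = (nowNanos - startNanos) / 1_000_000_000.0;
        return seconds <= 0 ? 0.0 : count / seconds;
    }

    public static void main(String[] args) {
        MeterView m = new MeterView(0L);
        for (int i = 0; i < 10; i++) {
            m.markEvent();
        }
        System.out.println(m.getRate(2_000_000_000L)); // 10 events over 2s -> 5.0
    }
}
```

A real meter (as in Dropwizard) would additionally smooth the rate with exponentially weighted moving averages rather than a single lifetime average.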

> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>






[jira] [Commented] (FLINK-4322) Unify CheckpointCoordinator and SavepointCoordinator

2016-08-06 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410662#comment-15410662
 ] 

ramkrishna.s.vasudevan commented on FLINK-4322:
---

Ok, got it. I understand. Thanks. 

> Unify CheckpointCoordinator and SavepointCoordinator
> 
>
> Key: FLINK-4322
> URL: https://issues.apache.org/jira/browse/FLINK-4322
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
> Fix For: 1.2.0
>
>
> The Checkpoint coordinator should have the functionality of both handling 
> checkpoints and savepoints.
> The difference between checkpoints and savepoints is minimal:
>   - savepoints always write the root metadata of the checkpoint
>   - savepoints are always full (never incremental)
> The commonalities are large
>   - jobs should be able to resume from checkpoint or savepoints
>   - jobs should fall back to the latest checkpoint or savepoint
> This subsumes issue https://issues.apache.org/jira/browse/FLINK-3397





[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-06 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73788575
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Serializable {
+
+   /**
+    * Given an input value, returns the HBase row key.
+    *
+    * @param value input value
+    * @return row key
+    */
+   byte[] rowKey(IN value);
+
+   /**
+    * Given an input value, returns a list of HBase columns.
+    *
+    * @param value input value
+    * @return a list of HBase columns
+    */
+   List<HBaseColumn> columns(IN value);
+
+   /**
+    * Represents an HBase column, which can be either a standard column or a counter.
+    */
+   class HBaseColumn {
+           private byte[] family;
+           private byte[] qualifier;
+           private byte[] value;
+           private long timestamp;
+           private Long increment;
+
+           public HBaseColumn(byte[] family, byte[] qualifier, byte[] value) {
+                   this(family, qualifier, value, -1);
+           }
+
+           public HBaseColumn(byte[] family, byte[] qualifier, byte[] value, long timestamp) {
+                   this.family = family;
+                   this.qualifier = qualifier;
+                   this.value = value;
+                   this.timestamp = timestamp;
+           }
+
+           public HBaseColumn(byte[] family, byte[] qualifier, long increment) {
+                   this.family = family;
+                   this.qualifier = qualifier;
+                   this.increment = increment;
+           }
+
+           public byte[] getFamily() {
+                   return family;
+           }
+
+           public byte[] getQualifier() {
+                   return qualifier;
+           }
+
+           public byte[] getValue() {
+                   return value;
+           }
+
+           public long getTs() {
+                   return timestamp;
+           }
+
+           public Long getIncrement() {
+                   return increment;
--- End diff --

The reason I said this is that an Append is also a Mutation. 
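To make the interface in the diff concrete, here is a hypothetical mapper for a simple event type, with minimal inline stand-ins for the `HBaseMapper` and `HBaseColumn` types (simplified from the diff; the real PR's definitions may differ):

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class ExampleHBaseMapping {

    // Minimal stand-ins for the HBaseMapper/HBaseColumn types in the diff.
    interface HBaseMapper<IN> extends Serializable {
        byte[] rowKey(IN value);
        List<HBaseColumn> columns(IN value);
    }

    static class HBaseColumn {
        final byte[] family, qualifier, value;
        HBaseColumn(byte[] family, byte[] qualifier, byte[] value) {
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
        }
    }

    static class Event {
        final String id;
        final String payload;
        Event(String id, String payload) { this.id = id; this.payload = payload; }
    }

    // A hypothetical mapper: the event id becomes the row key, and the payload
    // goes into family "d", qualifier "payload".
    static class EventMapper implements HBaseMapper<Event> {
        @Override
        public byte[] rowKey(Event value) {
            return value.id.getBytes(StandardCharsets.UTF_8);
        }
        @Override
        public List<HBaseColumn> columns(Event value) {
            return List.of(new HBaseColumn(
                    "d".getBytes(StandardCharsets.UTF_8),
                    "payload".getBytes(StandardCharsets.UTF_8),
                    value.payload.getBytes(StandardCharsets.UTF_8)));
        }
    }

    public static void main(String[] args) {
        EventMapper mapper = new EventMapper();
        Event e = new Event("row-1", "hello");
        System.out.println(new String(mapper.rowKey(e), StandardCharsets.UTF_8));
    }
}
```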


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-08-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410661#comment-15410661
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73788575
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Serializable {
+
+	/**
+	 * Given an input value, return the HBase row key.
+	 *
+	 * @param value input value
+	 * @return row key
+	 */
+	byte[] rowKey(IN value);
+
+	/**
+	 * Given an input value, return a list of HBase columns.
+	 *
+	 * @param value input value
+	 * @return a list of HBase columns
+	 */
+	List<HBaseColumn> columns(IN value);
+
+	/**
+	 * Represents an HBase column, which can be either a standard column or a counter.
+	 */
+	class HBaseColumn {
+		private byte[] family;
+		private byte[] qualifier;
+		private byte[] value;
+		private long timestamp;
+		private Long increment;
+
+		public HBaseColumn(byte[] family, byte[] qualifier, byte[] value) {
+			this(family, qualifier, value, -1);
+		}
+
+		public HBaseColumn(byte[] family, byte[] qualifier, byte[] value, long timestamp) {
+			this.family = family;
+			this.qualifier = qualifier;
+			this.value = value;
+			this.timestamp = timestamp;
+		}
+
+		public HBaseColumn(byte[] family, byte[] qualifier, long increment) {
+			this.family = family;
+			this.qualifier = qualifier;
+			this.increment = increment;
+		}
+
+   public byte[] getFamily() {
+   return family;
+   }
+
+   public byte[] getQualifier() {
+   return qualifier;
+   }
+
+   public byte[] getValue() {
+   return value;
+   }
+
+   public long getTs() {
+   return timestamp;
+   }
+
+   public Long getIncrement() {
+   return increment;
--- End diff --

The reason for saying this is that even an Append is also a Mutation.
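For context, the class relationship the comment leans on can be sketched in plain Java. The `Put`, `Increment`, `Append`, and `Mutation` names mirror HBase's client API, but the classes below are local stand-ins so the snippet stays self-contained:

```java
// Self-contained stand-ins for HBase's client classes: in the real API,
// Put, Increment, and Append all extend Mutation, so a sink written against
// Mutation covers every write type with one code path.
abstract class Mutation {
}

class Put extends Mutation {
}

class Increment extends Mutation {
}

class Append extends Mutation {
}

class MutationSink {
	// Dispatching on the common supertype: counters (Increment) and Appends
	// need no special-casing at the sink boundary.
	String describe(Mutation mutation) {
		return mutation.getClass().getSimpleName();
	}
}

public class MutationDemo {
	public static void main(String[] args) {
		MutationSink sink = new MutationSink();
		System.out.println(sink.describe(new Put()));    // Put
		System.out.println(sink.describe(new Append())); // Append
	}
}
```

Because all three write types share the `Mutation` supertype, a sink that accepts `Mutation` would handle counters and appends without a separate path.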


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Hilmi Yildirim
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-08-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15410659#comment-15410659
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73788502
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Serializable {
+
+	/**
+	 * Given an input value, return the HBase row key.
+	 *
+	 * @param value input value
+	 * @return row key
+	 */
+	byte[] rowKey(IN value);
+
+	/**
+	 * Given an input value, return a list of HBase columns.
+	 *
+	 * @param value input value
+	 * @return a list of HBase columns
+	 */
+	List<HBaseColumn> columns(IN value);
+
+	/**
+	 * Represents an HBase column, which can be either a standard column or a counter.
+	 */
+	class HBaseColumn {
+		private byte[] family;
+		private byte[] qualifier;
+		private byte[] value;
+		private long timestamp;
+		private Long increment;
+
+		public HBaseColumn(byte[] family, byte[] qualifier, byte[] value) {
+			this(family, qualifier, value, -1);
+		}
+
+		public HBaseColumn(byte[] family, byte[] qualifier, byte[] value, long timestamp) {
+			this.family = family;
+			this.qualifier = qualifier;
+			this.value = value;
+			this.timestamp = timestamp;
+		}
+
+		public HBaseColumn(byte[] family, byte[] qualifier, long increment) {
+			this.family = family;
+			this.qualifier = qualifier;
+			this.increment = increment;
+		}
+
+   public byte[] getFamily() {
+   return family;
+   }
+
+   public byte[] getQualifier() {
+   return qualifier;
+   }
+
+   public byte[] getValue() {
+   return value;
+   }
+
+   public long getTs() {
+   return timestamp;
+   }
+
+   public Long getIncrement() {
+   return increment;
--- End diff --

Maybe we can add an API explicitly for increment, instead of having a boolean to identify an increment?
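One way the suggestion could look — a hypothetical sketch, not the PR's actual code — is to replace the constructor overloads with named factory methods, so a counter column is requested explicitly rather than inferred from which constructor was called. The `standard`/`counter` names are illustrative:

```java
import java.util.Optional;

// Hypothetical sketch of the review suggestion (names are illustrative):
// explicit factory methods make "counter column" a deliberate choice
// instead of something inferred from an overload or a boolean flag.
final class HBaseColumn {
	private final byte[] family;
	private final byte[] qualifier;
	private final byte[] value;   // null for counter columns
	private final Long increment; // null for standard columns

	private HBaseColumn(byte[] family, byte[] qualifier, byte[] value, Long increment) {
		this.family = family;
		this.qualifier = qualifier;
		this.value = value;
		this.increment = increment;
	}

	/** A standard column carrying a value. */
	static HBaseColumn standard(byte[] family, byte[] qualifier, byte[] value) {
		return new HBaseColumn(family, qualifier, value, null);
	}

	/** A counter column carrying an increment delta. */
	static HBaseColumn counter(byte[] family, byte[] qualifier, long delta) {
		return new HBaseColumn(family, qualifier, null, delta);
	}

	boolean isCounter() {
		return increment != null;
	}

	Optional<Long> getIncrement() {
		return Optional.ofNullable(increment);
	}
}

public class HBaseColumnDemo {
	public static void main(String[] args) {
		HBaseColumn hits = HBaseColumn.counter(new byte[] {1}, new byte[] {2}, 1L);
		System.out.println(hits.isCounter()); // true
	}
}
```

With this shape, the sink can branch on `isCounter()` without any nullable getter ambiguity at call sites.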


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Hilmi Yildirim
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html







[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-08-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15410657#comment-15410657
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73788475
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Maps a input value to a row in HBase table.
+ *
+ * @param  input type
+ */
+public interface HBaseMapper extends Serializable {
+
+   /**
+* Given a input value return the HBase row key.
+*
+* @param value
+* @return row key
+*/
+   byte[] rowKey(IN value);
+
+   /**
+* Given a input value return a list of HBase columns.
+*
+* @param value
+* @return a list of HBase columns
+*/
+   List columns(IN value);
+
+   /**
+*  Represents a HBase column which can be either a standard one or a 
counter.
+*/
+   class HBaseColumn {
+   private byte[] family;
+   private byte[] qualifier;
+   private byte[] value;
+   private long timestamp;
+   private Long increment;
+
+   public HBaseColumn(byte[] family, byte[] qualifier, byte[] 
value) {
+   this(family, qualifier, value, -1);
--- End diff --

Should this be -1? Ideally, if there is no timestamp, HBase will assign a server-based timestamp.


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Hilmi Yildirim
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html




