[jira] [Commented] (FLINK-1746) Add linear discriminant analysis to machine learning library

2016-04-22 Thread Ronak Nathani (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15255146#comment-15255146
 ] 

Ronak Nathani commented on FLINK-1746:
--

Thanks [~till.rohrmann]! Looking forward to working on this! :)

I am thinking of starting with one of the three algorithms for distributed LDA 
mentioned in reference [1]. 

> Add linear discriminant analysis to machine learning library
> 
>
> Key: FLINK-1746
> URL: https://issues.apache.org/jira/browse/FLINK-1746
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Ronak Nathani
>  Labels: ML
>
> Linear discriminant analysis (LDA) [1] is used for dimensionality reduction 
> prior to classification. But it can also be used to calculate a linear 
> classifier on its own. Since dimensionality reduction is an important 
> preprocessing step, a distributed LDA implementation is a valuable addition 
> to flink-ml.
> Resources:
> [1] [http://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=5946724]
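
For orientation while reading [1]: a minimal, non-distributed sketch of the
classic two-class Fisher LDA with 2-D features (plain Java; class and method
names are illustrative only, not flink-ml API):

{code}
// Two-class Fisher LDA: discriminant direction w = Sw^-1 (m1 - m0).
// Illustrative, non-distributed sketch only.
public class TwoClassLda {

   /** Returns the discriminant direction w = Sw^-1 (m1 - m0). */
   static double[] fit(double[][] class0, double[][] class1) {
      double[] m0 = mean(class0), m1 = mean(class1);
      double[][] sw = new double[2][2];           // within-class scatter
      accumulateScatter(sw, class0, m0);
      accumulateScatter(sw, class1, m1);
      double det = sw[0][0] * sw[1][1] - sw[0][1] * sw[1][0];
      double d0 = m1[0] - m0[0], d1 = m1[1] - m0[1];
      return new double[] {                       // closed-form 2x2 inverse
         ( sw[1][1] * d0 - sw[0][1] * d1) / det,
         (-sw[1][0] * d0 + sw[0][0] * d1) / det };
   }

   static double[] mean(double[][] x) {
      double[] m = new double[2];
      for (double[] p : x) { m[0] += p[0]; m[1] += p[1]; }
      m[0] /= x.length; m[1] /= x.length;
      return m;
   }

   static void accumulateScatter(double[][] sw, double[][] x, double[] m) {
      for (double[] p : x) {
         double a = p[0] - m[0], b = p[1] - m[1];
         sw[0][0] += a * a; sw[0][1] += a * b;
         sw[1][0] += b * a; sw[1][1] += b * b;
      }
   }
}
{code}

Projecting each point onto w reduces it to a scalar; thresholding that scalar
gives the stand-alone linear classifier mentioned in the description.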



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1743) Add multinomial logistic regression to machine learning library

2016-04-22 Thread David E Drummond (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254984#comment-15254984
 ] 

David E Drummond commented on FLINK-1743:
-

Thanks for the warm welcome [~trohrm...@apache.org]! It looks like I can use 
the framework of MultipleLinearRegression as a starting point, since it 
already implements stochastic gradient descent.

> Add multinomial logistic regression to machine learning library
> ---
>
> Key: FLINK-1743
> URL: https://issues.apache.org/jira/browse/FLINK-1743
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: David E Drummond
>  Labels: ML
>
> Multinomial logistic regression [1] would be a good first classification 
> algorithm, as it can handle multiple classes. 
> Resources:
> [1] [http://en.wikipedia.org/wiki/Multinomial_logistic_regression]
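
To make the connection to the SGD framework concrete: a hedged sketch of the
single update step that an SGD-based multinomial logistic regression would
iterate (plain Java, no Flink; all names are illustrative):

{code}
// One SGD step for multinomial logistic regression: softmax over K classes,
// then a gradient step on the K x dim weight matrix. Illustrative sketch only.
static void sgdStep(double[][] w, double[] x, int label, double learningRate) {
   int numClasses = w.length, dim = x.length;
   double[] p = new double[numClasses];
   double max = Double.NEGATIVE_INFINITY;
   for (int k = 0; k < numClasses; k++) {
      double s = 0.0;
      for (int d = 0; d < dim; d++) {
         s += w[k][d] * x[d];
      }
      p[k] = s;
      max = Math.max(max, s);
   }
   double sum = 0.0;
   for (int k = 0; k < numClasses; k++) {          // numerically stable softmax
      p[k] = Math.exp(p[k] - max);
      sum += p[k];
   }
   for (int k = 0; k < numClasses; k++) {
      // gradient of the negative log-likelihood: (p_k - 1{k == label}) * x
      double err = p[k] / sum - (k == label ? 1.0 : 0.0);
      for (int d = 0; d < dim; d++) {
         w[k][d] -= learningRate * err * x[d];
      }
   }
}
{code}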



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3753) KillerWatchDog should not use kill on toKill thread

2016-04-22 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3753:
--
Description: 
{code}
// this is harsh, but this watchdog is a last resort
if (toKill.isAlive()) {
  toKill.stop();
}
{code}
stop() is deprecated.

See:
https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads

  was:
{code}
// this is harsh, but this watchdog is a last resort
if (toKill.isAlive()) {
  toKill.stop();
}
{code}
stop() is deprecated.
See:
https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads


> KillerWatchDog should not use kill on toKill thread
> ---
>
> Key: FLINK-3753
> URL: https://issues.apache.org/jira/browse/FLINK-3753
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> // this is harsh, but this watchdog is a last resort
> if (toKill.isAlive()) {
>   toKill.stop();
> }
> {code}
> stop() is deprecated.
> See:
> https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads
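
A common alternative pattern (a sketch only, not the actual Flink patch) is to
request cooperative shutdown and wait with a timeout instead of calling stop():

{code}
// Last-resort watchdog without Thread.stop(): interrupt and wait.
static void shutDownThread(Thread toKill, long timeoutMillis) {
   if (toKill.isAlive()) {
      toKill.interrupt();              // request cooperative shutdown
      try {
         toKill.join(timeoutMillis);   // give the thread time to exit
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
      }
   }
   // If the thread is still alive here, log it and move on; stop() could
   // leave shared state corrupted (see the CERT rule linked above).
}
{code}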



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254633#comment-15254633
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-213581650
  
Regarding the Jackson / dependency issue: You don't need to worry about it 
now. I'll take another look at the problem and make sure it'll work once we 
merge it.


> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement the data source consumer for the Kinesis streaming 
> connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "aws_access_key_id_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name",
> new SimpleStringSchema(),
> kinesisConfig));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-213581650
  
Regarding the Jackson / dependency issue: You don't need to worry about it 
now. I'll take another look at the problem and make sure it'll work once we 
merge it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60799474
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
+ * The fetcher spawns a single thread for connection to each shard.
+ */
+public class KinesisDataFetcher {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
+
+   /** Config properties for the Flink Kinesis Consumer */
+   private final Properties configProps;
+
+   /** The AWS credentials provider as specified in config properties */
+   private final AWSCredentialsProvider credentials;
+
+   /** The name of the consumer task that this fetcher was instantiated in */
+   private final String taskName;
+
+   /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */
+   private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
+
+   /** Reference to the thread that executed run() */
+   private volatile Thread mainThread;
+
+   /** Reference to the first error thrown by any of the spawned shard connection threads */
+   private final AtomicReference<Throwable> error;
+
+   private volatile boolean running = true;
+
+   /**
+    * Creates a new Kinesis Data Fetcher for the specified set of shards
+    *
+    * @param assignedShards the shards that this fetcher will pull data from
+    * @param configProps the configuration properties of this Flink Kinesis Consumer
+    * @param taskName the task name of this consumer task
+    */
+   public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) {
+      this.configProps = checkNotNull(configProps);
+      this.credentials = AWSUtil.getCredentialsProvider(configProps);
+      this.assignedShardsWithStartingSequenceNum = new HashMap<>();
+      for (KinesisStreamShard shard : assignedShards) {
+         assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
+      }
+      this.taskName = taskName;
+      this.error = new AtomicReference<>();
+   }
+
+   /**
+    * Advance a shard's starting sequence number to a specified value
+    *
+    * @param streamShard the shard to perform the advance on
+    * @param sequenceNum the sequence number to advance to
+    */
+   public void advanceSequenceNumberTo(KinesisStreamShard st

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254620#comment-15254620
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60799474
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
+ * The fetcher spawns a single thread for connection to each shard.
+ */
+public class KinesisDataFetcher {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
+
+   /** Config properties for the Flink Kinesis Consumer */
+   private final Properties configProps;
+
+   /** The AWS credentials provider as specified in config properties */
+   private final AWSCredentialsProvider credentials;
+
+   /** The name of the consumer task that this fetcher was instantiated in */
+   private final String taskName;
+
+   /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */
+   private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
+
+   /** Reference to the thread that executed run() */
+   private volatile Thread mainThread;
+
+   /** Reference to the first error thrown by any of the spawned shard connection threads */
+   private final AtomicReference<Throwable> error;
+
+   private volatile boolean running = true;
+
+   /**
+    * Creates a new Kinesis Data Fetcher for the specified set of shards
+    *
+    * @param assignedShards the shards that this fetcher will pull data from
+    * @param configProps the configuration properties of this Flink Kinesis Consumer
+    * @param taskName the task name of this consumer task
+    */
+   public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) {
+      this.configProps = checkNotNull(configProps);
+      this.credentials = AWSUtil.getCredentialsProvider(configProps);
+      this.assignedShardsWithStartingSequenceNum = new HashMap<>();
+      for (KinesisStreamShard shard : assignedShards) {
+         assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
+      }
+      this.taskName = taskName;
+      this.error = new AtomicReference<>();
+   }
+
+   /**
+    * Advance a shard's starting

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254615#comment-15254615
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60799388
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of an AWS Kinesis stream shard. It is basically a wrapper class around the
+ * information provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility
+ * methods to determine whether or not a shard is closed and whether or not the shard is a result of parent
+ * shard splits or merges.
+ */
+public class KinesisStreamShard implements Serializable {
--- End diff --

I like the idea of having the `Shard` as a field in the 
`KinesisStreamShard`. It will reduce the complexity and the number of lines of 
code of the connector.
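
A hypothetical sketch of that suggestion (not the final connector code; it
assumes the SDK's Shard works as a serializable field):

{code}
import com.amazonaws.services.kinesis.model.Shard;
import java.io.Serializable;

import static com.google.common.base.Preconditions.checkNotNull;

// Keep the SDK Shard as a field instead of copying its attributes one by one.
public class KinesisStreamShard implements Serializable {

   private final String streamName;
   private final Shard shard;

   public KinesisStreamShard(String streamName, Shard shard) {
      this.streamName = checkNotNull(streamName);
      this.shard = checkNotNull(shard);
   }

   public boolean isClosed() {
      // a shard is closed once its ending sequence number is set
      return shard.getSequenceNumberRange().getEndingSequenceNumber() != null;
   }
}
{code}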


> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement the data source consumer for the Kinesis streaming 
> connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "aws_access_key_id_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name",
> new SimpleStringSchema(),
> kinesisConfig));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60799388
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of an AWS Kinesis stream shard. It is basically a wrapper class around the
+ * information provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility
+ * methods to determine whether or not a shard is closed and whether or not the shard is a result of parent
+ * shard splits or merges.
+ */
+public class KinesisStreamShard implements Serializable {
--- End diff --

I like the idea of having the `Shard` as a field in the 
`KinesisStreamShard`. It will reduce the complexity and the number of lines of 
code of the connector.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254608#comment-15254608
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60799066
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.
+ *
+ * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // ------------------------------------------------------------------------
+   //  Consumer p

[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60799066
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.
+ *
+ * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // ------------------------------------------------------------------------
+   //  Consumer properties
+   // ------------------------------------------------------------------------
+
+   /** The complete list of shards */
+   private final List<KinesisStreamShard> shards;
+
+   /** Properties to parametrize settings such as AWS service regi

[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60797751
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.
+ *
+ * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // ------------------------------------------------------------------------
+   //  Consumer properties
+   // ------------------------------------------------------------------------
+
+   /** The complete list of shards */
+   private final List<KinesisStreamShard> shards;
+
+   /** Properties to parametrize settings such as AWS service regi

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254583#comment-15254583
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60797751
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.
+ *
+ * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // ------------------------------------------------------------------------
+   //  Consumer p

[jira] [Commented] (FLINK-2946) Add orderBy() to Table API

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254576#comment-15254576
 ] 

ASF GitHub Bot commented on FLINK-2946:
---

Github user dawidwys commented on the pull request:

https://github.com/apache/flink/pull/1926#issuecomment-213575798
  
Always glad to help! In fact, I would be happy for any suggestions on what 
I can work on further.

I changed the tests. 


> Add orderBy() to Table API
> --
>
> Key: FLINK-2946
> URL: https://issues.apache.org/jira/browse/FLINK-2946
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> In order to implement a FLINK-2099 prototype that uses the Table API's code 
> generation facilities, the Table API needs a sorting feature.
> I will implement it in the next days. Ideas on how to implement such a sorting 
> feature are very welcome. Is there a more efficient way than 
> {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on the 
> nodes first and then do a final sort on one node afterwards?
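
For reference, the baseline mentioned above spelled out on the DataSet API
(the concrete input type is just an example):

{code}
// Total order by funneling all data through a single sorted partition.
DataSet<Tuple2<String, Integer>> totallyOrdered = input
   .sortPartition(1, Order.DESCENDING) // with parallelism 1 this is a total sort
   .setParallelism(1);
{code}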



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-22 Thread dawidwys
Github user dawidwys commented on the pull request:

https://github.com/apache/flink/pull/1926#issuecomment-213575798
  
Always glad to help! In fact, I would be happy for any suggestions on what 
I can work on further.

I changed the tests. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3777] Managed closeInputFormat()

2016-04-22 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1903#issuecomment-213573556
  
Thanks for the quick update @fpompermaier! The changes look mostly good. 

What is missing are tests that verify that `DataSourceTask` and 
`GenericDataSourceBase` correctly call the new methods as part of their IF life 
cycle. Can you extend `GenericDataSourceBaseTest` and `DataSourceTask` and add 
tests to validate that the methods are called (maybe by using a mocked 
`RichInputFormat`)? Thanks
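
A rough sketch of what such a test could assert with a mocked
`RichInputFormat` (Mockito; the test name and setup are assumptions, not
actual test code):

{code}
import static org.mockito.Mockito.*;

@Test
public void testInputFormatLifecycleMethodsCalled() throws Exception {
   RichInputFormat<?, ?> format = mock(RichInputFormat.class);
   // ... drive the DataSourceTask / GenericDataSourceBase with the mocked format ...
   verify(format, times(1)).openInputFormat();
   verify(format, times(1)).closeInputFormat();
}
{code}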


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3777) Add open and close methods to manage IF lifecycle

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254572#comment-15254572
 ] 

ASF GitHub Bot commented on FLINK-3777:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1903#issuecomment-213573556
  
Thanks for the quick update @fpompermaier! The changes look mostly good. 

What is missing are tests that verify that `DataSourceTask` and 
`GenericDataSourceBase` correctly call the new methods as part of their IF life 
cycle. Can you extend `GenericDataSourceBaseTest` and `DataSourceTask` and add 
tests to validate that the methods are called (maybe by using a mocked 
`RichInputFormat`)? Thanks


> Add open and close methods to manage IF lifecycle
> -
>
> Key: FLINK-3777
> URL: https://issues.apache.org/jira/browse/FLINK-3777
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.1
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>  Labels: inputformat, lifecycle
>
> At the moment the opening and closing of an InputFormat are not managed, 
> although open() could be (improperly, IMHO) simulated by configure().
> This limits the possibility to reuse expensive resources (like database 
> connections) and to manage their release. 
> Probably the best option would be to add 2 methods (i.e. openInputFormat() 
> and closeInputFormat()) to RichInputFormat*
> * NOTE: the best option from a "semantic" point of view would be to rename 
> the current open() and close() to openSplit() and closeSplit() respectively, 
> while using open() and close() for the IF lifecycle management, but 
> this would cause a backward compatibility issue...
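
A sketch of the shape this proposal suggests (hypothetical, not the merged API):

{code}
// Two lifecycle hooks on RichInputFormat, called once per task around the
// per-split open()/close() cycle; defaults are no-ops for compatibility.
public abstract class RichInputFormat<OT, T extends InputSplit> implements InputFormat<OT, T> {

   /** Called once per task before any split is opened; acquire expensive
    * resources such as database connections here. */
   public void openInputFormat() {
      // do nothing by default
   }

   /** Called once per task after the last split is closed; release the
    * resources acquired in openInputFormat(). */
   public void closeInputFormat() {
      // do nothing by default
   }
}
{code}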



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3777] Managed closeInputFormat()

2016-04-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1903#discussion_r60794865
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java ---
@@ -133,4 +133,18 @@ public RuntimeContext getRuntimeContext(){
			throw new RuntimeException("The underlying input format to this ReplicatingInputFormat isn't context aware");
}
}
+   
+   @Override
+   public void openInputFormat() {
+   if(this.replicatedIF instanceof RichInputFormat){
--- End diff --

Can you add a space before and after the parentheses of the condition, 
i.e., `if (condition) {`?
Please check the other changes as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3777) Add open and close methods to manage IF lifecycle

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254554#comment-15254554
 ] 

ASF GitHub Bot commented on FLINK-3777:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1903#discussion_r60794865
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java ---
@@ -133,4 +133,18 @@ public RuntimeContext getRuntimeContext(){
			throw new RuntimeException("The underlying input format to this ReplicatingInputFormat isn't context aware");
}
}
+   
+   @Override
+   public void openInputFormat() {
+   if(this.replicatedIF instanceof RichInputFormat){
--- End diff --

Can you add a space before and after the parentheses of the condition, 
i.e., `if (condition) {`?
Please check the other changes as well.


> Add open and close methods to manage IF lifecycle
> -
>
> Key: FLINK-3777
> URL: https://issues.apache.org/jira/browse/FLINK-3777
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.1
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>  Labels: inputformat, lifecycle
>
> At the moment the opening and closing of an InputFormat are not managed, 
> although open() could be (improperly, IMHO) simulated by configure().
> This limits the possibility to reuse expensive resources (like database 
> connections) and to manage their release. 
> Probably the best option would be to add 2 methods (i.e. openInputFormat() 
> and closeInputFormat()) to RichInputFormat*
> * NOTE: the best option from a "semantic" point of view would be to rename 
> the current open() and close() to openSplit() and closeSplit() respectively, 
> while using open() and close() for the IF lifecycle management, but 
> this would cause a backward compatibility issue...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3777) Add open and close methods to manage IF lifecycle

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254549#comment-15254549
 ] 

ASF GitHub Bot commented on FLINK-3777:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1903#discussion_r60794593
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java ---
@@ -82,9 +82,6 @@ public void initializeOnMaster(ClassLoader loader) throws Exception {
	catch (Throwable t) {
		throw new Exception("Configuring the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
-   finally {
--- End diff --

Is this change intended?


> Add open and close methods to manage IF lifecycle
> -
>
> Key: FLINK-3777
> URL: https://issues.apache.org/jira/browse/FLINK-3777
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.1
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>  Labels: inputformat, lifecycle
>
> At the moment the opening and closing of an InputFormat are not managed, 
> although open() could be (improperly, IMHO) simulated by configure().
> This limits the possibility to reuse expensive resources (like database 
> connections) and to manage their release. 
> Probably the best option would be to add 2 methods (i.e. openInputFormat() 
> and closeInputFormat()) to RichInputFormat*
> * NOTE: the best option from a "semantic" point of view would be to rename 
> the current open() and close() to openSplit() and closeSplit() respectively, 
> while using open() and close() for the IF lifecycle management, but 
> this would cause a backward compatibility issue...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3777] Managed closeInputFormat()

2016-04-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1903#discussion_r60794593
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java ---
@@ -82,9 +82,6 @@ public void initializeOnMaster(ClassLoader loader) throws Exception {
	catch (Throwable t) {
		throw new Exception("Configuring the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
-   finally {
--- End diff --

Is this change intended?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254544#comment-15254544
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-213567820
  
Thanks for the additional information @yjshen! I'm a bit behind with PR 
reviews, but will definitely have a look at the beginning of next week. Thanks, Fabian


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike the execution of SQL strings, which has a separate validation phase before 
> RelNode construction, the Table API lacks a counterpart and its validation is 
> scattered across many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.
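
An illustration of the intended behavior (hypothetical example, not existing
Flink code):

{code}
Table t = tableEnv.fromDataSet(input, "name, amount");
// With an eager validation phase, the typo below ("amout") should fail right
// here with a clear resolution error, not later during RelNode construction.
Table result = t.select("amout.sum");
{code}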



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-22 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-213567820
  
Thanks for the additional information @yjshen! I'm a bit behind with PR 
reviews, but will definitely have a look at the beginning of next week. Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2946) Add orderBy() to Table API

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254535#comment-15254535
 ] 

ASF GitHub Bot commented on FLINK-2946:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1926#issuecomment-213565509
  
Thanks for opening this PR @dawidwys!
I skimmed over the changes and it looks good. I will do a more detailed 
review hopefully soon.

Thanks also for adding a lot of tests. It is definitely good to have 
extensive test coverage, but end-to-end tests such as yours add quite a bit to 
the build time. Flink uses Travis as its CI service, which kills builds after 2h. 
Unfortunately, we are already experiencing build timeouts and have to be 
careful when adding tests.

I would like to ask you to remove some of the tests which check for 
different expression syntax but end up in identical executions. In addition, it 
would be good to add one SQL test that executes a query which sorts on two 
fields to have the SQL part covered.

In a separate effort, we will add a unit test framework that checks for correct 
parsing of the expressions without actually executing queries. 


> Add orderBy() to Table API
> --
>
> Key: FLINK-2946
> URL: https://issues.apache.org/jira/browse/FLINK-2946
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> In order to implement a FLINK-2099 prototype that uses the Table API's code 
> generation facilities, the Table API needs a sorting feature.
> I will implement it in the next days. Ideas on how to implement such a sorting 
> feature are very welcome. Is there a more efficient way than 
> {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on the 
> nodes first and then do a final sort on one node afterwards?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-22 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1926#issuecomment-213565509
  
Thanks for opening this PR @dawidwys!
I skimmed over the changes and it looks good. I will do a more detailed 
review hopefully soon.

Thanks also for adding a lot of tests. It is definitely good to have 
extensive test coverage, but end-to-end tests such as yours add quite a bit to 
the build time. Flink uses Travis as its CI service, which kills builds after 2h. 
Unfortunately, we are already experiencing build timeouts and have to be 
careful when adding tests.

I would like to ask you to remove some of the tests which check for 
different expression syntax but end up in identical executions. In addition, it 
would be good to add one SQL test that executes a query which sorts on two 
fields to have the SQL part covered.

In a separate effort, we will add a unit test framework that checks for correct 
parsing of the expressions without actually executing queries. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2946) Add orderBy() to Table API

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254352#comment-15254352
 ] 

ASF GitHub Bot commented on FLINK-2946:
---

GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/1926

[FLINK-2946] Add orderBy() to Table API

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink tableSort

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1926.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1926






> Add orderBy() to Table API
> --
>
> Key: FLINK-2946
> URL: https://issues.apache.org/jira/browse/FLINK-2946
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> In order to implement a FLINK-2099 prototype that uses the Table API's code 
> generation facilities, the Table API needs a sorting feature.
> I would implement it in the next days. Ideas on how to implement such a sorting 
> feature are very welcome. Is there a more efficient way than 
> {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on the 
> nodes first and finally sort on one node afterwards?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254332#comment-15254332
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-213530765
  
Hi,

Cool, if you have time to address them, go ahead :)
Thanks a lot for doing this by the way! I really like the work you did so 
far on the connector!

Sent from my iPhone

> On 22.04.2016, at 18:01, Tzu-Li Tai  wrote:
> 
> @rmetzger 
> Thank you very much for your detailed review on the PR :)
> I've replied to the comments you added, please take a look.
> I can address the issues and follow up with corresponding commits within 
the next 36 hours. I am pretty much free for the next 3 days, and would very 
much like to get the consumer ready for merging by the end of this week :)
> 
> If it still isn't ready by the end of 4/25, I'm afraid I will have to 
leave any remaining issues for you to address, since after that I temporarily 
won't be able to work on code until June.
> 
> —
> You are receiving this because you were mentioned.
> Reply to this email directly or view it on GitHub
> 



> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement data source consumer for Kinesis streaming 
> connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, 
> "BASIC");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, 
> "aws_access_key_id_here");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
> "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, 
> "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name",
> new SimpleStringSchema(),
> kinesisConfig));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3806) Revert use of DataSet.count() in Gelly

2016-04-22 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-3806:
-

 Summary: Revert use of DataSet.count() in Gelly
 Key: FLINK-3806
 URL: https://issues.apache.org/jira/browse/FLINK-3806
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 1.1.0
Reporter: Greg Hogan
Priority: Critical


FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The 
former returns a {{DataSet}} while the latter executes a job to return a Java 
value.

{{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and 
{{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and 
{{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the user 
does not pass the number of vertices as a parameter.

As noted in FLINK-1632, this does make the code simpler, but, if my 
understanding is correct, it will cause the Graph to be materialized twice: 
the Graph will need to be reread from input, regenerated, or recomputed by 
preceding algorithms.
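A sketch of the distinction (illustrative only; {{graph}} and the vertex 
key/value types are assumed, and the lazy variant stands in for the reverted 
{{GraphUtils.count}} style):

{code}
// eager count: DataSet#count() runs a separate job and returns a Java long
long numVertices = graph.getVertices().count();

// lazy count: stays a DataSet<Long> inside the same job plan
DataSet<Long> numVerticesLazy = graph.getVertices()
	.map(new MapFunction<Vertex<K, VV>, Long>() {
		public Long map(Vertex<K, VV> vertex) { return 1L; }
	})
	.reduce(new ReduceFunction<Long>() {
		public Long reduce(Long a, Long b) { return a + b; }
	});
{code}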



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2715) Benchmark Triangle Count methods

2016-04-22 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254230#comment-15254230
 ] 

Greg Hogan commented on FLINK-2715:
---

The performance of {{TriangleEnumerator}} was considerably worse until the 
recent fixes in FLINK-3770. The algorithm could also be updated to initially 
order edges by lower degree rather than higher ID, and it should run faster 
with the upcoming hashing combiner. The use of {{TreeMap}} likely limits the 
performance relative to {{TriangleEnumerator}}.
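A sketch of the degree-based edge orientation (illustrative pseudocode in Java 
form; {{degree()}} and {{emit()}} are assumed helpers, not Gelly API):

{code}
// orient each edge from its lower-degree endpoint to its higher-degree
// endpoint, breaking ties by vertex ID, so high-degree vertices fan out less
if (degree(u) < degree(v) || (degree(u) == degree(v) && u < v)) {
	emit(u, v);
} else {
	emit(v, u);
}
{code}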

Implementation of the Global Clustering Coefficient requires the triangle 
count, and I've been working on what I think will be a nice way to capture 
algorithm metrics without duplicating code.

The Flink bug has been filed as FLINK-3805.

> Benchmark Triangle Count methods
> 
>
> Key: FLINK-2715
> URL: https://issues.apache.org/jira/browse/FLINK-2715
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Priority: Minor
>  Labels: starter
>
> Once FLINK-2714 is addressed, it would be nice to have a set of benchmarks 
> that test the efficiency of the DataSet, GSA and vertex-centric versions. 
> This means running the three examples on a cluster environment using various 
> graph DataSets. For instance, SNAP's Orkut and Friendster networks
> (https://snap.stanford.edu/data/).
> The results produced by the experiments should then be reported in the Gelly 
> docs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60770825
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java
 ---
@@ -0,0 +1,51 @@
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+/**
+ *
+ */
+public class KinesisConfigConstants {
+
+   // ------------------------------------------------------------------------
+   //  Configuration Keys
+   // ------------------------------------------------------------------------
+
+   /** The max retries to retrieve metadata from a Kinesis stream using 
describeStream API
+* (Note: describeStream attempts may be temporarily blocked due to AWS 
capping 5 attempts per sec)  */
+   public static final String CONFIG_STREAM_DESCRIBE_RETRIES = 
"flink.stream.describe.retry";
+
+   /** The backoff time between each describeStream attempt */
+   public static final String CONFIG_STREAM_DESCRIBE_BACKOFF = 
"flink.stream.describe.backoff";
+
+   /** The initial position to start reading Kinesis streams from (LATEST 
is used if not set) */
+   public static final String CONFIG_STREAM_INIT_POSITION_TYPE = 
"flink.stream.initpos.type";
+
+   /** The credential provider type to use when AWS credentials are 
required (BASIC is used if not set)*/
+   public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE = 
"aws.credentials.provider";
+
+   /** The AWS access key ID to use when setting credentials provider type 
to BASIC */
+   public static final String 
CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID = 
"aws.credentials.provider.basic.accesskeyid";
+
+   /** The AWS secret key to use when setting credentials provider type to 
BASIC */
+   public static final String 
CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY = 
"aws.credentials.provider.basic.secretkey";
+
+   /** Optional configuration for profile path if credential provider type 
is set to be PROFILE */
+   public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_PATH 
= "aws.credentials.provider.profile.path";
+
+   /** Optional configuration for profile name if credential provider type 
is set to be PROFILE */
+   public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_NAME 
= "aws.credentials.provider.profile.name";
+
+   /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is 
used if not set) */
+   public static final String CONFIG_AWS_REGION = "aws.region";
+
+
+   // ------------------------------------------------------------------------
+   //  Default configuration values
+   // ------------------------------------------------------------------------
+
+   public static final String DEFAULT_AWS_REGION = "us-east-1";
--- End diff --

I think it's reasonable to make the region a required argument. As a user, 
more than once I've mistakenly blamed the AWS SDK for not finding resources, 
only to realize it was defaulting to another region because none was 
specified.
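A required-region check could look roughly like this (a sketch using the PR's 
config constants; not the actual validation code):

{code}
// reject configurations that do not set an explicit region
String region = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION);
checkArgument(region != null && !region.isEmpty(),
	"aws.region must be set in the consumer configuration");
{code}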


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-1284) Uniform random sampling operator over windows

2016-04-22 Thread Austin Ouyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253383#comment-15253383
 ] 

Austin Ouyang edited comment on FLINK-1284 at 4/22/16 4:40 PM:
---

Hi [~senorcarbone],

Would we also want to add the ability to sample by percentage? Also, what 
would the fieldID refer to? I was thinking there are two naive possible 
solutions (a sketch of the second option follows below):
1) Once the window triggers, randomly sample N elements, or a percentage of 
all the elements, in each window.
2) Given the percentage of elements we want to retain from each window, 
generate a random number between 0 and 1 for each element and append the 
element to the result if the number is less than the specified percentage.

I'd be happy to try working on this as well!
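A minimal sketch of option 2, per-window Bernoulli sampling (plain Java for 
illustration; {{windowElements}} and the element type {{T}} are assumed, not 
an actual Flink operator API):

{code}
// retain each element of the window independently with probability `fraction`
double fraction = 0.1;
Random rnd = new Random();
List<T> sample = new ArrayList<>();
for (T element : windowElements) {
	if (rnd.nextDouble() < fraction) {
		sample.add(element);
	}
}
{code}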


was (Author: aouyang1):
Hi Paris,

Would we also want to add the ability to sample by percentage? Also what would 
the fieldID be referring to? I was thinking that there were 2 naive possible 
solutions. 
1) Once the trigger is made, we randomly sample for N samples or a percentage 
of all the samples in each window
2) Given a percentage of samples we want to retain from each window generate a 
random number between 0 and 1. Append to result if the random number is less 
than the specified percentage. 


> Uniform random sampling operator over windows
> -
>
> Key: FLINK-1284
> URL: https://issues.apache.org/jira/browse/FLINK-1284
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Paris Carbone
>Priority: Minor
>
> It would be useful for several use cases to have a built-in uniform random 
> sampling operator in the streaming API that can operate on windows. This can 
> be used for example for online machine learning operations, evaluating 
> heuristics or continuous visualisation of representative values.
> The operator could be given a field and a number of random samples needed, 
> following a window statement as such:
> mystream.window(..).sample(fieldID,#samples)
> Given that pre-aggregation is enabled, this could perhaps be implemented as a 
> binary reduce operator or a combinable groupreduce that pre-aggregates the 
> empiricals of that field.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254132#comment-15254132
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-213491263
  
@rmetzger 
Thank you very much for your detailed review on the PR :)
I've replied to the comments you added, please take a look.
I can address the issues and follow up with corresponding commits within 
the next 36 hours. I am pretty much free for the next 3 days, and would very 
much like to get the consumer ready for merging by the end of this week :)

If it still isn't ready by the end of 4/25, I'm afraid I will have to leave 
any remaining issues for you to address, since after that I temporarily won't 
be able to work on code until June.


> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement data source consumer for Kinesis streaming 
> connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, 
> "BASIC");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, 
> "aws_access_key_id_here");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
> "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, 
> "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name",
> new SimpleStringSchema(),
> kinesisConfig));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3805) BloomFilter initialized with no memory available

2016-04-22 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-3805:
-

 Summary: BloomFilter initialized with no memory available
 Key: FLINK-3805
 URL: https://issues.apache.org/jira/browse/FLINK-3805
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 1.0.1, 1.1.0
Reporter: Greg Hogan
Priority: Critical


I flagged this as 1.1.0 and 1.0.1 without checking the latter.

Link to build, command, and stacktrace follow.

{{MutableHashTable.initTable}} is calling {{initBloomFilter}} when 
{{this.availableMemory.size()==0}}.
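A possible guard at the call site (a sketch only; {{useBloomFilters}} and 
{{numBuckets}} are assumed from the surrounding hash table code, and this may 
not be the right fix):

{code}
// skip Bloom filter creation when no memory segments remain
if (useBloomFilters && !this.availableMemory.isEmpty()) {
	initBloomFilter(numBuckets);
}
{code}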

https://s3.amazonaws.com/apache-flink/flink_bloomfilter_crash.tar.bz2

./bin/flink run -class org.apache.flink.graph.examples.TriangleListing 
~/flink-gelly-examples_2.10-1.1-SNAPSHOT.jar --clip_and_flip false --output 
print --output print --scale 14 --count

{code}
org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Job execution failed.
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:898)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
at 
org.apache.flink.graph.examples.TriangleListing.main(TriangleListing.java:106)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at 
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:805)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:751)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:751)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: expectedEntries should be > 0
at 
org.apache.flink.shaded.com.google.common.base.Preconditions.checkArgument(Preconditions.java:88)
at 
org.apache.flink.runtime.operators.util.BloomFilter.<init>(BloomFilter.java:53)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.initBloomFilter(MutableHashTable.java:823)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.initTable(MutableHashTable.java:1183)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:887)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
at 
org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:116)
at 
org.apache.flink.runtime


[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60760294
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
+ * The fetcher spawns a single thread for connection to each shard.
+ */
+public class KinesisDataFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
+
+   /** Config properties for the Flink Kinesis Consumer */
+   private final Properties configProps;
+
+   /** The AWS credentials provider as specified in config properties */
+   private final AWSCredentialsProvider credentials;
+
+   /** The name of the consumer task in which this fetcher was instantiated */
+   private final String taskName;
+
+   /** Information of the shards that this fetcher handles, along with the 
sequence numbers that they should start from */
+   private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
+
+   /** Reference to the thread that executed run() */
+   private volatile Thread mainThread;
+
+   /** Reference to the first error thrown by any of the spawned shard 
connection threads */
+   private final AtomicReference<Throwable> error;
+
+   private volatile boolean running = true;
+
+   /**
+* Creates a new Kinesis Data Fetcher for the specified set of shards
+*
+* @param assignedShards the shards that this fetcher will pull data 
from
+* @param configProps the configuration properties of this Flink 
Kinesis Consumer
+* @param taskName the task name of this consumer task
+*/
+   public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, 
Properties configProps, String taskName) {
+   this.configProps = checkNotNull(configProps);
+   this.credentials = AWSUtil.getCredentialsProvider(configProps);
+   this.assignedShardsWithStartingSequenceNum = new HashMap<>();
+   for (KinesisStreamShard shard : assignedShards) {
+   assignedShardsWithStartingSequenceNum.put(shard, 
SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
+   }
+   this.taskName = taskName;
+   this.error = new AtomicReference<>();
+   }
+
+   /**
+* Advance a shard's starting sequence number to a specified value
+*
+* @param streamShard the shard to perform the advance on
+* @param sequenceNum the sequence number to advance to
+*/
+   public void advanceSequenceNumberTo(KinesisStreamShard st


[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60760085
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import 
org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import 
org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that 
pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is 
responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once 
streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the 
officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis 
Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST:
+ *
+ * 
+ * 
+ *
+ * NOTE: The current implementation does not correctly handle 
resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, 
the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, 
CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, 
ResultTypeQueryable<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // ------------------------------------------------------------------------
+   //  Consumer properties
+   // ------------------------------------------------------------------------
+
+   /** The complete list of shards */
+   private final List<KinesisStreamShard> shards;
+
+   /** Properties to parametrize settings such as AWS service regi

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254106#comment-15254106
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60759925
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of an AWS Kinesis Stream shard. It is 
basically a wrapper class around the information
+ * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, 
with some extra utility methods to
+ * determine whether or not a shard is closed and whether or not the shard 
is a result of parent shard splits or merges.
+ */
+public class KinesisStreamShard implements Serializable {
--- End diff --

The main reason why the consumer needs its own Shard representation is that 
`com.amazonaws.services.kinesis.model.Shard` doesn't have the shard's 
associated stream name as a field. We will need the stream name when getting a 
shard iterator for a particular shard, using 
`com.amazonaws.services.kinesis.AmazonKinesisClient#getShardIterator(streamName,
 shardId, iteratorType)`. Moreover, since the consumer's implementation 
supports reading from multiple Kinesis streams, we must carry the associated 
stream name along with each Shard representation (I guess that's the reason 
why Amazon's Shard implementation doesn't have a stream name field).

Our implementation, 
`org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard`, 
currently has `regionName` (which I don't think we actually need, since the 
consumer is limited to reading Kinesis streams within a single region) and 
`streamName` as fields besides the ones already supplied in Amazon's Shard. 
So, to reduce duplicated implementation, we could include Amazon's Shard as a 
field within our `KinesisStreamShard` and let `KinesisStreamShard` keep 
`streamName` as an extra field. What do you think?
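Roughly, the proposed composition (a sketch only, not final code):

{code}
public class KinesisStreamShard implements Serializable {
	private final String streamName; // extra field not present in Amazon's Shard
	private final Shard shard;       // com.amazonaws.services.kinesis.model.Shard

	public KinesisStreamShard(String streamName, Shard shard) {
		this.streamName = checkNotNull(streamName);
		this.shard = checkNotNull(shard);
	}
}
{code}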


> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement data source consumer for Kinesis streaming 
> connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, 
> "BASIC");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, 
> "aws_access_key_id_here");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
> "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, 
> "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name",
> new SimpleStringSchema(),
> kinesisConfig));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254090#comment-15254090
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60756732
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import 
org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import 
org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that 
pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is 
responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once 
streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the 
officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis 
Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.
+ *
+ * NOTE: The current implementation does not correctly handle 
resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, 
the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // ------------------------------------------------------------------------
+   //  Consumer properties

[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60756732
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import 
org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import 
org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that 
pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is 
responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once 
streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the 
officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis 
Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.
+ *
+ * NOTE: The current implementation does not correctly handle 
resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, 
the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // ------------------------------------------------------------------------
+   //  Consumer properties
+   // ------------------------------------------------------------------------
+
+   /** The complete list of shards */
+   private final List<KinesisStreamShard> shards;
+
+   /** Properties to parametrize settings such as AWS service regi

[jira] [Commented] (FLINK-2715) Benchmark Triangle Count methods

2016-04-22 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254084#comment-15254084
 ] 

Vasia Kalavri commented on FLINK-2715:
--

Hi [~greghogan],
Thanks a lot for reporting your results! What was the reason for the 
{{GSATriangleCount}} crash?
{{GSATriangleCount}} was added to the library without proper evaluation. Since 
{{TriangleEnumerator}} performs well and provides the same functionality, I 
would be in favor of removing {{GSATriangleCount}} completely. I think we 
should keep the library small and try to provide implementations that are as 
efficient as possible. What do you think?

> Benchmark Triangle Count methods
> 
>
> Key: FLINK-2715
> URL: https://issues.apache.org/jira/browse/FLINK-2715
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Priority: Minor
>  Labels: starter
>
> Once FLINK-2714 is addressed, it would be nice to have a set of benchmarks 
> that test the efficiency of the DataSet, GSA and vertex-centric versions. 
> This means running the three examples on a cluster environment using various 
> graph DataSets. For instance, SNAP's Orkut and Friendster networks
> (https://snap.stanford.edu/data/).
> The results produced by the experiments should then be reported in the Gelly 
> docs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60755162
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import 
org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import 
org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that 
pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is 
responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once 
streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the 
officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis 
Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.
+ *
+ * NOTE: The current implementation does not correctly handle 
resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, 
the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // ------------------------------------------------------------------------
+   //  Consumer properties
+   // ------------------------------------------------------------------------
+
+   /** The complete list of shards */
+   private final List<KinesisStreamShard> shards;
+
+   /** Properties to parametrize settings such as AWS service regi

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254080#comment-15254080
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60755162
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import 
org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import 
org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that 
pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is 
responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once 
streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the 
officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis 
Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.
+ *
+ * NOTE: The current implementation does not correctly handle 
resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, 
the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // ------------------------------------------------------------------------
+   //  Consumer properties

[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60753879
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.Shard;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A utility class that is used as a proxy to make calls to AWS Kinesis
+ * for several functions, such as getting a list of shards and fetching
+ * a batch of data records starting from a specified record sequence 
number.
+ */
+public class KinesisProxy {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxy.class);
+
+   /** The actual Kinesis client from the AWS SDK that we will be using to 
make calls */
+   private final AmazonKinesisClient kinesisClient;
+
+   /** The AWS region that this proxy will be making calls to */
+   private final String regionId;
+
+   /** Configuration properties of this Flink Kinesis Connector */
+   private final Properties configProps;
+
+   /**
+* Create a new KinesisProxy based on the supplied configuration 
properties
+*
+* @param configProps configuration properties containing AWS 
credential and AWS region info
+*/
+   public KinesisProxy(Properties configProps) {
+   this.configProps = checkNotNull(configProps);
+
+   this.regionId = 
configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, 
KinesisConfigConstants.DEFAULT_AWS_REGION);
+   AmazonKinesisClient client = new 
AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
+   
client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
+
+   this.kinesisClient = client;
+   }
+
+   /**
+* Get the next batch of data records using a specific shard iterator
+*
+* @param shardIterator a shard iterator that encodes info about which 
shard to read and where to start reading
+* @param maxRecordsToGet the maximum number of records to retrieve for this batch
+* @return the batch of retrieved records
+*/
+   public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) {
+   final GetRecordsRequest getRecordsRequest = new 
GetRecordsRequest();
+   getRecordsRequest.setShardIterator(shardIterator);
+   getRecordsRequest.setLimit(maxRecordsToGet);
+
+   GetRecordsResult getRecordsResult = null;
+
+   int remainingRetryTimes = Integer.valueOf(
+   
configProps.getProperty(KinesisConfigConstants.CONFIG_ST
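As background for the retry logic being discussed, here is a minimal standalone sketch of how the raw AWS SDK calls fit together when polling a single shard (hypothetical usage code, not part of the PR; the stream name, shard id, and the fixed one-second backoff are placeholder assumptions standing in for the PR's configurable retry count):

{code}
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;

public class ShardPollingSketch {
    public static void main(String[] args) throws InterruptedException {
        // uses the default AWS credential provider chain
        AmazonKinesisClient client = new AmazonKinesisClient();
        client.setRegion(Region.getRegion(Regions.US_EAST_1));

        // a shard iterator encodes the stream, the shard, and the read position
        String shardIterator = client.getShardIterator(
                "kinesis_stream_name", "shardId-000000000000", "TRIM_HORIZON")
                .getShardIterator();

        while (shardIterator != null) {
            GetRecordsRequest request = new GetRecordsRequest();
            request.setShardIterator(shardIterator);
            request.setLimit(100);
            try {
                GetRecordsResult result = client.getRecords(request);
                for (Record record : result.getRecords()) {
                    System.out.println(record.getSequenceNumber());
                }
                // becomes null once a closed shard is fully consumed
                shardIterator = result.getNextShardIterator();
            } catch (ProvisionedThroughputExceededException e) {
                Thread.sleep(1000L); // back off and retry with the same iterator
            }
        }
    }
}
{code}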

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254068#comment-15254068
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60753879
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.Shard;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A utility class that is used as a proxy to make calls to AWS Kinesis
+ * for several functions, such as getting a list of shards and fetching
+ * a batch of data records starting from a specified record sequence 
number.
+ */
+public class KinesisProxy {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxy.class);
+
+   /** The actual Kinesis client from the AWS SDK that we will be using to 
make calls */
+   private final AmazonKinesisClient kinesisClient;
+
+   /** The AWS region that this proxy will be making calls to */
+   private final String regionId;
+
+   /** Configuration properties of this Flink Kinesis Connector */
+   private final Properties configProps;
+
+   /**
+* Create a new KinesisProxy based on the supplied configuration 
properties
+*
+* @param configProps configuration properties containing AWS 
credential and AWS region info
+*/
+   public KinesisProxy(Properties configProps) {
+   this.configProps = checkNotNull(configProps);
+
+   this.regionId = 
configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, 
KinesisConfigConstants.DEFAULT_AWS_REGION);
+   AmazonKinesisClient client = new 
AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
+   
client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
+
+   this.kinesisClient = client;
+   }
+
+   /**
+* Get the next batch of data records using a specific shard iterator
+*
+* @param shardIterator a shard iterator that encodes info about which 
shard to read and where to start reading
+* @param maxRecordsToGet the maximum number of records to retrieve for this batch
+* @return the batch of retrieved records
+*/
+   public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) {
+   final GetRecordsRequest getRecordsRequest = new 
GetRecordsRequest();
+   getRecordsRequest.setShardIterator(shardIterator);
+  

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254063#comment-15254063
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-213465308
  
@rmetzger Regarding the need to upgrade Jackson to get the example working:
After updating the code to Flink 1.0.x to prepare the PR, I only tested 
the consumer manually within the Flink project code.
However, when the consumer was first implemented against Flink 0.10.1, I did 
package it at the time and use it as a separate dependency.
I'm wondering whether this could have anything to do with changes between the 
older and newer Flink versions?


> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement a data source consumer for the Kinesis streaming 
> connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> config.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> config.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, 
> "BASIC");
> config.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, 
> "aws_access_key_id_here");
> config.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
> "aws_secret_key_here");
> config.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, 
> "LATEST"); // or TRIM_HORIZON
> DataStream kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name",
> new SimpleStringSchema(),
> kinesisConfig));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread tzulitai
Github user tzulitai commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-213465308
  
@rmetzger Regarding the need to upgrade Jackson to get the example working:
After updating the code to Flink 1.0.x to prepare the PR, I only tested 
the consumer manually within the Flink project code.
However, when the consumer was first implemented against Flink 0.10.1, I did 
package it at the time and use it as a separate dependency.
I'm wondering whether this could have anything to do with changes between the 
older and newer Flink versions?




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254055#comment-15254055
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60752259
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
+ * The fetcher spawns a single thread for the connection to each shard.
+ */
+public class KinesisDataFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
+
+   /** Config properties for the Flink Kinesis Consumer */
+   private final Properties configProps;
+
+   /** The AWS credentials provider as specified in config properties */
+   private final AWSCredentialsProvider credentials;
+
+   /** The name of the consumer task in which this fetcher was instantiated */
+   private final String taskName;
+
+   /** Information about the shards that this fetcher handles, along with the 
sequence numbers that they should start from */
+   private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
+
+   /** Reference to the thread that executed run() */
+   private volatile Thread mainThread;
+
+   /** Reference to the first error thrown by any of the spawned shard 
connection threads */
+   private final AtomicReference<Throwable> error;
+
+   private volatile boolean running = true;
+
+   /**
+* Creates a new Kinesis Data Fetcher for the specified set of shards
+*
+* @param assignedShards the shards that this fetcher will pull data 
from
+* @param configProps the configuration properties of this Flink 
Kinesis Consumer
+* @param taskName the task name of this consumer task
+*/
+   public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) {
+   this.configProps = checkNotNull(configProps);
+   this.credentials = AWSUtil.getCredentialsProvider(configProps);
+   this.assignedShardsWithStartingSequenceNum = new HashMap<>();
+   for (KinesisStreamShard shard : assignedShards) {
+   assignedShardsWithStartingSequenceNum.put(shard, 
SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
+   }
+   this.taskName = taskName;
+   this.error = new AtomicReference<>();
+   }
+
+   /**
+* Advance a shard's starting

[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60752259
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
+ * The fetcher spawns a single thread for the connection to each shard.
+ */
+public class KinesisDataFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
+
+   /** Config properties for the Flink Kinesis Consumer */
+   private final Properties configProps;
+
+   /** The AWS credentials provider as specified in config properties */
+   private final AWSCredentialsProvider credentials;
+
+   /** The name of the consumer task in which this fetcher was instantiated */
+   private final String taskName;
+
+   /** Information about the shards that this fetcher handles, along with the 
sequence numbers that they should start from */
+   private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
+
+   /** Reference to the thread that executed run() */
+   private volatile Thread mainThread;
+
+   /** Reference to the first error thrown by any of the spawned shard 
connection threads */
+   private final AtomicReference<Throwable> error;
+
+   private volatile boolean running = true;
+
+   /**
+* Creates a new Kinesis Data Fetcher for the specified set of shards
+*
+* @param assignedShards the shards that this fetcher will pull data 
from
+* @param configProps the configuration properties of this Flink 
Kinesis Consumer
+* @param taskName the task name of this consumer task
+*/
+   public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) {
+   this.configProps = checkNotNull(configProps);
+   this.credentials = AWSUtil.getCredentialsProvider(configProps);
+   this.assignedShardsWithStartingSequenceNum = new HashMap<>();
+   for (KinesisStreamShard shard : assignedShards) {
+   assignedShardsWithStartingSequenceNum.put(shard, 
SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
+   }
+   this.taskName = taskName;
+   this.error = new AtomicReference<>();
+   }
+
+   /**
+* Advance a shard's starting sequence number to a specified value
+*
+* @param streamShard the shard to perform the advance on
+* @param sequenceNum the sequence number to advance to
+*/
+   public void advanceSequenceNumberTo(KinesisStreamShard st
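The quoted method is truncated above; a plausible minimal completion, inferred only from the fields shown in this diff (a sketch, not necessarily the PR's actual body):

{code}
// Sketch: the fetcher tracks one starting sequence number per assigned shard,
// so advancing amounts to replacing the tracked value for that shard.
public void advanceSequenceNumberTo(KinesisStreamShard streamShard, String sequenceNum) {
    if (!assignedShardsWithStartingSequenceNum.containsKey(streamShard)) {
        throw new IllegalArgumentException("Can't advance a shard that is not assigned to this fetcher");
    }
    assignedShardsWithStartingSequenceNum.put(streamShard, checkNotNull(sequenceNum));
}
{code}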

[jira] [Commented] (FLINK-2715) Benchmark Triangle Count methods

2016-04-22 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254051#comment-15254051
 ] 

Greg Hogan commented on FLINK-2715:
---

I happened to include some triangle benchmarks in the pull request for 
FLINK-3768. Adding a flag for {{GSATriangleCount}} was simple. Testing on an 
AWS EC2 8xlarge instance (36 vcores, 60 GiB), a run that took 
{{TriangleListing}} 5s and {{TriangleEnumerator}} 7s crashed Flink after 6m3s 
with {{GSATriangleCount}}. Orkut and Friendster are 100x and 2000x larger than 
RMat s16e16, so running these benchmarks with GSA or SG does not look feasible 
with the current algorithms.

I do agree that for overlapping algorithms in Gelly it would be nice to present 
users with a performance comparison. Thoughts, [~vkalavri]?

> Benchmark Triangle Count methods
> 
>
> Key: FLINK-2715
> URL: https://issues.apache.org/jira/browse/FLINK-2715
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Priority: Minor
>  Labels: starter
>
> Once FLINK-2714 is addressed, it would be nice to have a set of benchmarks 
> that test the efficiency of the DataSet, GSA and vertex-centric versions. 
> This means running the three examples on a cluster environment using various 
> graph DataSets. For instance, SNAP's Orkut and Friendster networks
> (https://snap.stanford.edu/data/).
> The results produced by the experiments should then be reported in the Gelly 
> docs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254004#comment-15254004
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user StefanRRichter commented on the pull request:

https://github.com/apache/flink/pull/1905#issuecomment-213443445
  
The current version should cover all of your feedback and is ready to be 
pulled, @tillrohrmann.


> Scala API for CEP
> -
>
> Key: FLINK-3708
> URL: https://issues.apache.org/jira/browse/FLINK-3708
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>
> Currently, the CEP library does not support Scala case classes, because the 
> {{TypeExtractor}} cannot handle them. In order to support them, it would be 
> necessary to offer a Scala API for the CEP library.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

2016-04-22 Thread StefanRRichter
Github user StefanRRichter commented on the pull request:

https://github.com/apache/flink/pull/1905#issuecomment-213443445
  
The current version should cover all of your feedback and is ready to be 
pulled, @tillrohrmann.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

2016-04-22 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1898#issuecomment-213434810
  
Thanks for your contribution @rawkintrevo. Good work. I had some minor 
inline comments. I'm mainly concerned about the efficiency of 
`multiRandomSplit` because it can construct some really long pipelines.

I think we should also add online documentation for the `Splitter`; 
otherwise people will just miss it. You can take a look at `docs/libs/ml/` and 
create a web page for the splitter. We could then, for example, create a tools 
page from which we link to the `Splitter`.
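To make the pipeline-length concern concrete, here is a toy sketch (in plain Java rather than the PR's Scala; all names are hypothetical) of the multi-way split pattern: each fraction is sampled out of the remainder of the previous split, so n fractions chain n-1 dependent stages, and in a lazily evaluated dataflow system that chain becomes a correspondingly long pipeline.

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class MultiSplitSketch {

    // Splits 'input' into fractions.length parts; stage i samples from the
    // remainder left by stage i-1, which is what makes the chain grow with
    // the number of fractions.
    static <T> List<List<T>> multiRandomSplit(List<T> input, double[] fractions, long seed) {
        Random rng = new Random(seed);
        List<List<T>> parts = new ArrayList<>();
        List<T> remainder = new ArrayList<>(input);
        double remainingMass = 1.0;
        for (int i = 0; i < fractions.length - 1; i++) {
            double p = fractions[i] / remainingMass; // re-normalize against what is left
            List<T> chosen = new ArrayList<>();
            List<T> rest = new ArrayList<>();
            for (T element : remainder) {
                (rng.nextDouble() < p ? chosen : rest).add(element);
            }
            parts.add(chosen);
            remainder = rest; // the next stage depends on this stage's output
            remainingMass -= fractions[i];
        }
        parts.add(remainder); // whatever is left is the final part
        return parts;
    }
}
{code}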




[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253954#comment-15253954
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1898#issuecomment-213434810
  
Thanks for your contribution @rawkintrevo. Good work. I had some minor 
inline comments. I'm mainly concerned about the efficiency of 
`multiRandomSplit` because it can construct some really long pipelines.

I think we should also add online documentation for the `Splitter`; 
otherwise people will just miss it. You can take a look at `docs/libs/ml/` and 
create a web page for the splitter. We could then, for example, create a tools 
page from which we link to the `Splitter`.


> Support training Estimators using a (train, validation, test) split of the 
> available data
> -
>
> Key: FLINK-2259
> URL: https://issues.apache.org/jira/browse/FLINK-2259
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Trevor Grant
>Priority: Minor
>  Labels: ML
>
> When there is an abundance of data available, a good way to train models is 
> to split the available data into 3 parts: Train, Validation and Test.
> We use the Train part to train the model, the Validation part to estimate the 
> test error and select hyperparameters, and the Test part to evaluate the 
> performance of the model and assess its generalization [1].
> This is a common approach when training Artificial Neural Networks, and a 
> good strategy to choose in data-rich environments. Therefore we should have 
> some support of this data-analysis process in our Estimators.
> [1] Friedman, Jerome, Trevor Hastie, and Robert Tibshirani. The elements of 
> statistical learning. Vol. 1. Springer, Berlin: Springer series in 
> statistics, 2001.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

2016-04-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60739883
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.api.scala.utils._
+
+
+class SplitterITSuite extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's DataSet Splitter"
+
+  import MinMaxScalerData._
+
+  it should "result in datasets with no elements in common and all elements used" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
+
+    (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
+
+    splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
+  }
+
+  it should "result in datasets of an expected size when precise" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet, 0.5)
+
+    val expectedLength = dataSet.count().toDouble * 0.5
+
+    splitDataSets(0).count().toDouble should equal(expectedLength +- 5.0)
+  }
+
+  it should "result in expected number of datasets" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val fracArray = Array(0.5, 0.25, 0.25)
+
+    val splitDataSets = Splitter.multiRandomSplit(dataSet, fracArray)
+
+    splitDataSets.length should equal(fracArray.length)
+  }
+
--- End diff --

Maybe we could add a test case for sampling with replacement?




[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253944#comment-15253944
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60739883
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.api.scala.utils._
+
+
+class SplitterITSuite extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's DataSet Splitter"
+
+  import MinMaxScalerData._
+
+  it should "result in datasets with no elements in common and all elements used" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
+
+    (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
+
+    splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
+  }
+
+  it should "result in datasets of an expected size when precise" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet, 0.5)
+
+    val expectedLength = dataSet.count().toDouble * 0.5
+
+    splitDataSets(0).count().toDouble should equal(expectedLength +- 5.0)
+  }
+
+  it should "result in expected number of datasets" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val fracArray = Array(0.5, 0.25, 0.25)
+
+    val splitDataSets = Splitter.multiRandomSplit(dataSet, fracArray)
+
+    splitDataSets.length should equal(fracArray.length)
+  }
+
--- End diff --

Maybe we could add a test case for sampling with replacement?


> Support training Estimators using a (train, validation, test) split of the 
> available data
> -
>
> Key: FLINK-2259
> URL: https://issues.apache.org/jira/browse/FLINK-2259
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Trevor Grant
>Priority: Minor
>  Labels: ML
>
> When there is an abundance of data available, a good way to train models is 
> to split the available data into 3 parts: Train, Validation and Test.
> We use the Train part to train the model, the Validation part to estimate the 
> test error and select hyperparameters, and the Test part to evaluate the 
> performance of the model and assess its generalization [1].
> This is a common approach when training Artificial Neural Networks, and a 
> good strategy to choose in data-rich environments. Therefore we should have 
> some support of this data-analysis process in our Estimators.
> [1] Friedman, Jerome, Trevor Hastie, and Robert Tibshirani. The elements of 
> statistical learning. Vol. 1. Springer, Berlin: Springer series in 
> statistics, 2001.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

2016-04-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60739818
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.common.typeinfo.{TypeInformation, 
BasicTypeInfo}
+import org.apache.flink.api.java.Utils
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala.utils._
+
+import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, 
WithParameters}
+import _root_.scala.reflect.ClassTag
+
+object Splitter {
+
+  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                             testing: DataSet[T])
+
+  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                                    testing: DataSet[T],
+                                                                    holdout: DataSet[T])
+
+  // --------------------------------------------------------------------------------------------
+  //  randomSplit
+  // --------------------------------------------------------------------------------------------
+  /**
+   * Split a DataSet by the probability fraction of each element.
+   *
+   * @param input    DataSet to be split
+   * @param fraction Probability that each element is chosen; should be in [0, 1] without
+   *                 replacement, and [0, ∞) with replacement. When the fraction is larger
+   *                 than 1, each element is expected to be selected multiple times into the
+   *                 sample on average. This fraction refers to the first element in the
+   *                 resulting array.
+   * @param precise  Sampling by default is random and can result in slightly lop-sided
+   *                 sample sets. When precise is true, equal sample set sizes are forced;
+   *                 however, this is somewhat less efficient.
+   * @param seed     Random number generator seed.
+   * @return An array of two datasets.
+   */
+
+  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
+  fraction: Double,
+  precise: Boolean = false,
+  seed: Long = 
Utils.RNG.nextLong())
+  : Array[DataSet[T]] = {
+import org.apache.flink.api.scala._
+
+val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
+
+val leftSplit: DataSet[(Long, T)] = precise match {
+  case false => indexedInput.sample(false, fraction, seed)
--- End diff --

What happens if fraction is larger than `1` and `withReplacement` is set to 
`false`? Shouldn't it be set to `true` in this case?
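
A minimal sketch of that fix (hypothetical, reusing the names from the diff
above; not part of the PR):

```scala
// Sketch only: derive withReplacement from the fraction instead of
// hard-coding `false`; a fraction above 1 only makes sense with replacement.
val leftSplit: DataSet[(Long, T)] = precise match {
  case false => indexedInput.sample(fraction > 1.0, fraction, seed)
  case true =>
    val count = indexedInput.count()
    val numOfSamples = math.round(fraction * count).toInt
    indexedInput.sampleWithSize(false, numOfSamples, seed)
}
```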


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253943#comment-15253943
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60739818
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
+import org.apache.flink.api.java.Utils
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala.utils._
+
+import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
+import _root_.scala.reflect.ClassTag
+
+object Splitter {
+
+  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                             testing: DataSet[T])
+
+  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                                    testing: DataSet[T],
+                                                                    holdout: DataSet[T])
+  // --------------------------------------------------------------------------------------------
+  //  randomSplit
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Split a DataSet by the probability fraction of each element.
+   *
+   * @param input    DataSet to be split
+   * @param fraction Probability that each element is chosen; should be [0, 1] when sampling
+   *                 without replacement, and [0, ∞) with replacement. When the fraction is
+   *                 larger than 1, each element is expected to be selected multiple times
+   *                 into the sample on average. This fraction refers to the first element
+   *                 in the resulting array.
+   * @param precise  Sampling by default is random and can result in slightly lop-sided
+   *                 sample sets. When precise is true, equal sample set sizes are forced;
+   *                 however, this is somewhat less efficient.
+   * @param seed     Random number generator seed.
+   * @return An array of two datasets
+   */
+
+  def randomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
+                                                 fraction: Double,
+                                                 precise: Boolean = false,
+                                                 seed: Long = Utils.RNG.nextLong())
+    : Array[DataSet[T]] = {
+    import org.apache.flink.api.scala._
+
+    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
+
+    val leftSplit: DataSet[(Long, T)] = precise match {
+      case false => indexedInput.sample(false, fraction, seed)
--- End diff --

What happens if fraction is larger than `1` and `withReplacement` is set to 
`false`? Shouldn't it be set to `true` in this case?


> Support training Estimators using a (train, validation, test) split of the 
> available data
> -
>
> Key: FLINK-2259
> URL: https://issues.apache.org/jira/browse/FLINK-2259
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Trevor Grant
>Priority: Minor
>  Labels: ML
>
> When there is an abundance of data available, a good way to train models is 
> to split the available data into 3 parts: Train, 

[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253937#comment-15253937
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60739509
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.api.scala.utils._
+
+
+class SplitterITSuite extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's DataSet Splitter"
+
+  import MinMaxScalerData._
+
+  it should "result in datasets with no elements in common and all elements used" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
+
+    (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
+
+    splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
+  }
+
+  it should "result in datasets of an expected size when precise" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet, 0.5)
--- End diff --

Why don't we use the precise sampling here?
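
For illustration, a sketch of how the precise path could be exercised in this
test (using the `randomSplit` signature from this PR):

```scala
// Sketch: request precise sampling so equal sample set sizes are forced.
val splitDataSets = Splitter.randomSplit(dataSet, 0.5, precise = true)
```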


> Support training Estimators using a (train, validation, test) split of the 
> available data
> -
>
> Key: FLINK-2259
> URL: https://issues.apache.org/jira/browse/FLINK-2259
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Trevor Grant
>Priority: Minor
>  Labels: ML
>
> When there is an abundance of data available, a good way to train models is 
> to split the available data into 3 parts: Train, Validation and Test.
> We use the Train data to train the model, the Validation part is used to 
> estimate the test error and select hyperparameters, and the Test is used to 
> evaluate the performance of the model, and assess its generalization [1]
> This is a common approach when training Artificial Neural Networks, and a 
> good strategy to choose in data-rich environments. Therefore we should have 
> some support of this data-analysis process in our Estimators.
> [1] Friedman, Jerome, Trevor Hastie, and Robert Tibshirani. The elements of 
> statistical learning. Vol. 1. Springer, Berlin: Springer series in 
> statistics, 2001.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

2016-04-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60739509
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.api.scala.utils._
+
+
+class SplitterITSuite extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's DataSet Splitter"
+
+  import MinMaxScalerData._
+
+  it should "result in datasets with no elements in common and all elements used" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
+
+    (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
+
+    splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
+  }
+
+  it should "result in datasets of an expected size when precise" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet, 0.5)
--- End diff --

Why don't we use the precise sampling here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253936#comment-15253936
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60739425
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.api.scala.utils._
+
+
+class SplitterITSuite extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's DataSet Splitter"
+
+  import MinMaxScalerData._
+
+  it should "result in datasets with no elements in common and all elements used" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
+
+    (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
+
+    splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
+  }
+
+  it should "result in datasets of an expected size when precise" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet, 0.5)
+
+    val expectedLength = dataSet.count().toDouble * 0.5
--- End diff --

We can calculate the size of `dataSet` without executing a job: simply 
`data.size`.
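
A sketch of that suggestion (`data` is the local collection backing the
DataSet, so its size is known without triggering a job):

```scala
// Sketch: use the local collection's size instead of dataSet.count().
val expectedLength = data.size.toDouble * 0.5
```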


> Support training Estimators using a (train, validation, test) split of the 
> available data
> -
>
> Key: FLINK-2259
> URL: https://issues.apache.org/jira/browse/FLINK-2259
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Trevor Grant
>Priority: Minor
>  Labels: ML
>
> When there is an abundance of data available, a good way to train models is 
> to split the available data into 3 parts: Train, Validation and Test.
> We use the Train data to train the model, the Validation part is used to 
> estimate the test error and select hyperparameters, and the Test is used to 
> evaluate the performance of the model, and assess its generalization [1]
> This is a common approach when training Artificial Neural Networks, and a 
> good strategy to choose in data-rich environments. Therefore we should have 
> some support of this data-analysis process in our Estimators.
> [1] Friedman, Jerome, Trevor Hastie, and Robert Tibshirani. The elements of 
> statistical learning. Vol. 1. Springer, Berlin: Springer series in 
> statistics, 2001.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

2016-04-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60739425
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.api.scala.utils._
+
+
+class SplitterITSuite extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's DataSet Splitter"
+
+  import MinMaxScalerData._
+
+  it should "result in datasets with no elements in common and all elements used" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
+
+    (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
+
+    splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
+  }
+
+  it should "result in datasets of an expected size when precise" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet, 0.5)
+
+    val expectedLength = dataSet.count().toDouble * 0.5
--- End diff --

We can calculate the size of `dataSet` without executing a job: simply 
`data.size`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253935#comment-15253935
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60739344
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.api.scala.utils._
+
+
+class SplitterITSuite extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's DataSet Splitter"
+
+  import MinMaxScalerData._
+
+  it should "result in datasets with no elements in common and all elements used" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
+
+    (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
+
+    splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
+  }
+
+  it should "result in datasets of an expected size when precise" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet, 0.5)
+
+    val expectedLength = dataSet.count().toDouble * 0.5
+
+    splitDataSets(0).count().toDouble should equal(expectedLength +- 5.0)
--- End diff --

Does this mean that the test case could fail? Even if it's unlikely?
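
One way to keep the assertion stable would be to fix the seed (a sketch, not
part of the PR):

```scala
// Sketch: a fixed seed makes the sample, and hence the assertion,
// reproducible across runs.
val splitDataSets = Splitter.randomSplit(dataSet, 0.5, precise = false, seed = 42L)
```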


> Support training Estimators using a (train, validation, test) split of the 
> available data
> -
>
> Key: FLINK-2259
> URL: https://issues.apache.org/jira/browse/FLINK-2259
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Trevor Grant
>Priority: Minor
>  Labels: ML
>
> When there is an abundance of data available, a good way to train models is 
> to split the available data into 3 parts: Train, Validation and Test.
> We use the Train data to train the model, the Validation part is used to 
> estimate the test error and select hyperparameters, and the Test is used to 
> evaluate the performance of the model, and assess its generalization [1]
> This is a common approach when training Artificial Neural Networks, and a 
> good strategy to choose in data-rich environments. Therefore we should have 
> some support of this data-analysis process in our Estimators.
> [1] Friedman, Jerome, Trevor Hastie, and Robert Tibshirani. The elements of 
> statistical learning. Vol. 1. Springer, Berlin: Springer series in 
> statistics, 2001.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

2016-04-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60739344
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.api.scala.utils._
+
+
+class SplitterITSuite extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's DataSet Splitter"
+
+  import MinMaxScalerData._
+
+  it should "result in datasets with no elements in common and all elements used" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
+
+    (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
+
+    splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
+  }
+
+  it should "result in datasets of an expected size when precise" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet, 0.5)
+
+    val expectedLength = dataSet.count().toDouble * 0.5
+
+    splitDataSets(0).count().toDouble should equal(expectedLength +- 5.0)
--- End diff --

Does this mean that the test case could fail? Even if it's unlikely?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253932#comment-15253932
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60739128
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.api.scala.utils._
+
+
+class SplitterITSuite extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's DataSet Splitter"
+
+  import MinMaxScalerData._
+
+  it should "result in datasets with no elements in common and all elements used" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
+
+    (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
--- End diff --

Maybe we could `union` `splitDataSets(0)` and `splitDataSets(1)`, join the 
result with `dataSet`, and then count the whole thing. That way we could avoid 
two executions, because every `count` triggers the execution of the whole 
pipeline.
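
A sketch of that single-job check (the `indexedDataSet` binding is
hypothetical, introduced here for illustration):

```scala
// Sketch: union the two splits, join back on the index, and count once,
// comparing against the locally known input size.
val indexedDataSet = dataSet.zipWithIndex
val splitDataSets = Splitter.randomSplit(indexedDataSet, 0.5)

splitDataSets(0).union(splitDataSets(1))
  .join(indexedDataSet).where(0).equalTo(0)
  .count() should equal(data.size.toLong)
```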


> Support training Estimators using a (train, validation, test) split of the 
> available data
> -
>
> Key: FLINK-2259
> URL: https://issues.apache.org/jira/browse/FLINK-2259
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Trevor Grant
>Priority: Minor
>  Labels: ML
>
> When there is an abundance of data available, a good way to train models is 
> to split the available data into 3 parts: Train, Validation and Test.
> We use the Train data to train the model, the Validation part is used to 
> estimate the test error and select hyperparameters, and the Test is used to 
> evaluate the performance of the model, and assess its generalization [1]
> This is a common approach when training Artificial Neural Networks, and a 
> good strategy to choose in data-rich environments. Therefore we should have 
> some support of this data-analysis process in our Estimators.
> [1] Friedman, Jerome, Trevor Hastie, and Robert Tibshirani. The elements of 
> statistical learning. Vol. 1. Springer, Berlin: Springer series in 
> statistics, 2001.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

2016-04-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60739128
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.api.scala.utils._
+
+
+class SplitterITSuite extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's DataSet Splitter"
+
+  import MinMaxScalerData._
+
+  it should "result in datasets with no elements in common and all elements used" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
+
+    (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
--- End diff --

Maybe we could `union` `splitDataSets(0)` and `splitDataSets(1)`, join the 
result with `dataSet`, and then count the whole thing. That way we could avoid 
two executions, because every `count` triggers the execution of the whole 
pipeline.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

2016-04-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60738935
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.api.scala.utils._
+
+
+class SplitterITSuite extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's DataSet Splitter"
+
+  import MinMaxScalerData._
+
+  it should "result in datasets with no elements in common and all elements used" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
--- End diff --

`zipWithUniqueId` should be fine here
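
I.e., something like this sketch:

```scala
// Sketch: zipWithUniqueId assigns unique (not necessarily consecutive) ids
// in a single pass, whereas zipWithIndex needs an extra pass to count the
// elements per partition.
val splitDataSets = Splitter.randomSplit(dataSet.zipWithUniqueId, 0.5)
```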


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253927#comment-15253927
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60738935
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.api.scala.utils._
+
+
+class SplitterITSuite extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's DataSet Splitter"
+
+  import MinMaxScalerData._
+
+  it should "result in datasets with no elements in common and all elements used" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+
+    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
--- End diff --

`zipWithUniqueId` should be fine here


> Support training Estimators using a (train, validation, test) split of the 
> available data
> -
>
> Key: FLINK-2259
> URL: https://issues.apache.org/jira/browse/FLINK-2259
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Trevor Grant
>Priority: Minor
>  Labels: ML
>
> When there is an abundance of data available, a good way to train models is 
> to split the available data into 3 parts: Train, Validation and Test.
> We use the Train data to train the model, the Validation part is used to 
> estimate the test error and select hyperparameters, and the Test is used to 
> evaluate the performance of the model, and assess its generalization [1]
> This is a common approach when training Artificial Neural Networks, and a 
> good strategy to choose in data-rich environments. Therefore we should have 
> some support of this data-analysis process in our Estimators.
> [1] Friedman, Jerome, Trevor Hastie, and Robert Tibshirani. The elements of 
> statistical learning. Vol. 1. Springer, Berlin: Springer series in 
> statistics, 2001.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253923#comment-15253923
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60738627
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
+import org.apache.flink.api.java.Utils
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala.utils._
+
+import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
+import _root_.scala.reflect.ClassTag
+
+object Splitter {
+
+  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                             testing: DataSet[T])
+
+  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                                    testing: DataSet[T],
+                                                                    holdout: DataSet[T])
+  // --------------------------------------------------------------------------------------------
+  //  randomSplit
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Split a DataSet by the probability fraction of each element.
+   *
+   * @param input    DataSet to be split
+   * @param fraction Probability that each element is chosen; should be [0, 1] when sampling
+   *                 without replacement, and [0, ∞) with replacement. When the fraction is
+   *                 larger than 1, each element is expected to be selected multiple times
+   *                 into the sample on average. This fraction refers to the first element
+   *                 in the resulting array.
+   * @param precise  Sampling by default is random and can result in slightly lop-sided
+   *                 sample sets. When precise is true, equal sample set sizes are forced;
+   *                 however, this is somewhat less efficient.
+   * @param seed     Random number generator seed.
+   * @return An array of two datasets
+   */
+
+  def randomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
+                                                 fraction: Double,
+                                                 precise: Boolean = false,
+                                                 seed: Long = Utils.RNG.nextLong())
+    : Array[DataSet[T]] = {
+    import org.apache.flink.api.scala._
+
+    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
+
+    val leftSplit: DataSet[(Long, T)] = precise match {
+      case false => indexedInput.sample(false, fraction, seed)
+      case true => {
+        val count = indexedInput.count()
+        val numOfSamples = math.round(fraction * count).toInt
+        indexedInput.sampleWithSize(false, numOfSamples, seed)
+      }
+    }
+
+    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
+      .where(0)
+      .equalTo(0) {
+        (full: (Long, T), left: (Long, T)) => (if (left == null) full else null)
+      }
+      .filter(o => o != null)
+    Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2))
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  multiRandomSplit
+  // --------------------------------------------------------------------------------------------

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

2016-04-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60738627
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
+import org.apache.flink.api.java.Utils
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala.utils._
+
+import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
+import _root_.scala.reflect.ClassTag
+
+object Splitter {
+
+  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                             testing: DataSet[T])
+
+  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                                    testing: DataSet[T],
+                                                                    holdout: DataSet[T])
+  // --------------------------------------------------------------------------------------------
+  //  randomSplit
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Split a DataSet by the probability fraction of each element.
+   *
+   * @param input    DataSet to be split
+   * @param fraction Probability that each element is chosen; should be [0, 1] when sampling
+   *                 without replacement, and [0, ∞) with replacement. When the fraction is
+   *                 larger than 1, each element is expected to be selected multiple times
+   *                 into the sample on average. This fraction refers to the first element
+   *                 in the resulting array.
+   * @param precise  Sampling by default is random and can result in slightly lop-sided
+   *                 sample sets. When precise is true, equal sample set sizes are forced;
+   *                 however, this is somewhat less efficient.
+   * @param seed     Random number generator seed.
+   * @return An array of two datasets
+   */
+
+  def randomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
+                                                 fraction: Double,
+                                                 precise: Boolean = false,
+                                                 seed: Long = Utils.RNG.nextLong())
+    : Array[DataSet[T]] = {
+    import org.apache.flink.api.scala._
+
+    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
+
+    val leftSplit: DataSet[(Long, T)] = precise match {
+      case false => indexedInput.sample(false, fraction, seed)
+      case true => {
+        val count = indexedInput.count()
+        val numOfSamples = math.round(fraction * count).toInt
+        indexedInput.sampleWithSize(false, numOfSamples, seed)
+      }
+    }
+
+    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
+      .where(0)
+      .equalTo(0) {
+        (full: (Long, T), left: (Long, T)) => (if (left == null) full else null)
+      }
+      .filter(o => o != null)
+    Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2))
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  multiRandomSplit
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Split a DataSet by the probability fraction of each element of a vector.
+   *
+   * @param input     DataSet to be split
+   * @param fracArray An array of PROPORTIONS for splitting the DataSet. Unlike th

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

2016-04-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60738393
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
+import org.apache.flink.api.java.Utils
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala.utils._
+
+import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
+import _root_.scala.reflect.ClassTag
+
+object Splitter {
+
+  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                             testing: DataSet[T])
+
+  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                                    testing: DataSet[T],
+                                                                    holdout: DataSet[T])
+  // --------------------------------------------------------------------------------------------
+  //  randomSplit
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Split a DataSet by the probability fraction of each element.
+   *
+   * @param input    DataSet to be split
+   * @param fraction Probability that each element is chosen; should be [0, 1] when sampling
+   *                 without replacement, and [0, ∞) with replacement. When the fraction is
+   *                 larger than 1, each element is expected to be selected multiple times
+   *                 into the sample on average. This fraction refers to the first element
+   *                 in the resulting array.
+   * @param precise  Sampling by default is random and can result in slightly lop-sided
+   *                 sample sets. When precise is true, equal sample set sizes are forced;
+   *                 however, this is somewhat less efficient.
+   * @param seed     Random number generator seed.
+   * @return An array of two datasets
+   */
+
+  def randomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
+                                                 fraction: Double,
+                                                 precise: Boolean = false,
+                                                 seed: Long = Utils.RNG.nextLong())
+    : Array[DataSet[T]] = {
+    import org.apache.flink.api.scala._
+
+    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
+
+    val leftSplit: DataSet[(Long, T)] = precise match {
+      case false => indexedInput.sample(false, fraction, seed)
+      case true => {
+        val count = indexedInput.count()
+        val numOfSamples = math.round(fraction * count).toInt
+        indexedInput.sampleWithSize(false, numOfSamples, seed)
+      }
+    }
+
+    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
+      .where(0)
+      .equalTo(0) {
+        (full: (Long, T), left: (Long, T)) => (if (left == null) full else null)
+      }
+      .filter(o => o != null)
+    Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2))
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  multiRandomSplit
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Split a DataSet by the probability fraction of each element of a vector.
+   *
+   * @param input     DataSet to be split
+   * @param fracArray An array of PROPORTIONS for splitting the DataSet. Unlike th

[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253920#comment-15253920
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60738393
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
+import org.apache.flink.api.java.Utils
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala.utils._
+
+import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
+import _root_.scala.reflect.ClassTag
+
+object Splitter {
+
+  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                             testing: DataSet[T])
+
+  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                                    testing: DataSet[T],
+                                                                    holdout: DataSet[T])
+  // --------------------------------------------------------------------------------------------
+  //  randomSplit
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Split a DataSet by the probability fraction of each element.
+   *
+   * @param input    DataSet to be split
+   * @param fraction Probability that each element is chosen; should be [0, 1] when sampling
+   *                 without replacement, and [0, ∞) with replacement. When the fraction is
+   *                 larger than 1, each element is expected to be selected multiple times
+   *                 into the sample on average. This fraction refers to the first element
+   *                 in the resulting array.
+   * @param precise  Sampling by default is random and can result in slightly lop-sided
+   *                 sample sets. When precise is true, equal sample set sizes are forced;
+   *                 however, this is somewhat less efficient.
+   * @param seed     Random number generator seed.
+   * @return An array of two datasets
+   */
+
+  def randomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
+                                                 fraction: Double,
+                                                 precise: Boolean = false,
+                                                 seed: Long = Utils.RNG.nextLong())
+    : Array[DataSet[T]] = {
+    import org.apache.flink.api.scala._
+
+    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
+
+    val leftSplit: DataSet[(Long, T)] = precise match {
+      case false => indexedInput.sample(false, fraction, seed)
+      case true => {
+        val count = indexedInput.count()
+        val numOfSamples = math.round(fraction * count).toInt
+        indexedInput.sampleWithSize(false, numOfSamples, seed)
+      }
+    }
+
+    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
+      .where(0)
+      .equalTo(0) {
+        (full: (Long, T), left: (Long, T)) => (if (left == null) full else null)
+      }
+      .filter(o => o != null)
+    Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2))
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  multiRandomSplit
+  // --------------------------------------------------------------------------------------------

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

2016-04-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60738251
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
+import org.apache.flink.api.java.Utils
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala.utils._
+
+import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
+import _root_.scala.reflect.ClassTag
+
+object Splitter {
+
+  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                             testing: DataSet[T])
+
+  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
+                                                                    testing: DataSet[T],
+                                                                    holdout: DataSet[T])
+  // --------------------------------------------------------------------------------------------
+  //  randomSplit
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Split a DataSet by the probability fraction of each element.
+   *
+   * @param input    DataSet to be split
+   * @param fraction Probability that each element is chosen; should be [0, 1] when sampling
+   *                 without replacement, and [0, ∞) with replacement. When the fraction is
+   *                 larger than 1, each element is expected to be selected multiple times
+   *                 into the sample on average. This fraction refers to the first element
+   *                 in the resulting array.
+   * @param precise  Sampling by default is random and can result in slightly lop-sided
+   *                 sample sets. When precise is true, equal sample set sizes are forced;
+   *                 however, this is somewhat less efficient.
+   * @param seed     Random number generator seed.
+   * @return An array of two datasets
+   */
+
+  def randomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
+                                                 fraction: Double,
+                                                 precise: Boolean = false,
+                                                 seed: Long = Utils.RNG.nextLong())
+    : Array[DataSet[T]] = {
+    import org.apache.flink.api.scala._
+
+    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
+
+    val leftSplit: DataSet[(Long, T)] = precise match {
+      case false => indexedInput.sample(false, fraction, seed)
+      case true => {
+        val count = indexedInput.count()
+        val numOfSamples = math.round(fraction * count).toInt
+        indexedInput.sampleWithSize(false, numOfSamples, seed)
+      }
+    }
+
+    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
+      .where(0)
+      .equalTo(0) {
+        (full: (Long, T), left: (Long, T)) => (if (left == null) full else null)
+      }
+      .filter(o => o != null)
+    Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2))
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  multiRandomSplit
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Split a DataSet by the probability fraction of each element of a vector.
+   *
+   * @param input     DataSet to be split
+   * @param fracArray An array of PROPORTIONS for splitting the DataSet. Unlike th

[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253917#comment-15253917
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60738251
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---

[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253913#comment-15253913
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60737984
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---

[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253911#comment-15253911
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60737947
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---


[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253903#comment-15253903
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60736953
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253899#comment-15253899
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-213425285
  
Overall, I really like the work you did for the Kinesis Consumer! As you 
can see I've added some comments on the PR.
Please let me know what your opinion is on my comments.

When do you think you'll have time to address the issues? I would like 
to get the code merged as soon as possible because Amazon is asking to have 
it in Flink soon. If you already know that you won't have time to work on 
this in the upcoming days, I can also address the comments. Just let me know, I 
think we can find a solution.


> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement the data source consumer for the Kinesis streaming 
> connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
> "aws_access_key_id_here");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
> "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name",
> new SimpleStringSchema(),
> kinesisConfig));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-3618) Rename abstract UDF classes in Scatter-Gather implementation

2016-04-22 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253888#comment-15253888
 ] 

Greg Hogan commented on FLINK-3618:
---

This looks to be a simple change with no dissension, but is unassigned. Any 
reason not to make this change for 1.1.0?

> Rename abstract UDF classes in Scatter-Gather implementation
> 
>
> Key: FLINK-3618
> URL: https://issues.apache.org/jira/browse/FLINK-3618
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Martin Junghanns
>Priority: Minor
>
> We now offer three Vertex-centric computing abstractions:
> * Pregel
> * Gather-Sum-Apply
> * Scatter-Gather
> Each of these abstractions provides abstract classes that need to be 
> implemented by the user:
> * Pregel: {{ComputeFunction}}
> * GSA: {{GatherFunction}}, {{SumFunction}}, {{ApplyFunction}}
> * Scatter-Gather: {{MessagingFunction}}, {{VertexUpdateFunction}}
> In Pregel and GSA, the names of those functions follow the name of the 
> abstraction or the name suggested in the corresponding papers. For 
> consistency of the API, I propose to rename {{MessagingFunction}} to 
> {{ScatterFunction}} and {{VertexUpdateFunction}} to {{GatherFunction}}.
> Also for consistency, I would like to change the parameter order in 
> {{Graph.runScatterGatherIteration(VertexUpdateFunction f1, MessagingFunction 
> f2)}} to {{Graph.runScatterGatherIteration(ScatterFunction f1, GatherFunction 
> f2)}} (like in {{Graph.runGatherSumApplyIteration(...)}})



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253889#comment-15253889
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60735615
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---


[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

2016-04-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60735451
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---
+    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
+      .where(0)
+      .equalTo(0) {
+        (full: (Long, T), left: (Long, T)) => if (left == null) full else null
+      }
+      .filter(o => o != null)
--- End diff --

We could write this a bit more efficiently:

```
val rightSplit: DataSet[T] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
  .where(0)
  .equalTo(0).apply {
    (full: (Long, T), left: (Long, T), collector: Collector[T]) =>
      if (left == null) {
        collector.collect(full._2)
      }
  }
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled
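
A self-contained sketch of the Collector-based variant suggested above; the toy tuples and environment setup are illustrative stand-ins for the PR's generic DataSets, and the flat join needs org.apache.flink.util.Collector imported:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object CollectorJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val indexedInput: DataSet[(Long, String)] =
      env.fromElements((1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"))
    val leftSplit: DataSet[(Long, String)] =
      env.fromElements((1L, "a"), (3L, "c"))

    // Emit only the elements of indexedInput that are absent from leftSplit.
    // The Collector variant never materializes null records, so the extra
    // filter(o => o != null) pass from the original version is not needed.
    val rightSplit: DataSet[String] = indexedInput
      .leftOuterJoin(leftSplit)
      .where(0)
      .equalTo(0)
      .apply { (full: (Long, String), left: (Long, String), out: Collector[String]) =>
        if (left == null) {
          out.collect(full._2)
        }
      }

    rightSplit.print()  // prints "b" and "d", in no particular order
  }
}
```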

[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253874#comment-15253874
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-213419043
  
The type annotation work is done from bottom to top:

Firstly, we know the schema of each of the two inputs, and we know that the `List[] 
expression` in `Project` is used to manipulate one row of table data as input 
and to output one value per expression. Therefore, we can infer the output 
schema of `Project` (in the current impl this is expressed as `def output: 
Seq[Attribute]`) if we know each expression's `dataType`. 

For example, `Add`'s dataType is the same as its inputs', `Or`'s dataType is 
always `Boolean`, and `pow(a, b)`'s dataType is always `Double`. However, only if 
we understand all kinds of expressions are we able to infer their 
`dataType`. The main problem here is that we only have `Call` (Unresolved Function) 
nodes generated during expression construction, so we should resolve them 
first into concrete `Expression`s. `FunctionCatalog` is introduced here as a 
mapping from `FunctionName -> Expression`; we can easily finish the translation 
work by looking up the `catalog`.

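To make the bottom-up inference concrete, here is a simplified, self-contained model of the idea (the names Expression, Add, Or, Pow, Call and FunctionCatalog mirror the discussion above, but this sketch is not the actual Table API implementation):

```scala
sealed trait DataType
case object IntType extends DataType
case object DoubleType extends DataType
case object BooleanType extends DataType

sealed trait Expression { def dataType: DataType }
case class Literal(value: Any, dataType: DataType) extends Expression
case class Add(left: Expression, right: Expression) extends Expression {
  // Add's dataType follows its inputs (assumed identical here).
  def dataType: DataType = left.dataType
}
case class Or(left: Expression, right: Expression) extends Expression {
  def dataType: DataType = BooleanType  // always Boolean
}
case class Pow(base: Expression, exp: Expression) extends Expression {
  def dataType: DataType = DoubleType   // always Double
}

// An unresolved function call, as produced during expression construction.
case class Call(name: String, args: Seq[Expression])

// The catalog resolves a function name to a concrete Expression builder.
object FunctionCatalog {
  private val builders: Map[String, Seq[Expression] => Expression] = Map(
    "add" -> { args => Add(args(0), args(1)) },
    "or"  -> { args => Or(args(0), args(1)) },
    "pow" -> { args => Pow(args(0), args(1)) }
  )
  def resolve(call: Call): Expression = builders(call.name)(call.args)
}

object ValidationSketch extends App {
  val expr = FunctionCatalog.resolve(Call("pow", Seq(Literal(2, IntType), Literal(10, IntType))))
  println(expr.dataType)  // DoubleType, inferred bottom-up
}
```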

> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253872#comment-15253872
 ] 

ASF GitHub Bot commented on FLINK-2259:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1898#discussion_r60733883
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---
+val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
--- End diff --

it should be sufficient to use `zipWithUniqueId` here. It is more efficient 
than `zipWithIndex`

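For reference, the suggested swap is a drop-in change. A small sketch assuming the DataSetUtils implicits from org.apache.flink.api.scala.utils._ (already imported in the PR), with a toy input of my own:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._  // provides zipWithIndex / zipWithUniqueId

object ZipSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val input: DataSet[String] = env.fromElements("a", "b", "c")

    // zipWithIndex: consecutive ids 0..n-1, but needs an extra pass to
    // count the elements per partition before assigning indices.
    val byIndex: DataSet[(Long, String)] = input.zipWithIndex

    // zipWithUniqueId: single pass; ids are unique but not consecutive,
    // which is all the join-based split needs.
    val byUniqueId: DataSet[(Long, String)] = input.zipWithUniqueId

    byIndex.print()
    byUniqueId.print()
  }
}
```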

> Support training Estimators using a (train, validation, test) split of the 
> available data
> -
>
> Key: FLINK-2259
> URL: https://issues.apache.org/jira/browse/FLINK-2259
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Trevor Grant
>Priority: Minor
>  Labels: ML
>
> When there is an abundance of data available, a good way to train models is 
> to split the available data into 3 parts: Train, Validation and Test.
> We use the Train data to train the model, the Validation part is used to 
> estimate the test error and select hyperparameters, and the Test is u


[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60733759
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import 
org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import 
org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that 
pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is 
responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once 
streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the 
officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis 
Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST:
+ *
+ * NOTE: The current implementation does not correctly handle 
resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, 
the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // ------------------------------------------------------------------------
+   //  Consumer properties
+   // ------------------------------------------------------------------------
+
+   /** The complete list of shards */
+   private final List<KinesisStreamShard> shards;
+
+   /** Properties to parametrize settings such as AWS service regi
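
For illustration, a hypothetical usage sketch from the Scala DataStream API; the constructor shape follows the class above, but the property keys below are placeholders rather than the connector's actual config constants (those live in `KinesisConfigConstants`):

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object KinesisConsumerSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        // Placeholder keys, for illustration only.
        props.setProperty("aws.region", "us-east-1")
        props.setProperty("stream.initial.position", "TRIM_HORIZON") // or LATEST

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Each parallel instance of the source fetches one or more shards.
        val records: DataStream[String] = env.addSource(
          new FlinkKinesisConsumer[String]("my-stream", new SimpleStringSchema(), props))
        records.print()
        env.execute("kinesis-consumer-sketch")
      }
    }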

[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-22 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60733595
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
--- End diff --

I don't think we need to implement the `CheckpointListener` currently 
because we are not committing the offsets anywhere.
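
For context, a minimal sketch (hypothetical class, not from the PR) of what the interface is for: committing snapshotted offsets to an external system once a checkpoint is confirmed complete, as the Kafka consumer does. A source that commits nothing externally has no work to do in the callback.

    import org.apache.flink.runtime.state.CheckpointListener

    class CommitOnCheckpoint extends CheckpointListener {
      // Offsets captured at snapshot time, keyed by checkpoint id (illustrative).
      @volatile private var pendingOffsets: Map[Long, Map[String, String]] = Map.empty

      override def notifyCheckpointComplete(checkpointId: Long): Unit =
        pendingOffsets.get(checkpointId).foreach { offsets =>
          // This is where `offsets` would be written to an external store.
          pendingOffsets -= checkpointId
        }
    }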


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

