[jira] [Commented] (FLINK-1746) Add linear discriminant analysis to machine learning library
    [ https://issues.apache.org/jira/browse/FLINK-1746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15255146#comment-15255146 ]

Ronak Nathani commented on FLINK-1746:
--------------------------------------

Thanks [~till.rohrmann]! Look forward to working on this! :)

I am thinking of starting with one of the three algorithms for distributed LDA mentioned in reference [1].

> Add linear discriminant analysis to machine learning library
> ------------------------------------------------------------
>
>                 Key: FLINK-1746
>                 URL: https://issues.apache.org/jira/browse/FLINK-1746
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Till Rohrmann
>            Assignee: Ronak Nathani
>              Labels: ML
>
> Linear discriminant analysis (LDA) [1] is used for dimensionality reduction
> prior to classification. But it can also be used to calculate a linear
> classifier on its own. Since dimensionality reduction is an important
> preprocessing step, a distributed LDA implementation is a valuable addition
> to flink-ml.
> Resources:
> [1] [http://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=5946724]

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-1743) Add multinomial logistic regression to machine learning library
    [ https://issues.apache.org/jira/browse/FLINK-1743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254984#comment-15254984 ]

David E Drummond commented on FLINK-1743:
-----------------------------------------

Thanks for the warm welcome [~trohrm...@apache.org]!

It looks like I can use the framework of the MultipleLinearRegression as a starting point, since it already implements stochastic gradient descent.

> Add multinomial logistic regression to machine learning library
> ---------------------------------------------------------------
>
>                 Key: FLINK-1743
>                 URL: https://issues.apache.org/jira/browse/FLINK-1743
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Till Rohrmann
>            Assignee: David E Drummond
>              Labels: ML
>
> Multinomial logistic regression [1] would be a good first classification
> algorithm which can classify multiple classes.
> Resources:
> [1] [http://en.wikipedia.org/wiki/Multinomial_logistic_regression]

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
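For readers following the FLINK-1743 thread, here is a minimal sketch of what multinomial logistic regression trained with stochastic gradient descent looks like. It is plain Java on arrays, not the flink-ml API and not the MultipleLinearRegression code mentioned above; the class name `SoftmaxSketch` and its methods are hypothetical and chosen only for illustration.

```java
// Hypothetical illustration; not part of flink-ml.
final class SoftmaxSketch {

    /** Class probabilities for one sample: softmax over the scores w_k . x. */
    static double[] predictProbabilities(double[][] weights, double[] x) {
        double[] scores = new double[weights.length];
        double max = Double.NEGATIVE_INFINITY;
        for (int k = 0; k < weights.length; k++) {
            double dot = 0.0;
            for (int j = 0; j < x.length; j++) {
                dot += weights[k][j] * x[j];
            }
            scores[k] = dot;
            max = Math.max(max, dot);
        }
        // Subtracting the maximum score keeps Math.exp from overflowing.
        double sum = 0.0;
        for (int k = 0; k < scores.length; k++) {
            scores[k] = Math.exp(scores[k] - max);
            sum += scores[k];
        }
        for (int k = 0; k < scores.length; k++) {
            scores[k] /= sum;
        }
        return scores;
    }

    /** One SGD step on the cross-entropy loss for a single labeled sample. */
    static void sgdStep(double[][] weights, double[] x, int label, double learningRate) {
        double[] p = predictProbabilities(weights, x);
        for (int k = 0; k < weights.length; k++) {
            // Gradient of the loss with respect to w_k is (p_k - y_k) * x.
            double error = p[k] - (k == label ? 1.0 : 0.0);
            for (int j = 0; j < x.length; j++) {
                weights[k][j] -= learningRate * error * x[j];
            }
        }
    }
}
```

With all weights at zero every class gets the same probability; a single `sgdStep` toward a label raises that label's probability for the same sample.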
[jira] [Updated] (FLINK-3753) KillerWatchDog should not use kill on toKill thread
     [ https://issues.apache.org/jira/browse/FLINK-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-3753:
--------------------------
    Description:
{code}
      // this is harsh, but this watchdog is a last resort
      if (toKill.isAlive()) {
        toKill.stop();
      }
{code}
stop() is deprecated.
See:
https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads

  was:
{code}
      // this is harsh, but this watchdog is a last resort
      if (toKill.isAlive()) {
        toKill.stop();
      }
{code}
stop() is deprecated.
See:
https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads

> KillerWatchDog should not use kill on toKill thread
> ---------------------------------------------------
>
>                 Key: FLINK-3753
>                 URL: https://issues.apache.org/jira/browse/FLINK-3753
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Priority: Minor
>
> {code}
>       // this is harsh, but this watchdog is a last resort
>       if (toKill.isAlive()) {
>         toKill.stop();
>       }
> {code}
> stop() is deprecated.
> See:
> https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
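The usual alternative to Thread.stop() is cooperative cancellation through interruption. The sketch below shows that pattern in isolation; `InterruptingWatchdog` and `interruptAndAwait` are hypothetical names, not Flink's KillerWatchDog, and the 50 ms polling interval is an arbitrary assumption. Note that a thread which ignores interrupts will not be terminated this way, so a caller still has to decide what to do when the method returns false.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not Flink's KillerWatchDog.
final class InterruptingWatchdog {

    /**
     * Repeatedly interrupts toKill and waits for it to finish.
     * Returns true if the thread terminated within timeoutMillis.
     */
    static boolean interruptAndAwait(Thread toKill, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (toKill.isAlive()) {
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    return false;
                }
                // Ask the thread to stop, then give it a short while to react.
                toKill.interrupt();
                toKill.join(Math.min(remainingMillis, 50));
            }
            return true;
        } catch (InterruptedException e) {
            // Preserve the caller's interrupt status and report the current state.
            Thread.currentThread().interrupt();
            return !toKill.isAlive();
        }
    }

    public static void main(String[] args) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Interrupted: leave run() so the thread can terminate.
            }
        });
        worker.start();
        System.out.println("terminated: " + interruptAndAwait(worker, 2_000));
    }
}
```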
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
    [ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254633#comment-15254633 ]

ASF GitHub Bot commented on FLINK-3229:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1911#issuecomment-213581650

    Regarding the Jackson / dependency issue: You don't need to worry about it now. I'll take another look at the problem and make sure it'll work once we merge it.

> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-3229
>                 URL: https://issues.apache.org/jira/browse/FLINK-3229
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>    Affects Versions: 1.0.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement data source consumer for Kinesis streaming
> connector (https://issues.apache.org/jira/browser/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
>     "aws_access_key_id_here");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
>     "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
>     "kinesis_stream_name",
>     new SimpleStringSchema(),
>     kinesisConfig));
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1911#issuecomment-213581650

    Regarding the Jackson / dependency issue: You don't need to worry about it now. I'll take another look at the problem and make sure it'll work once we merge it.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1911#discussion_r60799474

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.internals;
    +
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.kinesis.model.GetRecordsResult;
    +import com.amazonaws.services.kinesis.model.Record;
    +import com.amazonaws.services.kinesis.model.ShardIteratorType;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
    +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
    +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
    +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
    +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.ArrayList;
    +import java.util.Properties;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
    + * The fetcher spawns a single thread for connection to each shard.
    + */
    +public class KinesisDataFetcher {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
    +
    +    /** Config properties for the Flink Kinesis Consumer */
    +    private final Properties configProps;
    +
    +    /** The AWS credentials provider as specified in config properties */
    +    private final AWSCredentialsProvider credentials;
    +
    +    /** The name of the consumer task that this fetcher was instantiated */
    +    private final String taskName;
    +
    +    /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */
    +    private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
    +
    +    /** Reference to the thread that executed run() */
    +    private volatile Thread mainThread;
    +
    +    /** Reference to the first error thrown by any of the spawned shard connection threads */
    +    private final AtomicReference<Throwable> error;
    +
    +    private volatile boolean running = true;
    +
    +    /**
    +     * Creates a new Kinesis Data Fetcher for the specified set of shards
    +     *
    +     * @param assignedShards the shards that this fetcher will pull data from
    +     * @param configProps the configuration properties of this Flink Kinesis Consumer
    +     * @param taskName the task name of this consumer task
    +     */
    +    public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) {
    +        this.configProps = checkNotNull(configProps);
    +        this.credentials = AWSUtil.getCredentialsProvider(configProps);
    +        this.assignedShardsWithStartingSequenceNum = new HashMap<>();
    +        for (KinesisStreamShard shard : assignedShards) {
    +            assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
    +        }
    +        this.taskName = taskName;
    +        this.error = new AtomicReference<>();
    +    }
    +
    +    /**
    +     * Advance a shard's starting sequence number to a specified value
    +     *
    +     * @param streamShard the shard to perform the advance on
    +     * @param sequenceNum the sequence number to advance to
    +     */
    +    public void advanceSequenceNumberTo(KinesisStreamShard st
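The quoted KinesisDataFetcher keeps an AtomicReference for "the first error thrown by any of the spawned shard connection threads". A minimal sketch of that idea follows, assuming one worker thread per task and a compare-and-set so that only the first failure is recorded. `FirstErrorSketch` and `runAll` are hypothetical names; the real fetcher's control flow is not visible in the truncated diff, so this shows only the general pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of first-error capture across worker threads.
final class FirstErrorSketch {

    /** Runs each task on its own thread and returns the first recorded error, or null. */
    static Throwable runAll(List<Runnable> tasks) {
        AtomicReference<Throwable> error = new AtomicReference<>();
        List<Thread> threads = new ArrayList<>();
        for (Runnable task : tasks) {
            Thread worker = new Thread(() -> {
                try {
                    task.run();
                } catch (Throwable t) {
                    // Succeeds only while the reference is still null,
                    // so later failures do not overwrite the first one.
                    error.compareAndSet(null, t);
                }
            });
            threads.add(worker);
            worker.start();
        }
        for (Thread worker : threads) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return error.get();
    }
}
```

The caller can then rethrow or wrap the returned Throwable on its own thread, which keeps error reporting in one place.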
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
    [ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254615#comment-15254615 ]

ASF GitHub Bot commented on FLINK-3229:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1911#discussion_r60799388

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.model;
    +
    +import java.io.Serializable;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * A serializable representation of an AWS Kinesis Stream shard. It is basically a wrapper class around the information
    + * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to
    + * determine whether or not a shard is closed and whether or not the shard is a result of parent shard splits or merges.
    + */
    +public class KinesisStreamShard implements Serializable {
    --- End diff --

    I like the idea of having the `Shard` as a field in the `KinesisStreamShard`. It will reduce the complexity and the number of lines of code of the connector.

> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-3229
>                 URL: https://issues.apache.org/jira/browse/FLINK-3229
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>    Affects Versions: 1.0.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement data source consumer for Kinesis streaming
> connector (https://issues.apache.org/jira/browser/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
>     "aws_access_key_id_here");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
>     "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
>     "kinesis_stream_name",
>     new SimpleStringSchema(),
>     kinesisConfig));
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
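The review comment above favors holding the `Shard` as a field rather than copying its attributes one by one. A rough sketch of that composition is shown below. To stay compilable without the AWS SDK it uses a hypothetical stand-in class, `ShardStandIn`, in place of com.amazonaws.services.kinesis.model.Shard; `StreamShardSketch` and its method names are likewise assumptions and not the code in the pull request.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical stand-in for the AWS SDK Shard class, reduced to three fields.
final class ShardStandIn implements Serializable {
    private static final long serialVersionUID = 1L;

    final String shardId;
    final String parentShardId;        // null when the shard has no parent
    final String endingSequenceNumber; // null while the shard is still open

    ShardStandIn(String shardId, String parentShardId, String endingSequenceNumber) {
        this.shardId = Objects.requireNonNull(shardId);
        this.parentShardId = parentShardId;
        this.endingSequenceNumber = endingSequenceNumber;
    }
}

// Hypothetical wrapper: keeps the shard as a single field and derives
// everything else from it instead of duplicating its attributes.
final class StreamShardSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String streamName;
    private final ShardStandIn shard;

    StreamShardSketch(String streamName, ShardStandIn shard) {
        this.streamName = Objects.requireNonNull(streamName);
        this.shard = Objects.requireNonNull(shard);
    }

    String getStreamName() {
        return streamName;
    }

    ShardStandIn getShard() {
        return shard;
    }

    /** Assumes a shard counts as closed once it has an ending sequence number. */
    boolean isClosed() {
        return shard.endingSequenceNumber != null;
    }

    /** True if the shard came out of a split or merge of a parent shard. */
    boolean hasParent() {
        return shard.parentShardId != null;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StreamShardSketch)) {
            return false;
        }
        StreamShardSketch other = (StreamShardSketch) o;
        return streamName.equals(other.streamName) && shard.shardId.equals(other.shard.shardId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(streamName, shard.shardId);
    }
}
```

Because equality is defined on the stream name and shard id, such a wrapper can serve as a key in a map from shard to sequence number.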
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
    [ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254608#comment-15254608 ]

ASF GitHub Bot commented on FLINK-3229:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1911#discussion_r60799066

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
    @@ -0,0 +1,481 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis;
    +
    +import com.amazonaws.regions.Regions;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.state.CheckpointListener;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
    +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
    +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
    +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
    +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
    +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
    +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
    +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
    +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
    +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
    +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.ArrayList;
    +import java.util.Properties;
    +
    +import static com.google.common.base.Preconditions.checkArgument;
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams
    + * within the same AWS service region. Each instance of the consumer is responsible for fetching data records from
    + * one or more Kinesis shards.
    + *
    + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
    + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
    + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
    + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST:
    + *
    + *
    + *
    + *
    + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams.
    + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on
    + * the Flink Kafka Consumer.
    + *
    + * @param <T> the type of data emitted
    + */
    +public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
    +    implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
    +
    +    private static final long serialVersionUID = 1L;
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
    +
    +    //
    +    // Consumer p
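The Javadoc quoted above says the consumer manages stream state itself so that it can take part in Flink's checkpointing. The sketch below illustrates that general idea with plain collections: remember the last emitted sequence number per shard, hand out a copy when a checkpoint is taken, and resume from a restored copy. It does not use Flink's checkpoint interfaces or the Kinesis API; `ShardOffsetsSketch` and all of its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of per-shard progress tracking for checkpoints.
final class ShardOffsetsSketch {

    /** Last sequence number emitted downstream, keyed by shard id. */
    private final Map<String, String> lastSequenceNumbers = new HashMap<>();

    /** Called after a record has been emitted. */
    synchronized void recordEmitted(String shardId, String sequenceNumber) {
        lastSequenceNumbers.put(shardId, sequenceNumber);
    }

    /** Returns a copy, so later progress does not change a snapshot already taken. */
    synchronized Map<String, String> snapshot() {
        return new HashMap<>(lastSequenceNumbers);
    }

    /** Replaces the current progress with a previously taken snapshot. */
    synchronized void restore(Map<String, String> snapshot) {
        lastSequenceNumbers.clear();
        lastSequenceNumbers.putAll(snapshot);
    }

    /** Sequence number to resume after, or null if the shard has not been read yet. */
    synchronized String lastSequenceNumber(String shardId) {
        return lastSequenceNumbers.get(shardId);
    }
}
```

The defensive copy in `snapshot()` is the important design choice here: the reader keeps advancing while a checkpoint is being written, and the stored state must reflect the moment the snapshot was taken.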
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1911#discussion_r60797751

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
    @@ -0,0 +1,481 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis;
    +
    +import com.amazonaws.regions.Regions;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.state.CheckpointListener;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
    +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
    +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
    +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
    +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
    +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
    +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
    +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
    +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
    +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
    +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.ArrayList;
    +import java.util.Properties;
    +
    +import static com.google.common.base.Preconditions.checkArgument;
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams
    + * within the same AWS service region. Each instance of the consumer is responsible for fetching data records from
    + * one or more Kinesis shards.
    + *
    + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
    + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
    + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
    + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST:
    + *
    + *
    + *
    + *
    + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams.
    + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on
    + * the Flink Kafka Consumer.
    + *
    + * @param <T> the type of data emitted
    + */
    +public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
    +    implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
    +
    +    private static final long serialVersionUID = 1L;
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
    +
    +    //
    +    // Consumer properties
    +    //
    +
    +    /** The complete list of shards */
    +    private final List<KinesisStreamShard> shards;
    +
    +    /** Properties to parametrize settings such as AWS service regi
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254583#comment-15254583 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60797751 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * + * + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param the type of data emitted + */ +public class FlinkKinesisConsumer extends RichParallelSourceFunction + implements CheckpointListener, CheckpointedAsynchronously>, ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + // + // Consumer p
[jira] [Commented] (FLINK-2946) Add orderBy() to Table API
[ https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254576#comment-15254576 ] ASF GitHub Bot commented on FLINK-2946: --- Github user dawidwys commented on the pull request: https://github.com/apache/flink/pull/1926#issuecomment-213575798 Always glad to help! In fact I would be happy for any suggestions what can I work on further. I changed the tests. > Add orderBy() to Table API > -- > > Key: FLINK-2946 > URL: https://issues.apache.org/jira/browse/FLINK-2946 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Dawid Wysakowicz > > In order to implement a FLINK-2099 prototype that uses the Table APIs code > generation facilities, the Table API needs a sorting feature. > I would implement it the next days. Ideas how to implement such a sorting > feature are very welcome. Is there any more efficient way instead of > {{.sortPartition(...).setParallism(1)}}? Is it better to sort locally on the > nodes first and finally sort on one node afterwards? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
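The question raised in the issue description (sort locally on the nodes first, then finish on one node) can be illustrated outside Flink. The following is a minimal plain-Java sketch, not the Table API implementation and not what the pull request does: each list stands in for one partition, every partition is sorted on its own, and a single k-way merge produces the total order. The class `LocalSortThenMerge` and its methods are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical illustration; not part of the Table API or the DataSet API. */
final class LocalSortThenMerge {

    /** Read position inside one already sorted partition. */
    private static final class Cursor {
        final List<Integer> partition;
        int index;

        Cursor(List<Integer> partition) {
            this.partition = partition;
        }

        int head() {
            return partition.get(index);
        }
    }

    /** Sorts every partition on its own (the "local" step), then merges them in one place. */
    static List<Integer> sortAndMerge(List<List<Integer>> partitions) {
        PriorityQueue<Cursor> queue =
                new PriorityQueue<>((a, b) -> Integer.compare(a.head(), b.head()));
        int total = 0;
        for (List<Integer> partition : partitions) {
            List<Integer> sorted = new ArrayList<>(partition);
            Collections.sort(sorted);            // local sort, independent per partition
            total += sorted.size();
            if (!sorted.isEmpty()) {
                queue.add(new Cursor(sorted));
            }
        }

        List<Integer> result = new ArrayList<>(total);
        while (!queue.isEmpty()) {               // single k-way merge of the sorted runs
            Cursor smallest = queue.poll();
            result.add(smallest.head());
            smallest.index++;
            if (smallest.index < smallest.partition.size()) {
                queue.add(smallest);             // re-insert with its next element as key
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(5, 1, 9),
                Arrays.asList(3, 8),
                Arrays.asList(2));
        System.out.println(sortAndMerge(partitions));
    }
}
```

In this sketch the final step only merges runs that are already sorted, which costs O(n log k) comparisons for n elements in k partitions, while the O(n log n) sorting work is spread over the partitions. Whether that pays off inside Flink compared with `sortPartition(...)` at parallelism 1 is exactly the open question of the issue.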
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user dawidwys commented on the pull request: https://github.com/apache/flink/pull/1926#issuecomment-213575798 Always glad to help! In fact, I would be happy to get suggestions on what I can work on next. I changed the tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3777] Managed closeInputFormat()
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1903#issuecomment-213573556 Thanks for the quick update @fpompermaier! The changes look mostly good. What is missing are tests that verify that `DataSourceTask` and `GenericDataSourceBase` correctly call the new methods as part of their IF life cycle. Can you extend `GenericDataSourceBaseTest` and `DataSourceTask` and add tests to validate that the methods are called (maybe by using a mocked `RichInputFormat`)? Thanks
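The kind of test requested here can be sketched without Flink on the classpath. The example below is an assumption-laden illustration, not the test that belongs in `GenericDataSourceBaseTest`: the interface `LifecycleFormat` is a hypothetical stand-in for the lifecycle methods of a rich input format, `RecordingFormat` is a hand-written fake used in place of a mock, and `runTask` is a hypothetical driver standing in for the class under test. A real test would run the actual Flink classes against a mocked `RichInputFormat`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of verifying input format lifecycle calls. */
final class LifecycleRecordingExample {

    /** Stand-in for the lifecycle part of a rich input format (assumed shape). */
    interface LifecycleFormat {
        void openInputFormat();
        void open(String split);
        void close();
        void closeInputFormat();
    }

    /** Hand-written fake that records every call in order. */
    static final class RecordingFormat implements LifecycleFormat {
        final List<String> calls = new ArrayList<>();

        @Override public void openInputFormat() { calls.add("openInputFormat"); }
        @Override public void open(String split) { calls.add("open:" + split); }
        @Override public void close() { calls.add("close"); }
        @Override public void closeInputFormat() { calls.add("closeInputFormat"); }
    }

    /** Hypothetical driver: opens the format once, each split in turn, and always cleans up. */
    static void runTask(LifecycleFormat format, List<String> splits) {
        format.openInputFormat();
        try {
            for (String split : splits) {
                format.open(split);
                try {
                    // records of the split would be read here
                } finally {
                    format.close();
                }
            }
        } finally {
            format.closeInputFormat();
        }
    }

    /** Runs the driver against the fake and returns the recorded call sequence. */
    static List<String> recordedCalls(List<String> splits) {
        RecordingFormat format = new RecordingFormat();
        runTask(format, splits);
        return format.calls;
    }

    public static void main(String[] args) {
        System.out.println(recordedCalls(Arrays.asList("s0", "s1")));
    }
}
```

Asserting on the full call sequence, rather than only on call counts, also catches the case where `closeInputFormat()` runs before the last split has been closed.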
[jira] [Commented] (FLINK-3777) Add open and close methods to manage IF lifecycle
[ https://issues.apache.org/jira/browse/FLINK-3777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254572#comment-15254572 ] ASF GitHub Bot commented on FLINK-3777: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1903#issuecomment-213573556 Thanks for the quick update @fpompermaier! The changes look mostly good. What is missing are tests that verify that `DataSourceTask` and `GenericDataSourceBase` correctly call the new methods as part of their IF life cycle. Can you extend `GenericDataSourceBaseTest` and `DataSourceTask` and add tests to validate that the methods are called (maybe by using a mocked `RichInputFormat`)? Thanks > Add open and close methods to manage IF lifecycle > - > > Key: FLINK-3777 > URL: https://issues.apache.org/jira/browse/FLINK-3777 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.1 >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier > Labels: inputformat, lifecycle > > At the moment the opening and closing of an inputFormat are not managed, > although open() could be (improperly IMHO) simulated by configure(). > This limits the possibility to reuse expensive resources (like database > connections) and manage their release. > Probably the best option would be to add 2 methods (i.e. openInputformat() > and closeInputFormat() ) to RichInputFormat* > * NOTE: the best option from a "semantic" point of view would be to rename > the current open() and close() to openSplit() and closeSplit() respectively > while using open() and close() methods for the IF lifecycle management, but > this would cause a backward compatibility issue... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3777] Managed closeInputFormat()
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1903#discussion_r60794865
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java ---
@@ -133,4 +133,18 @@ public RuntimeContext getRuntimeContext(){
             throw new RuntimeException("The underlying input format to this ReplicatingInputFormat isn't context aware");
         }
     }
+
+    @Override
+    public void openInputFormat() {
+        if(this.replicatedIF instanceof RichInputFormat){
--- End diff --
Can you add a space before and after the parentheses of the condition, i.e., `if (condition) {`? Please check the other changes as well.
[jira] [Commented] (FLINK-3777) Add open and close methods to manage IF lifecycle
[ https://issues.apache.org/jira/browse/FLINK-3777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254554#comment-15254554 ] ASF GitHub Bot commented on FLINK-3777: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1903#discussion_r60794865 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java --- @@ -133,4 +133,18 @@ public RuntimeContext getRuntimeContext(){ throw new RuntimeException("The underlying input format to this ReplicatingInputFormat isn't context aware"); } } + + @Override + public void openInputFormat() { + if(this.replicatedIF instanceof RichInputFormat){ --- End diff -- Can you add a space before and after the parentheses of the condition, i.e., `if (condition) {`? Please check the other changes as well. > Add open and close methods to manage IF lifecycle > - > > Key: FLINK-3777 > URL: https://issues.apache.org/jira/browse/FLINK-3777 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.1 >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier > Labels: inputformat, lifecycle > > At the moment the opening and closing of an inputFormat are not managed, > although open() could be (improperly IMHO) simulated by configure(). > This limits the possibility to reuse expensive resources (like database > connections) and manage their release. > Probably the best option would be to add 2 methods (i.e. openInputformat() > and closeInputFormat() ) to RichInputFormat* > * NOTE: the best option from a "semantic" point of view would be to rename > the current open() and close() to openSplit() and closeSplit() respectively > while using open() and close() methods for the IF lifecycle management, but > this would cause a backward compatibility issue... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3777) Add open and close methods to manage IF lifecycle
[ https://issues.apache.org/jira/browse/FLINK-3777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254549#comment-15254549 ] ASF GitHub Bot commented on FLINK-3777: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1903#discussion_r60794593 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java --- @@ -82,9 +82,6 @@ public void initializeOnMaster(ClassLoader loader) throws Exception { catch (Throwable t) { throw new Exception("Configuring the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t); } - finally { --- End diff -- Is this change intended? > Add open and close methods to manage IF lifecycle > - > > Key: FLINK-3777 > URL: https://issues.apache.org/jira/browse/FLINK-3777 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.1 >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier > Labels: inputformat, lifecycle > > At the moment the opening and closing of an inputFormat are not managed, > although open() could be (improperly IMHO) simulated by configure(). > This limits the possibility to reuse expensive resources (like database > connections) and manage their release. > Probably the best option would be to add 2 methods (i.e. openInputformat() > and closeInputFormat() ) to RichInputFormat* > * NOTE: the best option from a "semantic" point of view would be to rename > the current open() and close() to openSplit() and closeSplit() respectively > while using open() and close() methods for the IF lifecycle management, but > this would cause a backward compatibility issue... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3777] Managed closeInputFormat()
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1903#discussion_r60794593
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java ---
@@ -82,9 +82,6 @@ public void initializeOnMaster(ClassLoader loader) throws Exception {
         catch (Throwable t) {
             throw new Exception("Configuring the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
         }
-        finally {
--- End diff --
Is this change intended?
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254544#comment-15254544 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1916#issuecomment-213567820 Thanks for the additional information @yjshen! I'm a bit behind with PR reviews, but will definitely have a look at the beginning of next week. Thanks, Fabian > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike the execution of SQL strings, which has a separate validation phase before > RelNode construction, the Table API lacks a counterpart, and validation is > scattered in many places. > I suggest adding a single validation phase to detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1916#issuecomment-213567820 Thanks for the additional information @yjshen! I'm a bit behind with PR reviews, but will definitely have a look at the beginning of next week. Thanks, Fabian
[jira] [Commented] (FLINK-2946) Add orderBy() to Table API
[ https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254535#comment-15254535 ] ASF GitHub Bot commented on FLINK-2946: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1926#issuecomment-213565509 Thanks for opening this PR @dawidwys! I skimmed over the changes and it looks good. I will do a more detailed review hopefully soon. Thanks also for adding a lot of tests. It is definitely good to have extensive test coverage, but end-to-end tests such as yours add quite a bit to the build time. Flink uses Travis as CI service which kills builds after 2h. Unfortunately, we are experiencing build time outs already and have to be careful when adding tests. I would like to ask you to remove some of the tests which check for different expression syntax but end up in identical executions. In addition, it would be good to add one SQL test that executes a query which sorts on two fields to have the SQL part covered. We will add a unit test framework, that checks for correct parsing of the expressions without actually executing queries in a separate effort. > Add orderBy() to Table API > -- > > Key: FLINK-2946 > URL: https://issues.apache.org/jira/browse/FLINK-2946 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Dawid Wysakowicz > > In order to implement a FLINK-2099 prototype that uses the Table APIs code > generation facilities, the Table API needs a sorting feature. > I would implement it the next days. Ideas how to implement such a sorting > feature are very welcome. Is there any more efficient way instead of > {{.sortPartition(...).setParallism(1)}}? Is it better to sort locally on the > nodes first and finally sort on one node afterwards? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1926#issuecomment-213565509 Thanks for opening this PR @dawidwys! I skimmed over the changes and they look good. I will hopefully do a more detailed review soon. Thanks also for adding a lot of tests. It is definitely good to have extensive test coverage, but end-to-end tests such as yours add quite a bit to the build time. Flink uses Travis as its CI service, which kills builds after 2h. Unfortunately, we are already experiencing build timeouts and have to be careful when adding tests. I would like to ask you to remove some of the tests which check for different expression syntax but end up in identical executions. In addition, it would be good to add one SQL test that executes a query which sorts on two fields to have the SQL part covered. In a separate effort, we will add a unit test framework that checks for correct parsing of the expressions without actually executing queries.
[jira] [Commented] (FLINK-2946) Add orderBy() to Table API
[ https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254352#comment-15254352 ] ASF GitHub Bot commented on FLINK-2946: --- GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/1926 [FLINK-2946] Add orderBy() to Table API Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink tableSort Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1926.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1926 > Add orderBy() to Table API > -- > > Key: FLINK-2946 > URL: https://issues.apache.org/jira/browse/FLINK-2946 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Dawid Wysakowicz > > In order to implement a FLINK-2099 prototype that uses the Table APIs code > 
generation facilities, the Table API needs a sorting feature. > I would implement it the next days. Ideas how to implement such a sorting > feature are very welcome. Is there any more efficient way instead of > {{.sortPartition(...).setParallism(1)}}? Is it better to sort locally on the > nodes first and finally sort on one node afterwards? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/1926

[FLINK-2946] Add orderBy() to Table API

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [ ] General
  - The pull request references the related JIRA issue
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dawidwys/flink tableSort

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1926.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1926
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254332#comment-15254332 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-213530765
Hi, Cool, if you have time to address them, go ahead :) Thanks a lot for doing this by the way! I really like the work you did so far on the connector! Sent from my iPhone
> On 22.04.2016, at 18:01, Tzu-Li Tai wrote:
>
> @rmetzger
> Thank you very much for your detailed review on the PR :)
> I've replied to the comments you added, please .
> I can address the issues and follow up with corresponding commits within the next 36 hours. I am pretty much free for the next 3 days, and will very much like to get the consumer ready for merging by the end of this week :)
>
> If it still isn't ready by the end of 4/25, I'm afraid I will have to leave any remaining issues for you to address since after then I temporarily won't be able to work on code until June.

> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Affects Versions: 1.0.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement data source consumer for Kinesis streaming
> connector (https://issues.apache.org/jira/browser/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
>     "aws_access_key_id_here");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
>     "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
>     "kinesis_stream_name",
>     new SimpleStringSchema(),
>     kinesisConfig));
> {code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
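How such a `Properties` object might be read on the consumer side can be sketched with the JDK alone. The two key strings and their documented defaults (LATEST and BASIC) are taken from the `KinesisConfigConstants` diff under review in pull request 1911; everything else, including the class `KinesisConfigDefaultsSketch`, its method names, and the validation of the position value, is an assumption made for illustration and not the connector's actual code.

```java
import java.util.Properties;

/** Hypothetical helper; only the two key strings come from the reviewed diff. */
final class KinesisConfigDefaultsSketch {

    static final String INIT_POSITION_KEY = "flink.stream.initpos.type";
    static final String CREDENTIALS_PROVIDER_KEY = "aws.credentials.provider";

    /** Returns the configured initial position, falling back to LATEST when unset. */
    static String initialPosition(Properties config) {
        String value = config.getProperty(INIT_POSITION_KEY, "LATEST");
        // Assumed validation: only the two positions named in the issue are accepted.
        if (!value.equals("LATEST") && !value.equals("TRIM_HORIZON")) {
            throw new IllegalArgumentException("Unsupported initial position: " + value);
        }
        return value;
    }

    /** Returns the configured credential provider type, falling back to BASIC when unset. */
    static String credentialsProvider(Properties config) {
        return config.getProperty(CREDENTIALS_PROVIDER_KEY, "BASIC");
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(initialPosition(config));
        config.setProperty(INIT_POSITION_KEY, "TRIM_HORIZON");
        System.out.println(initialPosition(config));
        System.out.println(credentialsProvider(config));
    }
}
```

Note that `Properties.getProperty` only sees values stored as strings, so a caller who fills the object with `put` must pass `String` values, as the example in the issue does.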
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-213530765
Hi, Cool, if you have time to address them, go ahead :) Thanks a lot for doing this by the way! I really like the work you did so far on the connector! Sent from my iPhone
> On 22.04.2016, at 18:01, Tzu-Li Tai wrote:
>
> @rmetzger
> Thank you very much for your detailed review on the PR :)
> I've replied to the comments you added, please .
> I can address the issues and follow up with corresponding commits within the next 36 hours. I am pretty much free for the next 3 days, and will very much like to get the consumer ready for merging by the end of this week :)
>
> If it still isn't ready by the end of 4/25, I'm afraid I will have to leave any remaining issues for you to address since after then I temporarily won't be able to work on code until June.
[jira] [Created] (FLINK-3806) Revert use of DataSet.count() in Gelly
Greg Hogan created FLINK-3806:

Summary: Revert use of DataSet.count() in Gelly
Key: FLINK-3806
URL: https://issues.apache.org/jira/browse/FLINK-3806
Project: Flink
Issue Type: Improvement
Components: Gelly
Affects Versions: 1.1.0
Reporter: Greg Hogan
Priority: Critical

FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The former returns a {{DataSet}} while the latter executes a job to return a Java value. {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and {{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the user does not pass the number of vertices as a parameter.

As noted in FLINK-1632, this does make the code simpler but if my understanding is correct will materialize the Graph twice. The Graph will need to be reread from input, regenerated, or recomputed by preceding algorithms.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
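The concern about materializing the graph twice can be shown with a plain-Java analogy. This is only a sketch of the reasoning, not Flink's execution model: `RecomputedInput` is a hypothetical input that is produced again on every evaluation, much as a graph would be reread or regenerated per job, and `average` stands in for any algorithm that needs the element count as a parameter. None of these names exist in Flink or Gelly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java analogy; none of these names exist in Flink or Gelly. */
final class EagerCountSketch {

    /** Hypothetical input that is recomputed on every evaluation and counts those evaluations. */
    static final class RecomputedInput {
        private final List<Integer> values;
        private int evaluations;

        RecomputedInput(List<Integer> values) {
            this.values = values;
        }

        List<Integer> evaluate() {
            evaluations++;                   // stands in for rereading or regenerating the graph
            return new ArrayList<>(values);
        }

        int evaluations() {
            return evaluations;
        }
    }

    /** Stand-in for an algorithm that takes the element count as a parameter. */
    static double average(List<Integer> values, long count) {
        long sum = 0;
        for (int value : values) {
            sum += value;
        }
        return count == 0 ? 0.0 : (double) sum / count;
    }

    /** Eager count: one evaluation only for the count, a second one for the algorithm. */
    static int evaluationsWithEagerCount(List<Integer> values) {
        RecomputedInput input = new RecomputedInput(values);
        long count = input.evaluate().size();
        average(input.evaluate(), count);
        return input.evaluations();
    }

    /** Count derived inside the same evaluation: the input is produced once. */
    static int evaluationsWithDeferredCount(List<Integer> values) {
        RecomputedInput input = new RecomputedInput(values);
        List<Integer> evaluated = input.evaluate();
        average(evaluated, evaluated.size());
        return input.evaluations();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3);
        System.out.println(evaluationsWithEagerCount(values));
        System.out.println(evaluationsWithDeferredCount(values));
    }
}
```

The eager variant evaluates the input twice and the deferred variant once, which mirrors the difference the issue describes between a count that executes its own job and a count that stays part of the same plan.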
[jira] [Commented] (FLINK-2715) Benchmark Triangle Count methods
[ https://issues.apache.org/jira/browse/FLINK-2715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254230#comment-15254230 ] Greg Hogan commented on FLINK-2715: --- The performance of {{TriangleEnumerator}} was considerably worse until the recent fixes in FLINK-3770. This algorithm could also be updated to initially order edges by lower degree rather than higher ID. It should also run faster with the upcoming hashing combiner. The use of {{TreeMap}} likely limits the performance relative to {{TriangleEnumerator}}. Implementation of the Global Clustering Coefficient requires the triangle count and I've been working on what I think will be a nice way to capture algorithm metrics without duplicating code. The Flink bug has been filed as FLINK-3805. > Benchmark Triangle Count methods > > > Key: FLINK-2715 > URL: https://issues.apache.org/jira/browse/FLINK-2715 > Project: Flink > Issue Type: Task > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Priority: Minor > Labels: starter > > Once FLINK-2714 is addressed, it would be nice to have a set of benchmarks > that test the efficiency of the DataSet, GSA and vertex-centric versions. > This means running the three examples on a cluster environment using various > graph DataSets. For instance, SNAP's Orkut and Friendster networks > (https://snap.stanford.edu/data/). > The results produced by the experiments should then be reported in the Gelly > docs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
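The suggestion to order edges by degree rather than by ID can be sketched on a single machine. The code below is one common way to do degree-ordered triangle counting and is not Gelly's `TriangleEnumerator`: every undirected edge is oriented from its lower-degree endpoint to its higher-degree endpoint (ties broken by vertex ID, an assumption of this sketch), so each triangle is discovered exactly once, at its lowest-ranked vertex, and high-degree vertices keep short candidate lists. The class name `DegreeOrderedTriangles` is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical single-machine sketch of degree-ordered triangle counting. */
final class DegreeOrderedTriangles {

    /** Counts triangles in an undirected graph given as pairs of vertex IDs. */
    static long countTriangles(int[][] edges) {
        // Build undirected adjacency sets; duplicate edges and self-loops are ignored.
        Map<Integer, Set<Integer>> neighbors = new HashMap<>();
        for (int[] edge : edges) {
            if (edge[0] == edge[1]) {
                continue;
            }
            neighbors.computeIfAbsent(edge[0], k -> new HashSet<>()).add(edge[1]);
            neighbors.computeIfAbsent(edge[1], k -> new HashSet<>()).add(edge[0]);
        }

        long triangles = 0;
        for (Map.Entry<Integer, Set<Integer>> entry : neighbors.entrySet()) {
            int u = entry.getKey();
            // Keep only neighbors ranked after u, i.e. the edges oriented away from u.
            List<Integer> higher = new ArrayList<>();
            for (int v : entry.getValue()) {
                if (precedes(u, v, neighbors)) {
                    higher.add(v);
                }
            }
            // Every pair of higher-ranked neighbors that is itself connected closes a triangle.
            for (int i = 0; i < higher.size(); i++) {
                for (int j = i + 1; j < higher.size(); j++) {
                    if (neighbors.get(higher.get(i)).contains(higher.get(j))) {
                        triangles++;
                    }
                }
            }
        }
        return triangles;
    }

    /** True if u ranks before v: lower degree first, vertex ID as tie-breaker. */
    private static boolean precedes(int u, int v, Map<Integer, Set<Integer>> neighbors) {
        int degreeU = neighbors.get(u).size();
        int degreeV = neighbors.get(v).size();
        return degreeU < degreeV || (degreeU == degreeV && u < v);
    }

    public static void main(String[] args) {
        int[][] edges = {{1, 2}, {2, 3}, {1, 3}, {3, 4}};
        System.out.println(countTriangles(edges));
    }
}
```

With ID-based ordering a hub vertex that happens to have a low ID generates a quadratic number of candidate pairs. Ranking by degree moves that work to low-degree vertices, which is the usual motivation for the change proposed in the comment.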
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60770825 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java --- @@ -0,0 +1,51 @@ +package org.apache.flink.streaming.connectors.kinesis.config; + +/** + * + */ +public class KinesisConfigConstants { + + // + // Configuration Keys + // + + /** The max retries to retrieve metadata from a Kinesis stream using describeStream API +* (Note: describeStream attempts may be temporarily blocked due to AWS capping 5 attempts per sec) */ + public static final String CONFIG_STREAM_DESCRIBE_RETRIES = "flink.stream.describe.retry"; + + /** The backoff time between each describeStream attempt */ + public static final String CONFIG_STREAM_DESCRIBE_BACKOFF = "flink.stream.describe.backoff"; + + /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ + public static final String CONFIG_STREAM_INIT_POSITION_TYPE = "flink.stream.initpos.type"; + + /** The credential provider type to use when AWS credentials are required (BASIC is used if not set)*/ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE = "aws.credentials.provider"; + + /** The AWS access key ID to use when setting credentials provider type to BASIC */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID = "aws.credentials.provider.basic.accesskeyid"; + + /** The AWS secret key to use when setting credentials provider type to BASIC */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY = "aws.credentials.provider.basic.secretkey"; + + /** Optional configuration for profile path if credential provider type is set to be PROFILE */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_PATH = "aws.credentials.provider.profile.path"; + + /** Optional configuration for profile name if credential 
provider type is set to be PROFILE */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_NAME = "aws.credentials.provider.profile.name"; + + /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */ + public static final String CONFIG_AWS_REGION = "aws.region"; + + + // + // Default configuration values + // + + public static final String DEFAULT_AWS_REGION = "us-east-1"; --- End diff -- I think it's reasonable to make region a required argument. As a user, I have more than once mistakenly assumed that the AWS SDK was failing to find resources, only to realize that it defaults to another region unless one is specified. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
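A minimal sketch of what "region as a required argument" could look like, assuming the consumer validates its `Properties` up front. Only the key string `"aws.region"` comes from the diff above; the class `RegionConfigSketch` and the method `requireRegion` are hypothetical and are not part of the pull request.

```java
import java.util.Properties;

/** Hypothetical validation helper; not taken from the connector under review. */
final class RegionConfigSketch {

    /** Key string as it appears in KinesisConfigConstants in the diff. */
    static final String CONFIG_AWS_REGION = "aws.region";

    /**
     * Returns the configured region, or fails fast when it is missing,
     * instead of silently falling back to a default such as "us-east-1".
     */
    static String requireRegion(Properties config) {
        String region = config.getProperty(CONFIG_AWS_REGION);
        if (region == null || region.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "The AWS region ('" + CONFIG_AWS_REGION + "') must be set in the config.");
        }
        return region.trim();
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(CONFIG_AWS_REGION, "eu-west-1");
        System.out.println(requireRegion(config)); // prints "eu-west-1"
    }
}
```

Failing at construction time surfaces a missing region immediately, rather than as a "resource not found" error from a different region later on.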
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254221#comment-15254221 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60770825 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java --- @@ -0,0 +1,51 @@ +package org.apache.flink.streaming.connectors.kinesis.config; + +/** + * + */ +public class KinesisConfigConstants { + + // + // Configuration Keys + // + + /** The max retries to retrieve metadata from a Kinesis stream using describeStream API +* (Note: describeStream attempts may be temporarily blocked due to AWS capping 5 attempts per sec) */ + public static final String CONFIG_STREAM_DESCRIBE_RETRIES = "flink.stream.describe.retry"; + + /** The backoff time between each describeStream attempt */ + public static final String CONFIG_STREAM_DESCRIBE_BACKOFF = "flink.stream.describe.backoff"; + + /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ + public static final String CONFIG_STREAM_INIT_POSITION_TYPE = "flink.stream.initpos.type"; + + /** The credential provider type to use when AWS credentials are required (BASIC is used if not set)*/ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE = "aws.credentials.provider"; + + /** The AWS access key ID to use when setting credentials provider type to BASIC */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID = "aws.credentials.provider.basic.accesskeyid"; + + /** The AWS secret key to use when setting credentials provider type to BASIC */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY = "aws.credentials.provider.basic.secretkey"; + + /** Optional configuration for profile path if credential 
provider type is set to be PROFILE */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_PATH = "aws.credentials.provider.profile.path"; + + /** Optional configuration for profile name if credential provider type is set to be PROFILE */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_NAME = "aws.credentials.provider.profile.name"; + + /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */ + public static final String CONFIG_AWS_REGION = "aws.region"; + + + // + // Default configuration values + // + + public static final String DEFAULT_AWS_REGION = "us-east-1"; --- End diff -- I think it's reasonable to make region a required argument. As a user, I have more than once mistakenly assumed that the AWS SDK was failing to find resources, only to realize that it defaults to another region unless one is specified. > Kinesis streaming consumer with integration of Flink's checkpointing mechanics > -- > > Key: FLINK-3229 > URL: https://issues.apache.org/jira/browse/FLINK-3229 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Opening a sub-task to implement data source consumer for Kinesis streaming > connector (https://issues.apache.org/jira/browser/FLINK-3211). 
> An example of the planned user API for Flink Kinesis Consumer: > {code} > Properties kinesisConfig = new Properties(); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, > "BASIC"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, > "aws_access_key_id_here"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, > "aws_secret_key_here"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, > "LATEST"); // or TRIM_HORIZON > DataStream kinesisRecords = env.addSource(new FlinkKinesisConsumer<>( > "kinesis_stream_name", > new SimpleStringSchema(), > kinesisConfig)); > {code} -- This message was sent by Atlassian JIRA (v
[jira] [Comment Edited] (FLINK-1284) Uniform random sampling operator over windows
[ https://issues.apache.org/jira/browse/FLINK-1284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253383#comment-15253383 ] Austin Ouyang edited comment on FLINK-1284 at 4/22/16 4:40 PM: --- Hi [~senorcarbone], Would we also want to add the ability to sample by percentage? Also what would the fieldID be referring to? I was thinking that there were 2 naive possible solutions. 1) Once the trigger is made, we randomly sample for N samples or a percentage of all the samples in each window 2) Given a percentage of samples we want to retain from each window generate a random number between 0 and 1. Append to result if the random number is less than the specified percentage. I'd be happy to try working on this as well! was (Author: aouyang1): Hi Paris, Would we also want to add the ability to sample by percentage? Also what would the fieldID be referring to? I was thinking that there were 2 naive possible solutions. 1) Once the trigger is made, we randomly sample for N samples or a percentage of all the samples in each window 2) Given a percentage of samples we want to retain from each window generate a random number between 0 and 1. Append to result if the random number is less than the specified percentage. > Uniform random sampling operator over windows > - > > Key: FLINK-1284 > URL: https://issues.apache.org/jira/browse/FLINK-1284 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Paris Carbone >Priority: Minor > > It would be useful for several use cases to have a built-in uniform random > sampling operator in the streaming API that can operate on windows. This can > be used for example for online machine learning operations, evaluating > heuristics or continuous visualisation of representative values. 
> The operator could be given a field and a number of random samples needed, > following a window statement as such: > mystream.window(..).sample(fieldID,#samples) > Given that pre-aggregation is enabled, this could perhaps be implemented as a > binary reduce operator or a combinable groupreduce that pre-aggregates the > empiricals of that field. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
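The two naive approaches from the comment above can be sketched in plain Java, independent of any Flink API. Option 1 is shown here with reservoir sampling (Algorithm R), which is one possible way to draw N uniform samples from a fired window and not necessarily what the commenter had in mind; option 2 is per-element Bernoulli sampling with a retention fraction. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/** Hypothetical helpers for sampling the contents of a single window. */
final class WindowSamplingSketch {

    /** Option 1: uniform sample of at most n elements (reservoir sampling, Algorithm R). */
    static <T> List<T> reservoirSample(Iterable<T> window, int n, Random rnd) {
        List<T> reservoir = new ArrayList<>(n);
        long seen = 0;
        for (T element : window) {
            seen++;
            if (reservoir.size() < n) {
                reservoir.add(element);                    // fill the reservoir first
            } else {
                long j = (long) (rnd.nextDouble() * seen); // uniform index in [0, seen)
                if (j < n) {
                    reservoir.set((int) j, element);       // replace with probability n / seen
                }
            }
        }
        return reservoir;
    }

    /** Option 2: keep each element independently with probability `fraction` (0.0 to 1.0). */
    static <T> List<T> bernoulliSample(Iterable<T> window, double fraction, Random rnd) {
        List<T> result = new ArrayList<>();
        for (T element : window) {
            if (rnd.nextDouble() < fraction) {
                result.add(element);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> window = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        Random rnd = new Random();
        System.out.println(reservoirSample(window, 3, rnd));   // always exactly 3 elements
        System.out.println(bernoulliSample(window, 0.3, rnd)); // about 30% of the window on average
    }
}
```

The trade-off between the two: the reservoir variant returns a fixed sample size but must see the whole window, while the Bernoulli variant can emit elements as they arrive but only hits the requested percentage in expectation.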
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254132#comment-15254132 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-213491263 @rmetzger Thank you very much for your detailed review of the PR :) I've replied to the comments you added, please . I can address the issues and follow up with corresponding commits within the next 36 hours. I am pretty much free for the next 3 days, and would very much like to get the consumer ready for merging by the end of this week :) If it still isn't ready by the end of 4/25, I'm afraid I will have to leave any remaining issues for you to address, since after that I temporarily won't be able to work on code until June. > Kinesis streaming consumer with integration of Flink's checkpointing mechanics > -- > > Key: FLINK-3229 > URL: https://issues.apache.org/jira/browse/FLINK-3229 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Opening a sub-task to implement data source consumer for Kinesis streaming > connector (https://issues.apache.org/jira/browser/FLINK-3211). 
> An example of the planned user API for Flink Kinesis Consumer: > {code} > Properties kinesisConfig = new Properties(); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, > "BASIC"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, > "aws_access_key_id_here"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, > "aws_secret_key_here"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, > "LATEST"); // or TRIM_HORIZON > DataStream kinesisRecords = env.addSource(new FlinkKinesisConsumer<>( > "kinesis_stream_name", > new SimpleStringSchema(), > kinesisConfig)); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user tzulitai commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-213491263 @rmetzger Thank you very much for your detailed review of the PR :) I've replied to the comments you added, please . I can address the issues and follow up with corresponding commits within the next 36 hours. I am pretty much free for the next 3 days, and would very much like to get the consumer ready for merging by the end of this week :) If it still isn't ready by the end of 4/25, I'm afraid I will have to leave any remaining issues for you to address, since after that I temporarily won't be able to work on code until June.
[jira] [Created] (FLINK-3805) BloomFilter initialized with no memory available
Greg Hogan created FLINK-3805: - Summary: BloomFilter initialized with no memory available Key: FLINK-3805 URL: https://issues.apache.org/jira/browse/FLINK-3805 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 1.0.1, 1.1.0 Reporter: Greg Hogan Priority: Critical I flagged this as 1.1.0 and 1.0.1 without checking the latter. Link to build, command, and stacktrace follow. {{MutableHashTable.initTable}} is calling {{initBloomFilter}} when {{this.availableMemory.size()==0}}. https://s3.amazonaws.com/apache-flink/flink_bloomfilter_crash.tar.bz2 ./bin/flink run -class org.apache.flink.graph.examples.TriangleListing ~/flink-gelly-examples_2.10-1.1-SNAPSHOT.jar --clip_and_flip false --output print --output print --scale 14 --count {code} org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed. at org.apache.flink.client.program.Client.runBlocking(Client.java:381) at org.apache.flink.client.program.Client.runBlocking(Client.java:355) at org.apache.flink.client.program.Client.runBlocking(Client.java:315) at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:898) at org.apache.flink.api.java.DataSet.collect(DataSet.java:410) at org.apache.flink.api.java.DataSet.print(DataSet.java:1605) at org.apache.flink.graph.examples.TriangleListing.main(TriangleListing.java:106) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) at 
org.apache.flink.client.program.Client.runBlocking(Client.java:248) at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:805) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:751) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:751) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalArgumentException: expectedEntries should be > 0 at org.apache.flink.shaded.com.google.common.base.Preconditions.checkArgument(Preconditions.java:88) at org.apache.flink.runtime.operators.util.BloomFilter.(BloomFilter.java:53) at org.apache.flink.runtime.operators.hash.MutableHashTable.initBloomFilter(MutableHashTable.java:823) at 
org.apache.flink.runtime.operators.hash.MutableHashTable.initTable(MutableHashTable.java:1183) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:887) at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631) at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666) at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:116) at org.apache.flink.runtime
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254110#comment-15254110 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60760294 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. + * The fetcher spawns a single thread for connection to each shard. 
+ */ +public class KinesisDataFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); + + /** Config properties for the Flink Kinesis Consumer */ + private final Properties configProps; + + /** The AWS credentials provider as specified in config properties */ + private final AWSCredentialsProvider credentials; + + /** The name of the consumer task that this fetcher was instantiated */ + private final String taskName; + + /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */ + private HashMap assignedShardsWithStartingSequenceNum; + + /** Reference to the thread that executed run() */ + private volatile Thread mainThread; + + /** Reference to the first error thrown by any of the spawned shard connection threads */ + private final AtomicReference error; + + private volatile boolean running = true; + + /** +* Creates a new Kinesis Data Fetcher for the specified set of shards +* +* @param assignedShards the shards that this fetcher will pull data from +* @param configProps the configuration properties of this Flink Kinesis Consumer +* @param taskName the task name of this consumer task +*/ + public KinesisDataFetcher(List assignedShards, Properties configProps, String taskName) { + this.configProps = checkNotNull(configProps); + this.credentials = AWSUtil.getCredentialsProvider(configProps); + this.assignedShardsWithStartingSequenceNum = new HashMap<>(); + for (KinesisStreamShard shard : assignedShards) { + assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString()); + } + this.taskName = taskName; + this.error = new AtomicReference<>(); + } + + /** +* Advance a shard's starting
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60760294 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. + * The fetcher spawns a single thread for connection to each shard. 
+ */ +public class KinesisDataFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); + + /** Config properties for the Flink Kinesis Consumer */ + private final Properties configProps; + + /** The AWS credentials provider as specified in config properties */ + private final AWSCredentialsProvider credentials; + + /** The name of the consumer task that this fetcher was instantiated */ + private final String taskName; + + /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */ + private HashMap assignedShardsWithStartingSequenceNum; + + /** Reference to the thread that executed run() */ + private volatile Thread mainThread; + + /** Reference to the first error thrown by any of the spawned shard connection threads */ + private final AtomicReference error; + + private volatile boolean running = true; + + /** +* Creates a new Kinesis Data Fetcher for the specified set of shards +* +* @param assignedShards the shards that this fetcher will pull data from +* @param configProps the configuration properties of this Flink Kinesis Consumer +* @param taskName the task name of this consumer task +*/ + public KinesisDataFetcher(List assignedShards, Properties configProps, String taskName) { + this.configProps = checkNotNull(configProps); + this.credentials = AWSUtil.getCredentialsProvider(configProps); + this.assignedShardsWithStartingSequenceNum = new HashMap<>(); + for (KinesisStreamShard shard : assignedShards) { + assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString()); + } + this.taskName = taskName; + this.error = new AtomicReference<>(); + } + + /** +* Advance a shard's starting sequence number to a specified value +* +* @param streamShard the shard to perform the advance on +* @param sequenceNum the sequence number to advance to +*/ + public void advanceSequenceNumberTo(KinesisStreamShard st
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254107#comment-15254107 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60760085 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * + * + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param the type of data emitted + */ +public class FlinkKinesisConsumer extends RichParallelSourceFunction + implements CheckpointListener, CheckpointedAsynchronously>, ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + // + // Consumer p
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60760085 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * + * + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param the type of data emitted + */ +public class FlinkKinesisConsumer extends RichParallelSourceFunction + implements CheckpointListener, CheckpointedAsynchronously>, ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + // + // Consumer properties + // + + /** The complete list of shards */ + private final List shards; + + /** Properties to parametrize settings such as AWS service regi
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254106#comment-15254106 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60759925 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.model; + +import java.io.Serializable; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A serializable representation of a AWS Kinesis Stream shard. It is basically a wrapper class around the information + * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to + * determine whether or not a shard is closed and whether or not the shard is a result of parent shard splits or merges. 
+ */ +public class KinesisStreamShard implements Serializable { --- End diff -- The main reason the consumer needs its own Shard representation is that `com.amazonaws.services.kinesis.model.Shard` doesn't have the shard's associated stream name as a field. We need the stream name when getting a shard iterator for a particular shard, using `com.amazonaws.services.kinesis.AmazonKinesisClient#getShardIterator(streamName, shardId, iteratorType)`. Moreover, since the consumer's implementation supports reading from multiple Kinesis streams, we must carry the associated stream name along with each Shard representation (I guess that's the reason why Amazon's Shard implementation doesn't have a field for stream name). Our implementation, `org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard`, currently has `regionName` (I don't think we actually need this, since the consumer is limited to reading from Kinesis streams within the same region) and `streamName` as fields, in addition to the ones already supplied by Amazon's Shard. So, to reduce duplicated implementation, we could include Amazon's Shard implementation as a field within our `KinesisStreamShard` and let `KinesisStreamShard` keep `streamName` as an extra field. What do you think? > Kinesis streaming consumer with integration of Flink's checkpointing mechanics > -- > > Key: FLINK-3229 > URL: https://issues.apache.org/jira/browse/FLINK-3229 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Opening a sub-task to implement data source consumer for Kinesis streaming > connector (https://issues.apache.org/jira/browser/FLINK-3211). 
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
>     "aws_access_key_id_here");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
>     "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
>     "kinesis_stream_name",
>     new SimpleStringSchema(),
>     kinesisConfig));
> {code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
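The composition proposed in the review comment above (keep Amazon's Shard as a field and carry `streamName` alongside it) could look roughly like the sketch below. This is only an illustration under stated assumptions, not the code from the pull request: `ShardStandIn` is a hypothetical, simplified stand-in for `com.amazonaws.services.kinesis.model.Shard` so that the example compiles without the AWS SDK, and `StreamShardSketch` is a hypothetical name for the wrapper. The stand-in flattens the ending sequence number into a direct field; treating a non-null ending sequence number as "closed" is likewise an assumption of this sketch.

```java
import java.io.Serializable;
import java.util.Objects;

/**
 * Hypothetical stand-in for com.amazonaws.services.kinesis.model.Shard, so the
 * sketch compiles without the AWS SDK. Only the fields used below are modeled.
 */
final class ShardStandIn implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String shardId;
    /** Assumed to be null while the shard is still open. */
    private final String endingSequenceNumber;

    ShardStandIn(String shardId, String endingSequenceNumber) {
        this.shardId = Objects.requireNonNull(shardId, "shardId");
        this.endingSequenceNumber = endingSequenceNumber;
    }

    String getShardId() { return shardId; }

    String getEndingSequenceNumber() { return endingSequenceNumber; }

    @Override
    public boolean equals(Object o) {
        if (this == o) { return true; }
        if (!(o instanceof ShardStandIn)) { return false; }
        ShardStandIn other = (ShardStandIn) o;
        return shardId.equals(other.shardId)
            && Objects.equals(endingSequenceNumber, other.endingSequenceNumber);
    }

    @Override
    public int hashCode() { return Objects.hash(shardId, endingSequenceNumber); }
}

/** Hypothetical wrapper: the SDK-style shard plus the name of its stream. */
final class StreamShardSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String streamName;
    private final ShardStandIn shard;

    StreamShardSketch(String streamName, ShardStandIn shard) {
        this.streamName = Objects.requireNonNull(streamName, "streamName");
        this.shard = Objects.requireNonNull(shard, "shard");
    }

    String getStreamName() { return streamName; }

    ShardStandIn getShard() { return shard; }

    /** Utility method: in this sketch, a shard with an ending sequence number is closed. */
    boolean isClosed() { return shard.getEndingSequenceNumber() != null; }

    @Override
    public boolean equals(Object o) {
        if (this == o) { return true; }
        if (!(o instanceof StreamShardSketch)) { return false; }
        StreamShardSketch other = (StreamShardSketch) o;
        return streamName.equals(other.streamName) && shard.equals(other.shard);
    }

    @Override
    public int hashCode() { return Objects.hash(streamName, shard); }
}
```

With this shape, the stream name and shard ID needed for a shard-iterator request are both reachable from one object (`getStreamName()` and `getShard().getShardId()`), and no shard field has to be duplicated in the wrapper.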
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60756732 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * + * + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param the type of data emitted + */ +public class FlinkKinesisConsumer extends RichParallelSourceFunction + implements CheckpointListener, CheckpointedAsynchronously>, ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + // + // Consumer properties + // + + /** The complete list of shards */ + private final List shards; + + /** Properties to parametrize settings such as AWS service regi
[jira] [Commented] (FLINK-2715) Benchmark Triangle Count methods
[ https://issues.apache.org/jira/browse/FLINK-2715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254084#comment-15254084 ] Vasia Kalavri commented on FLINK-2715: -- Hi [~greghogan], Thanks a lot for reporting your results! What was the reason for the {{GSATriangleCount}} crash? {{GSATriangleCount}} was added to the library without proper evaluation. Since {{TriangleEnumerator}} performs well and provides the same functionality, I would be in favor of removing {{GSATriangleCount}} completely. I think we should keep the library small and try to provide as efficient implementations as possible. What do you think? > Benchmark Triangle Count methods > > > Key: FLINK-2715 > URL: https://issues.apache.org/jira/browse/FLINK-2715 > Project: Flink > Issue Type: Task > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Priority: Minor > Labels: starter > > Once FLINK-2714 is addressed, it would be nice to have a set of benchmarks > that test the efficiency of the DataSet, GSA and vertex-centric versions. > This means running the three examples on a cluster environment using various > graph DataSets. For instance, SNAP's Orkut and Friendster networks > (https://snap.stanford.edu/data/). > The results produced by the experiments should then be reported in the Gelly > docs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60755162 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * + * + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param the type of data emitted + */ +public class FlinkKinesisConsumer extends RichParallelSourceFunction + implements CheckpointListener, CheckpointedAsynchronously>, ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + // + // Consumer properties + // + + /** The complete list of shards */ + private final List shards; + + /** Properties to parametrize settings such as AWS service regi
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60753879 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.StreamStatus; +import com.amazonaws.services.kinesis.model.Shard; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A utility class that is used as a proxy to make calls to AWS Kinesis + * for several functions, such as getting a list of shards and fetching + * a batch of data records starting from a specified record sequence number. 
+ */ +public class KinesisProxy { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class); + + /** The actual Kinesis client from the AWS SDK that we will be using to make calls */ + private final AmazonKinesisClient kinesisClient; + + /** The AWS region that this proxy will be making calls to */ + private final String regionId; + + /** Configuration properties of this Flink Kinesis Connector */ + private final Properties configProps; + + /** +* Create a new KinesisProxy based on the supplied configuration properties +* +* @param configProps configuration properties containing AWS credential and AWS region info +*/ + public KinesisProxy(Properties configProps) { + this.configProps = checkNotNull(configProps); + + this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION); + AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials()); + client.setRegion(Region.getRegion(Regions.fromName(this.regionId))); + + this.kinesisClient = client; + } + + /** +* Get the next batch of data records using a specific shard iterator +* +* @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading +* @param maxRecordsToGet the maximum amount of records to retrieve for this batch +* @return the batch of retrieved records +*/ + public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { + final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); + getRecordsRequest.setShardIterator(shardIterator); + getRecordsRequest.setLimit(maxRecordsToGet); + + GetRecordsResult getRecordsResult = null; + + int remainingRetryTimes = Integer.valueOf( + configProps.getProperty(KinesisConfigConstants.CONFIG_ST
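The quoted `getRecords` method is cut off just after it reads a retry count (`remainingRetryTimes`) from the configuration, so how the pull request uses that count is not visible here. A minimal sketch of one plausible shape, assuming a bounded retry with a fixed pause between attempts; `RetrySketch`, `ThrottledException`, and `callWithRetries` are hypothetical names for illustration and are not part of the PR or the AWS SDK:

```java
import java.util.function.Supplier;

/** Illustrative only: none of these names come from the pull request. */
class RetrySketch {

    /** Stand-in for a throttling error such as ProvisionedThroughputExceededException. */
    static class ThrottledException extends RuntimeException {
        ThrottledException(String message) {
            super(message);
        }
    }

    /**
     * Runs the action and retries it up to maxRetries more times when it is throttled,
     * sleeping pauseMillis between attempts. Other exceptions are not retried.
     */
    static <R> R callWithRetries(Supplier<R> action, int maxRetries, long pauseMillis) {
        int remainingRetries = maxRetries;
        while (true) {
            try {
                return action.get();
            } catch (ThrottledException e) {
                if (remainingRetries <= 0) {
                    throw e; // retries exhausted, surface the last failure
                }
                remainingRetries--;
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt so callers can react to cancellation.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
    }
}
```

In the proxy, the supplier would wrap the actual Kinesis call, and the retry count and pause would come from the connector's configuration properties.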
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254068#comment-15254068 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60753879 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.StreamStatus; +import com.amazonaws.services.kinesis.model.Shard; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A utility class that is used as a proxy to make calls to AWS Kinesis + * for several functions, such as getting a list of shards and fetching + * a batch of data records starting from a specified record sequence number. 
+ */ +public class KinesisProxy { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class); + + /** The actual Kinesis client from the AWS SDK that we will be using to make calls */ + private final AmazonKinesisClient kinesisClient; + + /** The AWS region that this proxy will be making calls to */ + private final String regionId; + + /** Configuration properties of this Flink Kinesis Connector */ + private final Properties configProps; + + /** +* Create a new KinesisProxy based on the supplied configuration properties +* +* @param configProps configuration properties containing AWS credential and AWS region info +*/ + public KinesisProxy(Properties configProps) { + this.configProps = checkNotNull(configProps); + + this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION); + AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials()); + client.setRegion(Region.getRegion(Regions.fromName(this.regionId))); + + this.kinesisClient = client; + } + + /** +* Get the next batch of data records using a specific shard iterator +* +* @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading +* @param maxRecordsToGet the maximum amount of records to retrieve for this batch +* @return the batch of retrieved records +*/ + public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { + final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); + getRecordsRequest.setShardIterator(shardIterator); +
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254063#comment-15254063 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-213465308 @rmetzger Regarding the need to upgrade Jackson to get the example working: after updating the code for Flink 1.0.x to prepare the PR, I only tested the consumer manually from within the Flink project code. However, when the consumer was first implemented against Flink 0.10.1, I did package it and use it as a separate dependency. I wonder whether this has anything to do with changes between the older and newer Flink versions? > Kinesis streaming consumer with integration of Flink's checkpointing mechanics > -- > > Key: FLINK-3229 > URL: https://issues.apache.org/jira/browse/FLINK-3229 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Opening a sub-task to implement a data source consumer for the Kinesis streaming > connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for the Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
>     "aws_access_key_id_here");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
>     "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
>     "kinesis_stream_name",
>     new SimpleStringSchema(),
>     kinesisConfig));
> {code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user tzulitai commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-213465308 @rmetzger Regarding the need to upgrade Jackson to get the example working: after updating the code for Flink 1.0.x to prepare the PR, I only tested the consumer manually from within the Flink project code. However, when the consumer was first implemented against Flink 0.10.1, I did package it and use it as a separate dependency. I wonder whether this has anything to do with changes between the older and newer Flink versions? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254055#comment-15254055 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60752259 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. + * The fetcher spawns a single thread for connection to each shard. 
+ */ +public class KinesisDataFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); + + /** Config properties for the Flink Kinesis Consumer */ + private final Properties configProps; + + /** The AWS credentials provider as specified in config properties */ + private final AWSCredentialsProvider credentials; + + /** The name of the consumer task that this fetcher was instantiated */ + private final String taskName; + + /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */ + private HashMap assignedShardsWithStartingSequenceNum; + + /** Reference to the thread that executed run() */ + private volatile Thread mainThread; + + /** Reference to the first error thrown by any of the spawned shard connection threads */ + private final AtomicReference error; + + private volatile boolean running = true; + + /** +* Creates a new Kinesis Data Fetcher for the specified set of shards +* +* @param assignedShards the shards that this fetcher will pull data from +* @param configProps the configuration properties of this Flink Kinesis Consumer +* @param taskName the task name of this consumer task +*/ + public KinesisDataFetcher(List assignedShards, Properties configProps, String taskName) { + this.configProps = checkNotNull(configProps); + this.credentials = AWSUtil.getCredentialsProvider(configProps); + this.assignedShardsWithStartingSequenceNum = new HashMap<>(); + for (KinesisStreamShard shard : assignedShards) { + assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString()); + } + this.taskName = taskName; + this.error = new AtomicReference<>(); + } + + /** +* Advance a shard's starting
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60752259 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. + * The fetcher spawns a single thread for connection to each shard. 
+ */ +public class KinesisDataFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); + + /** Config properties for the Flink Kinesis Consumer */ + private final Properties configProps; + + /** The AWS credentials provider as specified in config properties */ + private final AWSCredentialsProvider credentials; + + /** The name of the consumer task that this fetcher was instantiated */ + private final String taskName; + + /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */ + private HashMap assignedShardsWithStartingSequenceNum; + + /** Reference to the thread that executed run() */ + private volatile Thread mainThread; + + /** Reference to the first error thrown by any of the spawned shard connection threads */ + private final AtomicReference error; + + private volatile boolean running = true; + + /** +* Creates a new Kinesis Data Fetcher for the specified set of shards +* +* @param assignedShards the shards that this fetcher will pull data from +* @param configProps the configuration properties of this Flink Kinesis Consumer +* @param taskName the task name of this consumer task +*/ + public KinesisDataFetcher(List assignedShards, Properties configProps, String taskName) { + this.configProps = checkNotNull(configProps); + this.credentials = AWSUtil.getCredentialsProvider(configProps); + this.assignedShardsWithStartingSequenceNum = new HashMap<>(); + for (KinesisStreamShard shard : assignedShards) { + assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString()); + } + this.taskName = taskName; + this.error = new AtomicReference<>(); + } + + /** +* Advance a shard's starting sequence number to a specified value +* +* @param streamShard the shard to perform the advance on +* @param sequenceNum the sequence number to advance to +*/ + public void advanceSequenceNumberTo(KinesisStreamShard st
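The fields quoted above (`mainThread`, the `AtomicReference` holding the first error, and one connection thread per shard) suggest a common pattern: worker threads record only the first failure and wake the thread that started them. The part of the PR that wires these together is not shown, so the following is a minimal sketch under that assumption; `ShardThreadsSketch` and `runAll` are hypothetical names, and plain `Runnable`s stand in for the shard connection threads:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative only: not the fetcher from the pull request. */
class ShardThreadsSketch {

    /**
     * Runs one thread per task. Returns the first error thrown by any task,
     * or null if every task finished normally.
     */
    static Throwable runAll(List<Runnable> shardTasks) {
        final AtomicReference<Throwable> error = new AtomicReference<>();
        final Thread mainThread = Thread.currentThread();
        final List<Thread> threads = new ArrayList<>();

        for (Runnable task : shardTasks) {
            Thread worker = new Thread(() -> {
                try {
                    task.run();
                } catch (Throwable t) {
                    // Only the first failure is recorded; later ones are ignored.
                    if (error.compareAndSet(null, t)) {
                        mainThread.interrupt(); // wake the waiting main thread
                    }
                }
            });
            threads.add(worker);
            worker.start();
        }

        try {
            for (Thread worker : threads) {
                worker.join();
            }
        } catch (InterruptedException e) {
            // Assumed here to mean that a worker failed: ask the remaining workers to stop.
            for (Thread worker : threads) {
                worker.interrupt();
            }
        }

        // A worker may have interrupted us after its join had already returned;
        // clear that leftover flag so it does not leak to the caller.
        Thread.interrupted();
        return error.get();
    }
}
```

Using `compareAndSet(null, t)` guarantees that concurrent failures cannot overwrite each other, which matches the "first error" wording of the field comment. A real fetcher would also need to tell worker-triggered interrupts apart from external cancellation, which this sketch does not attempt.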
[jira] [Commented] (FLINK-2715) Benchmark Triangle Count methods
[ https://issues.apache.org/jira/browse/FLINK-2715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254051#comment-15254051 ] Greg Hogan commented on FLINK-2715: --- I happened to include some triangle benchmarks in the pull request for FLINK-3768. Adding a flag for {{GSATriangleCount}} was simple. Testing on an AWS ec2.8xlarge (36 vcores, 60 GiB), what took {{TriangleListing}} 5s and {{TriangleEnumerator}} 7s crashed Flink at 6m3s with {{GSATriangleCount}}. Orkut and Friendster are 100x and 2000x larger than RMat s16e16 so running these benchmarks with GSA or SG does not look feasible with the current algorithms. I do agree that for overlapping algorithms in Gelly it would be nice to present users a performance comparison. Thoughts, [~vkalavri]? > Benchmark Triangle Count methods > > > Key: FLINK-2715 > URL: https://issues.apache.org/jira/browse/FLINK-2715 > Project: Flink > Issue Type: Task > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Priority: Minor > Labels: starter > > Once FLINK-2714 is addressed, it would be nice to have a set of benchmarks > that test the efficiency of the DataSet, GSA and vertex-centric versions. > This means running the three examples on a cluster environment using various > graph DataSets. For instance, SNAP's Orkut and Friendster networks > (https://snap.stanford.edu/data/). > The results produced by the experiments should then be reported in the Gelly > docs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254004#comment-15254004 ] ASF GitHub Bot commented on FLINK-3708: --- Github user StefanRRichter commented on the pull request: https://github.com/apache/flink/pull/1905#issuecomment-213443445 The current version should cover all of your feedback and could be pulled, @tillrohrmann. > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, the CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library.
[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).
Github user StefanRRichter commented on the pull request: https://github.com/apache/flink/pull/1905#issuecomment-213443445 The current version should cover all of your feedback and could be pulled, @tillrohrmann.
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1898#issuecomment-213434810 Thanks for your contribution @rawkintrevo. Good work. I had some minor inline comments. I'm mainly concerned about the efficiency of `multiRandomSplit` because it can construct some really long pipelines. I think we should also add online documentation for the `Splitter`. Otherwise people will just miss it. You can take a look at `docs/libs/ml/` and create a web page for the splitter. We could then create a site with tools from where we link to the `Splitter`, for example.
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253954#comment-15253954 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1898#issuecomment-213434810 Thanks for your contribution @rawkintrevo. Good work. I had some minor inline comments. I'm mainly concerned about the efficiency of `multiRandomSplit` because it can construct some really long pipelines. I think we should also add online documentation for the `Splitter`. Otherwise people will just miss it. You can take a look at `docs/libs/ml/` and create a web page for the splitter. We could then create a site with tools from where we link to the `Splitter`, for example. > Support training Estimators using a (train, validation, test) split of the > available data > - > > Key: FLINK-2259 > URL: https://issues.apache.org/jira/browse/FLINK-2259 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Theodore Vasiloudis >Assignee: Trevor Grant >Priority: Minor > Labels: ML > > When there is an abundance of data available, a good way to train models is > to split the available data into 3 parts: Train, Validation and Test. > We use the Train data to train the model, the Validation part is used to > estimate the test error and select hyperparameters, and the Test part is used to > evaluate the performance of the model and assess its generalization [1]. > This is a common approach when training Artificial Neural Networks, and a > good strategy to choose in data-rich environments. Therefore we should have > some support for this data-analysis process in our Estimators. > [1] Friedman, Jerome, Trevor Hastie, and Robert Tibshirani. The elements of > statistical learning. Vol. 1. Springer, Berlin: Springer series in > statistics, 2001.
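The train/validation/test split described in the issue can be sketched on plain in-memory lists. This is not the `Splitter` from the pull request, which operates on Flink `DataSet`s; it is a single-pass variant that assumes the fractions sum to 1 and draws one random number per element, so no chain of repeated splits is needed. `SplitSketch` and its parameters are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Illustrative only: an in-memory stand-in, not the flink-ml Splitter. */
class SplitSketch {

    /**
     * Assigns every element to exactly one output list. Output i receives
     * roughly fractions[i] of the input; the fractions are assumed to sum to 1.
     */
    static <T> List<List<T>> multiRandomSplit(List<T> input, double[] fractions, long seed) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < fractions.length; i++) {
            parts.add(new ArrayList<>());
        }
        Random rnd = new Random(seed);
        for (T element : input) {
            double u = rnd.nextDouble();
            double cumulative = 0.0;
            // Fall back to the last part to guard against floating-point rounding.
            int target = fractions.length - 1;
            for (int i = 0; i < fractions.length; i++) {
                cumulative += fractions[i];
                if (u < cumulative) {
                    target = i;
                    break;
                }
            }
            parts.get(target).add(element);
        }
        return parts;
    }
}
```

With fractions such as `{0.5, 0.25, 0.25}`, the three parts can serve as train, validation, and test sets. The parts are disjoint and together cover the input, but their sizes only match the fractions in expectation.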
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60739883 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{Matchers, FlatSpec} +import org.apache.flink.ml.math.Vector +import org.apache.flink.api.scala.utils._ + + +class SplitterITSuite extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "Flink's DataSet Splitter" + + import MinMaxScalerData._ + + it should "result in datasets with no elements in common and all elements used" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5) + + (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count()) + + + splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0) + } + + it should "result in datasets of an expected size when precise" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet, 0.5) + +val expectedLength = dataSet.count().toDouble * 0.5 + +splitDataSets(0).count().toDouble should equal(expectedLength +- 5.0) + } + + it should "result in expected number of datasets" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val fracArray = Array(0.5, 0.25, 0.25) + +val splitDataSets = Splitter.multiRandomSplit(dataSet, fracArray) + +splitDataSets.length should equal(fracArray.length) + } + --- End diff -- Maybe we could add a test case for sampling with replacement? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
---
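The review comment asks for a test of sampling with replacement. One property such a test can check is that a fraction above 1 yields more samples than there are distinct input elements, so duplicates must appear. A minimal in-memory sketch of that idea, assuming a fixed sample size of `fraction * n` (Flink's own sampler may only match that size in expectation); `ReplacementSampleSketch` is a hypothetical name and not part of the PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Illustrative only: not the sampler used by flink-ml. */
class ReplacementSampleSketch {

    /**
     * Draws round(fraction * input.size()) elements uniformly at random,
     * allowing the same element to be picked more than once.
     */
    static <T> List<T> sampleWithReplacement(List<T> input, double fraction, long seed) {
        if (fraction < 0) {
            throw new IllegalArgumentException("fraction must be non-negative");
        }
        int sampleSize = (int) Math.round(fraction * input.size());
        List<T> sample = new ArrayList<>(sampleSize);
        if (input.isEmpty()) {
            return sample;
        }
        Random rnd = new Random(seed);
        for (int i = 0; i < sampleSize; i++) {
            sample.add(input.get(rnd.nextInt(input.size())));
        }
        return sample;
    }
}
```

A test along these lines would sample with a fraction of 2.0 and assert that the result is larger than the number of distinct values it contains.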
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253944#comment-15253944 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60739883 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{Matchers, FlatSpec} +import org.apache.flink.ml.math.Vector +import org.apache.flink.api.scala.utils._ + + +class SplitterITSuite extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "Flink's DataSet Splitter" + + import MinMaxScalerData._ + + it should "result in datasets with no elements in common and all elements used" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5) + + (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count()) + + + splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0) + } + + it should "result in datasets of an expected size when precise" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet, 0.5) + +val expectedLength = dataSet.count().toDouble * 0.5 + +splitDataSets(0).count().toDouble should equal(expectedLength +- 5.0) + } + + it should "result in expected number of datasets" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val fracArray = Array(0.5, 0.25, 0.25) + +val splitDataSets = Splitter.multiRandomSplit(dataSet, fracArray) + +splitDataSets.length should equal(fracArray.length) + } + --- End diff -- Maybe we could add a test case for sampling with replacement? 
> Support training Estimators using a (train, validation, test) split of the > available data > - > > Key: FLINK-2259 > URL: https://issues.apache.org/jira/browse/FLINK-2259 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Theodore Vasiloudis >Assignee: Trevor Grant >Priority: Minor > Labels: ML > > When there is an abundance of data available, a good way to train models is > to split the available data into 3 parts: Train, Validation and Test. > We use the Train data to train the model, the Validation part is used to > estimate the test error and select hyperparameters, and the Test is used to > evaluate the performance of the model, and assess its generalization [1] > This is a common approach when training Artificial Neural Networks, and a > good strategy to choose in data-rich environments. Therefore we should have > some support of this data-analysis process in our Estimators. > [1] Friedman, Jerome, Trevor Hastie, and Robert Tibshirani. The elements of > statistical learning. Vol. 1. Springer, Berlin: Springer series in > statistics, 2001. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
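The review comment in this message asks for a test case covering sampling with replacement. As a rough sketch of what such a test could assert, the following uses plain Scala collections rather than Flink's DataSet API. `ReplacementSketch` and `sampleWithReplacement` are hypothetical names and are not part of the pull request.

```scala
import scala.util.Random

// Hypothetical helper, not part of Flink: draws `n` elements uniformly at
// random from `data`, allowing the same element to be picked more than once.
object ReplacementSketch {
  def sampleWithReplacement[T](data: IndexedSeq[T], n: Int, seed: Long): IndexedSeq[T] = {
    val rng = new Random(seed)
    IndexedSeq.fill(n)(data(rng.nextInt(data.length)))
  }

  def main(args: Array[String]): Unit = {
    val data = (1 to 10).toVector
    // Ask for more elements than the input holds: only possible with replacement.
    val sample = sampleWithReplacement(data, 25, seed = 42L)
    // Every sampled element must come from the input.
    assert(sample.forall(x => data.contains(x)))
    // With 25 draws from 10 distinct values, at least one value must repeat.
    assert(sample.distinct.size < sample.size)
    println(s"sampled ${sample.size} elements, ${sample.distinct.size} distinct")
  }
}
```

The two properties checked here (all sampled elements come from the input, and duplicates appear once the sample is larger than the input) are the kind of assertions a Flink-based test could make as well, assuming the splitter exposes a with-replacement mode.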
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60739818 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fraction Probability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. 
While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seed Random number generator seed. + * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) --- End diff -- What happens if fraction is larger than `1` and `withReplacement` is set to `false`? Shouldn't it be set to `true` in this case? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
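The question raised in this review comment (a fraction above 1 combined with sampling without replacement) can be illustrated with a small sketch. It uses plain Scala collections, not Flink's `sample` operator; `FractionGuardSketch`, `needsReplacement` and `bernoulliSample` are hypothetical names. How Flink's own sampler treats such input is not stated in the thread, so the rejection below is an assumption about one reasonable behaviour.

```scala
import scala.util.{Random, Try}

object FractionGuardSketch {
  // Hypothetical guard: a fraction above 1 only makes sense with replacement.
  def needsReplacement(fraction: Double): Boolean = {
    require(fraction >= 0.0, "fraction must be non-negative")
    fraction > 1.0
  }

  // Bernoulli sampling without replacement: keep each element independently
  // with probability `fraction`, so `fraction` has to be a valid probability.
  def bernoulliSample[T](data: Seq[T], fraction: Double, seed: Long): Seq[T] = {
    require(fraction >= 0.0 && fraction <= 1.0,
      s"fraction must be in [0, 1] without replacement, got $fraction")
    val rng = new Random(seed)
    data.filter(_ => rng.nextDouble() < fraction)
  }

  def main(args: Array[String]): Unit = {
    val data = 1 to 20
    println(s"kept ${bernoulliSample(data, 0.5, seed = 1L).size} of ${data.size} elements")
    // A fraction of 1.5 is not a probability, so this sketch rejects it ...
    assert(Try(bernoulliSample(data, 1.5, seed = 1L)).isFailure)
    // ... and the guard signals that the caller should switch to replacement.
    assert(needsReplacement(1.5))
  }
}
```

A splitter could use a check like `needsReplacement` either to fail fast or to choose the with-replacement sampler automatically; which of the two is preferable is a design decision for the pull request.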
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253937#comment-15253937 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60739509 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{Matchers, FlatSpec} +import org.apache.flink.ml.math.Vector +import org.apache.flink.api.scala.utils._ + + +class SplitterITSuite extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "Flink's DataSet Splitter" + + import MinMaxScalerData._ + + it should "result in datasets with no elements in common and all elements used" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5) + + (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count()) + + + splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0) + } + + it should "result in datasets of an expected size when precise" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet, 0.5) --- End diff -- Why don't we use the precise sampling here?
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253936#comment-15253936 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60739425 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{Matchers, FlatSpec} +import org.apache.flink.ml.math.Vector +import org.apache.flink.api.scala.utils._ + + +class SplitterITSuite extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "Flink's DataSet Splitter" + + import MinMaxScalerData._ + + it should "result in datasets with no elements in common and all elements used" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5) + + (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count()) + + + splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0) + } + + it should "result in datasets of an expected size when precise" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet, 0.5) + +val expectedLength = dataSet.count().toDouble * 0.5 --- End diff -- We can calculate the size of `dataSet` without executing a job: simply use `data.size`.
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253935#comment-15253935 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60739344 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{Matchers, FlatSpec} +import org.apache.flink.ml.math.Vector +import org.apache.flink.api.scala.utils._ + + +class SplitterITSuite extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "Flink's DataSet Splitter" + + import MinMaxScalerData._ + + it should "result in datasets with no elements in common and all elements used" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5) + + (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count()) + + + splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0) + } + + it should "result in datasets of an expected size when precise" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet, 0.5) + +val expectedLength = dataSet.count().toDouble * 0.5 + +splitDataSets(0).count().toDouble should equal(expectedLength +- 5.0) --- End diff -- Does this mean that the test case could fail? Even if it's unlikely?
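The concern in this review comment is that a size assertion with a fixed tolerance can fail when the split is drawn by independent coin flips. A minimal sketch of the difference, using plain Scala collections instead of the Flink splitter (`SplitSizeSketch`, `bernoulliSplit` and `exactSplit` are hypothetical names, and the shuffle-based exact split is only an assumed stand-in for whatever the `precise` option does):

```scala
import scala.util.Random

object SplitSizeSketch {
  // Bernoulli split: each element goes left with probability `fraction`,
  // so the size of the left part varies from run to run.
  def bernoulliSplit[T](data: Seq[T], fraction: Double, rng: Random): (Seq[T], Seq[T]) =
    data.partition(_ => rng.nextDouble() < fraction)

  // Exact split: shuffle, then cut at a fixed position, so the size of the
  // left part is the same for every seed.
  def exactSplit[T](data: Seq[T], fraction: Double, rng: Random): (Seq[T], Seq[T]) =
    rng.shuffle(data).splitAt(math.round(data.size * fraction).toInt)

  def main(args: Array[String]): Unit = {
    val data = (1 to 100).toVector
    val seeds = 1L to 1000L
    val bernoulliSizes = seeds.map(s => bernoulliSplit(data, 0.5, new Random(s))._1.size)
    val exactSizes = seeds.map(s => exactSplit(data, 0.5, new Random(s))._1.size)
    println(s"Bernoulli left sizes range from ${bernoulliSizes.min} to ${bernoulliSizes.max}")
    // The exact split always yields the same left size, whatever the seed.
    assert(exactSizes.forall(_ == 50))
  }
}
```

For the Bernoulli variant the left size follows a binomial distribution with standard deviation sqrt(n * p * (1 - p)), so any fixed tolerance is exceeded with some probability. A test that fixes the seed, or that exercises an exact-size mode, avoids that source of flakiness.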
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253932#comment-15253932 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60739128 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{Matchers, FlatSpec} +import org.apache.flink.ml.math.Vector +import org.apache.flink.api.scala.utils._ + + +class SplitterITSuite extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "Flink's DataSet Splitter" + + import MinMaxScalerData._ + + it should "result in datasets with no elements in common and all elements used" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5) + + (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count()) --- End diff -- Maybe we could `union` `splitDataSets(0)` and `splitDataSets(1)` and then join them with `dataSet` and then count the whole thing. Then we could avoid two executions because every `count` will trigger the execution of the whole pipeline.
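The suggestion in this review comment (union the two splits, join the result with the original data, and count once) can be sketched with plain Scala collections. This is only an analogy: `UnionJoinSketch` and `unionJoinCount` are hypothetical names, and in-memory collections do not show the actual benefit, which in Flink would be a single job execution instead of one per `count`.

```scala
object UnionJoinSketch {
  // Counts the pairs produced by joining (left union right) with `original`
  // on the index key, mirroring a union followed by a join and one count.
  def unionJoinCount[T](original: Seq[(Long, T)],
                        left: Seq[(Long, T)],
                        right: Seq[(Long, T)]): Int = {
    val unioned = left ++ right                      // analogous to a DataSet union
    val originalById = original.groupBy(_._1)        // lookup table for the join key
    // For every unioned element, count its join partners in the original data.
    unioned.map { case (id, _) => originalById.get(id).map(_.size).getOrElse(0) }.sum
  }

  def main(args: Array[String]): Unit = {
    val original = Vector("a", "b", "c", "d").zipWithIndex.map { case (v, i) => (i.toLong, v) }
    val (left, right) = original.partition { case (id, _) => id % 2 == 0 }
    // If the split uses every element exactly once, the join count equals the input size.
    assert(unionJoinCount(original, left, right) == original.size)
  }
}
```

Note that the joined count alone does not prove the splits are disjoint: one element appearing in both splits and another missing would give the same total, so a separate overlap check like the join-based assertion in the quoted test still has a purpose.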
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60739128 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{Matchers, FlatSpec} +import org.apache.flink.ml.math.Vector +import org.apache.flink.api.scala.utils._ + + +class SplitterITSuite extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "Flink's DataSet Splitter" + + import MinMaxScalerData._ + + it should "result in datasets with no elements in common and all elements used" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5) + + (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count()) --- End diff -- Maybe we could `union` `splitDataSets(0)` and `splitDataSets(1)`, join the result with `dataSet`, and then count the whole thing once. That would save two executions, because every `count` triggers the execution of the whole pipeline. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
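The union-then-join check suggested in the review comment can be modelled without Flink. The sketch below works on plain Scala sequences of `(id, value)` pairs; `SplitCheckSketch` and `matchedExactlyOnce` are hypothetical names, and the real test would express the same idea with DataSet `union`, `join` and a single `count`.

```scala
object SplitCheckSketch {
  /**
   * Returns how many elements of `original` are matched by exactly one element
   * of `left ++ right`, joining on the Long id in the first tuple field.
   * The two splits are disjoint and cover the input iff the result equals
   * `original.size` (assuming the splits carry no ids foreign to `original`).
   */
  def matchedExactlyOnce[T](
      original: Seq[(Long, T)],
      left: Seq[(Long, T)],
      right: Seq[(Long, T)]): Int = {
    // "union" of both splits, reduced to an occurrence count per id
    val occurrences: Map[Long, Int] =
      (left ++ right).groupBy(_._1).map { case (id, xs) => id -> xs.size }
    // "join" with the original on the id, then one final count
    original.count { case (id, _) => occurrences.getOrElse(id, 0) == 1 }
  }
}
```

A single number then answers both halves of the test name: an id appearing in both splits, or in neither, lowers the count below the input size.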
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60738935 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{Matchers, FlatSpec} +import org.apache.flink.ml.math.Vector +import org.apache.flink.api.scala.utils._ + + +class SplitterITSuite extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "Flink's DataSet Splitter" + + import MinMaxScalerData._ + + it should "result in datasets with no elements in common and all elements used" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5) --- End diff -- `zipWithUniqueId` should be fine here
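Why `zipWithUniqueId` may be enough here can be sketched with a small model of the two id schemes. The model follows the behaviour described in the Flink DataSet utilities documentation as far as I understand it (consecutive indices need a counting pass, unique ids combine a per-task counter with the task id); it is an approximation on plain Scala collections, not Flink's code, and `IdAssignmentSketch` and its method names are hypothetical.

```scala
object IdAssignmentSketch {
  /** Consecutive ids: each partition's size must be known first to compute its offset. */
  def zipWithIndexModel[T](partitions: Seq[Seq[T]]): Seq[(Long, T)] = {
    // the extra counting pass: running totals of the partition sizes
    val offsets = partitions.map(_.size.toLong).scanLeft(0L)(_ + _)
    partitions.zip(offsets).flatMap { case (part, offset) =>
      part.zipWithIndex.map { case (value, i) => (offset + i, value) }
    }
  }

  /** Unique but non-consecutive ids: (local counter << bits) + task id, no counting pass. */
  def zipWithUniqueIdModel[T](partitions: Seq[Seq[T]]): Seq[(Long, T)] = {
    // number of bits needed to represent the largest task id
    val bits =
      64 - java.lang.Long.numberOfLeadingZeros(math.max(partitions.size - 1, 0).toLong)
    partitions.zipWithIndex.flatMap { case (part, taskId) =>
      part.zipWithIndex.map { case (value, counter) =>
        ((counter.toLong << bits) + taskId, value)
      }
    }
  }
}
```

The test only needs ids that distinguish elements, not ids that are dense, so the cheaper scheme suffices under this model.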
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253927#comment-15253927 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60738935 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{Matchers, FlatSpec} +import org.apache.flink.ml.math.Vector +import org.apache.flink.api.scala.utils._ + + +class SplitterITSuite extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "Flink's DataSet Splitter" + + import MinMaxScalerData._ + + it should "result in datasets with no elements in common and all elements used" in { +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) + +val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5) --- End diff -- `zipWithUniqueId` should be fine here
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253923#comment-15253923 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60738627 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. 
DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fractionProbability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. 
+ * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // ---
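The structure of the quoted `randomSplit` (index the input, sample the left part, take everything whose index was not sampled as the right part) can be sketched on plain Scala collections. This is a simplified model under stated assumptions, not the pull request's code: it replaces the `leftOuterJoin` plus null filter with a set-based anti-join, restricts `fraction` to [0, 1] (sampling without replacement), and the object name `RandomSplitSketch` is hypothetical.

```scala
import scala.util.Random

object RandomSplitSketch {
  /**
   * Splits `input` into (left, right). `fraction` is the share that goes to the
   * left part; with `precise = true` the left part has exactly
   * round(fraction * size) elements, otherwise each element is chosen
   * independently with probability `fraction`.
   */
  def randomSplit[T](
      input: Seq[T],
      fraction: Double,
      precise: Boolean = false,
      seed: Long = 0L): (Seq[T], Seq[T]) = {
    require(fraction >= 0.0 && fraction <= 1.0, "fraction must lie in [0, 1]")
    val rng = new Random(seed)
    val indexed = input.zipWithIndex // stands in for zipWithIndex on a DataSet

    val left =
      if (precise) {
        // fixed-size sample without replacement
        rng.shuffle(indexed).take(math.round(fraction * indexed.size).toInt)
      } else {
        // Bernoulli sample: sizes vary slightly from run to run
        indexed.filter(_ => rng.nextDouble() < fraction)
      }

    // anti-join on the index: keep what the left sample did not take
    val chosen = left.map(_._2).toSet
    val right = indexed.filterNot { case (_, i) => chosen(i) }

    (left.map(_._1), right.map(_._1))
  }
}
```

Joining on the index rather than on the value keeps duplicate values in the input from being dropped or double-counted, which is presumably why the original indexes the DataSet before sampling.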
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60738627 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fractionProbability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. 
While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. + * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // + /** + * Split a DataSet by the probability fraction of each element of a vector. + * + * @param input DataSet to be split + * @param fracArray An array of PROPORTIONS for splitting the DataSet. Unlike th
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60738393 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fractionProbability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. 
While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. + * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // + /** + * Split a DataSet by the probability fraction of each element of a vector. + * + * @param input DataSet to be split + * @param fracArray An array of PROPORTIONS for splitting the DataSet. Unlike th
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253920#comment-15253920 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60738393 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. 
DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fractionProbability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. 
+ * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // ---
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60738251 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fractionProbability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. 
While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. + * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // + /** + * Split a DataSet by the probability fraction of each element of a vector. + * + * @param input DataSet to be split + * @param fracArray An array of PROPORTIONS for splitting the DataSet. Unlike th
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253917#comment-15253917 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60738251 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. 
DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fractionProbability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. 
+ * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // ---
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253913#comment-15253913 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60737984 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. 
DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fractionProbability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. 
+ * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // ---
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60737947 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fraction Probability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. 
While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. + * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // + /** + * Split a DataSet by the probability fraction of each element of a vector. + * + * @param input DataSet to be split + * @param fracArray An array of PROPORTIONS for splitting the DataSet. Unlike th
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60737984 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fraction Probability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. 
While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. + * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // + /** + * Split a DataSet by the probability fraction of each element of a vector. + * + * @param input DataSet to be split + * @param fracArray An array of PROPORTIONS for splitting the DataSet. Unlike th
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253911#comment-15253911 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60737947 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. 
DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fractionProbability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. 
+ * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // ---
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60736953 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fraction Probability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. 
While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. + * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // + /** + * Split a DataSet by the probability fraction of each element of a vector. + * + * @param input DataSet to be split + * @param fracArray An array of PROPORTIONS for splitting the DataSet. Unlike th
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253903#comment-15253903 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60736953 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. 
DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fractionProbability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. 
+ * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // ---
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253899#comment-15253899 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-213425285 Overall, I really like the work you did for the Kinesis Consumer! As you can see, I've added some comments on the PR. Please let me know what your opinion is on my comments. When do you think you'll have time to address the issues? I would like to get the code merged as soon as possible because Amazon is asking to have it in Flink soon. If you already know that you won't have time to work on this in the upcoming days, I can also address the comments. Just let me know, I think we can find a solution. > Kinesis streaming consumer with integration of Flink's checkpointing mechanics > -- > > Key: FLINK-3229 > URL: https://issues.apache.org/jira/browse/FLINK-3229 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Opening a sub-task to implement a data source consumer for the Kinesis streaming > connector (https://issues.apache.org/jira/browse/FLINK-3211). 
> An example of the planned user API for Flink Kinesis Consumer: > {code} > Properties kinesisConfig = new Properties(); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, > "BASIC"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, > "aws_access_key_id_here"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, > "aws_secret_key_here"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, > "LATEST"); // or TRIM_HORIZON > DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>( > "kinesis_stream_name", > new SimpleStringSchema(), > kinesisConfig)); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-213425285 Overall, I really like the work you did for the Kinesis Consumer! As you can see, I've added some comments on the PR. Please let me know what your opinion is on my comments. When do you think you'll have time to address the issues? I would like to get the code merged as soon as possible because Amazon is asking to have it in Flink soon. If you already know that you won't have time to work on this in the upcoming days, I can also address the comments. Just let me know, I think we can find a solution. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3618) Rename abstract UDF classes in Scatter-Gather implementation
[ https://issues.apache.org/jira/browse/FLINK-3618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253888#comment-15253888 ] Greg Hogan commented on FLINK-3618: --- This looks to be a simple change with no dissension, but is unassigned. Any reason not to make this change for 1.1.0? > Rename abstract UDF classes in Scatter-Gather implementation > > > Key: FLINK-3618 > URL: https://issues.apache.org/jira/browse/FLINK-3618 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0, 1.0.1 >Reporter: Martin Junghanns >Priority: Minor > > We now offer three vertex-centric computing abstractions: > * Pregel > * Gather-Sum-Apply > * Scatter-Gather > Each of these abstractions provides abstract classes that need to be > implemented by the user: > * Pregel: {{ComputeFunction}} > * GSA: {{GatherFunction}}, {{SumFunction}}, {{ApplyFunction}} > * Scatter-Gather: {{MessagingFunction}}, {{VertexUpdateFunction}} > In Pregel and GSA, the names of those functions follow the name of the > abstraction or the name suggested in the corresponding papers. For > consistency of the API, I propose to rename {{MessagingFunction}} to > {{ScatterFunction}} and {{VertexUpdateFunction}} to {{GatherFunction}}. > Also for consistency, I would like to change the parameter order in > {{Graph.runScatterGatherIteration(VertexUpdateFunction f1, MessagingFunction > f2)}} to {{Graph.runScatterGatherIteration(ScatterFunction f1, GatherFunction > f2)}} (like in {{Graph.runGatherSumApplyFunction(...)}}). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
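To make the proposed naming and parameter order concrete, here is a minimal standalone sketch in plain Java. It is not Gelly code. The interfaces, their method signatures, the `Edge` class and the toy `runScatterGatherIteration` driver are all assumptions made for illustration, and interfaces are used instead of the abstract classes the issue describes only to keep the example short. The user-defined functions are passed in the order the phases run: scatter first, then gather.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy scatter-gather loop illustrating the proposed names; not the Gelly API. */
final class ScatterGatherSketch {

    /** Scatter phase (hypothetical signature): builds the message sent along one edge. */
    interface ScatterFunction {
        double sendMessage(double sourceValue, double edgeWeight);
    }

    /** Gather phase (hypothetical signature): folds incoming messages into a new vertex value. */
    interface GatherFunction {
        double updateVertex(double currentValue, List<Double> messages);
    }

    /** A weighted, directed edge between two vertex ids. */
    static final class Edge {
        final int source;
        final int target;
        final double weight;

        Edge(int source, int target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    /**
     * Runs supersteps until no vertex value changes or maxIterations is reached.
     * Assumes every edge endpoint has an entry in the vertices map.
     */
    static Map<Integer, Double> runScatterGatherIteration(
            Map<Integer, Double> vertices,
            List<Edge> edges,
            ScatterFunction scatter,
            GatherFunction gather,
            int maxIterations) {
        Map<Integer, Double> values = new HashMap<>(vertices);
        for (int step = 0; step < maxIterations; step++) {
            // Scatter: every edge carries one message from its source to its target.
            Map<Integer, List<Double>> inbox = new HashMap<>();
            for (Edge e : edges) {
                double message = scatter.sendMessage(values.get(e.source), e.weight);
                inbox.computeIfAbsent(e.target, k -> new ArrayList<>()).add(message);
            }
            // Gather: each vertex that received messages computes its new value.
            boolean changed = false;
            for (Map.Entry<Integer, List<Double>> entry : inbox.entrySet()) {
                double old = values.get(entry.getKey());
                double updated = gather.updateVertex(old, entry.getValue());
                if (updated != old) {
                    values.put(entry.getKey(), updated);
                    changed = true;
                }
            }
            if (!changed) {
                break;
            }
        }
        return values;
    }
}
```

With a scatter function that adds the edge weight to the source value and a gather function that keeps the minimum of the current value and all messages, this loop computes single-source shortest paths.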
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253889#comment-15253889 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60735615 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. 
DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fractionProbability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. 
+ * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // ---
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60735615 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fraction Probability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. 
While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. + * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) +Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2)) + } + + // + // multiRandomSplit + // + /** + * Split a DataSet by the probability fraction of each element of a vector. + * + * @param input DataSet to be split + * @param fracArray An array of PROPORTIONS for splitting the DataSet. Unlike th
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253886#comment-15253886 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60735451 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. 
DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fractionProbability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. 
+ * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) --- End diff -- We could write this a bit more efficiently: ``` val rightSplit: DataSet[T] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) .where(0) .equalTo(0).apply { (full: (Long,T) , left: (Long, T), collector: Collector[T]) => if (left
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60735451 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala.DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fraction Probability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement.
While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. + * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) + case true => { +val count = indexedInput.count() +val numOfSamples = math.round(fraction * count).toInt +indexedInput.sampleWithSize(false, numOfSamples, seed) + } +} + +val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) + .where(0) + .equalTo(0) { +(full: (Long,T) , left: (Long, T)) => (if (left == null) full else null) + } + .filter( o => o != null ) --- End diff -- We could write this a bit more efficiently: ``` val rightSplit: DataSet[T] = indexedInput.leftOuterJoin[(Long, T)](leftSplit) .where(0) .equalTo(0).apply { (full: (Long,T) , left: (Long, T), collector: Collector[T]) => if (left == null) { collector.collect(full._2) } } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled
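The reviewer's suggestion above amounts to an anti-join: walk the indexed input, look up each key in the sampled (left) split, and emit the value through a collector only when there is no match, so no `null` placeholders are produced and no separate `filter` or `map` pass is needed. A minimal sketch of that idea with plain Java collections follows. It does not use the Flink API; `ComplementSplitSketch`, its nested `Collector` interface, and `rightSplit` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free illustration of the collector-based left outer join.
final class ComplementSplitSketch {

    /** Stand-in for a collector: receives zero or more output values per joined pair. */
    interface Collector<T> {
        void collect(T value);
    }

    /**
     * Returns the values of indexedInput whose key is absent from leftSplit.
     * Emitting only unmatched elements replaces the "map to null, then filter" pattern.
     */
    static <T> List<T> rightSplit(Map<Long, T> indexedInput, Map<Long, T> leftSplit) {
        List<T> out = new ArrayList<>();
        Collector<T> collector = out::add;
        for (Map.Entry<Long, T> full : indexedInput.entrySet()) {
            // In a left outer join the right side is missing when no key matches.
            boolean matched = leftSplit.containsKey(full.getKey());
            if (!matched) {
                // Emit the bare value, dropping the index in the same step.
                collector.collect(full.getValue());
            }
        }
        return out;
    }
}
```

Because the index is dropped at emission time, the result is already a collection of plain values, which is why the suggested `rightSplit` has type `DataSet[T]` rather than `DataSet[(Long, T)]`.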
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253874#comment-15253874 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on the pull request: https://github.com/apache/flink/pull/1916#issuecomment-213419043 The type annotation work is done bottom-up. First, we know the schema of each of the two inputs, and we know that the `List[] expression` in `Project` takes one row of table data as input and outputs one value per expression. Therefore, we can infer the output schema of `Project` (in the current impl this is expressed as `def output: Seq[Attribute]`) if we know each expression's `dataType`. For example, `Add`'s dataType is the same as its input's, `Or`'s dataType is always `Boolean`, and `pow(a, b)`'s dataType is always `Double`. However, we can infer an expression's `dataType` only if we understand every kind of expression. The main problem here is that only `Call` (Unresolved Function) nodes are generated during expression construction, so we should first resolve them into concrete `Expression`s. `FunctionCatalog` is introduced here as a mapping from `FunctionName -> Expression`, so the translation can be done by looking up the `catalog`. > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike the execution of SQL strings, which has a separate validation phase before > RelNode construction, the Table API lacks a counterpart, and validation is > scattered in many places. > I suggest adding a single validation phase to detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...
Github user yjshen commented on the pull request: https://github.com/apache/flink/pull/1916#issuecomment-213419043 The type annotation work is done bottom-up. First, we know the schema of each of the two inputs, and we know that the `List[] expression` in `Project` takes one row of table data as input and outputs one value per expression. Therefore, we can infer the output schema of `Project` (in the current impl this is expressed as `def output: Seq[Attribute]`) if we know each expression's `dataType`. For example, `Add`'s dataType is the same as its input's, `Or`'s dataType is always `Boolean`, and `pow(a, b)`'s dataType is always `Double`. However, we can infer an expression's `dataType` only if we understand every kind of expression. The main problem here is that only `Call` (Unresolved Function) nodes are generated during expression construction, so we should first resolve them into concrete `Expression`s. `FunctionCatalog` is introduced here as a mapping from `FunctionName -> Expression`, so the translation can be done by looking up the `catalog`.
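The resolution step described in the comment above can be sketched as follows: unresolved `Call` nodes carry only a function name and arguments, a catalog maps each name to a constructor for a concrete expression, and once every `Call` has been replaced, `dataType` can be computed bottom-up. This is a self-contained illustration in plain Java, not the Table API's actual class hierarchy. `TypeInferenceSketch`, `Literal`, `CATALOG`, and `resolve` are hypothetical names, and only `Add` and `pow` are modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical names throughout; a sketch of "resolve first, then infer types".
final class TypeInferenceSketch {
    enum DataType { INT, DOUBLE, BOOLEAN }

    interface Expression { DataType dataType(); }

    static final class Literal implements Expression {
        final DataType type;
        Literal(DataType type) { this.type = type; }
        public DataType dataType() { return type; }
    }

    /** Add's type is the same as its input's type. */
    static final class Add implements Expression {
        final Expression left, right;
        Add(Expression left, Expression right) { this.left = left; this.right = right; }
        public DataType dataType() { return left.dataType(); }
    }

    /** pow(a, b) is always DOUBLE, whatever its inputs are. */
    static final class Pow implements Expression {
        final Expression base, exponent;
        Pow(Expression base, Expression exponent) { this.base = base; this.exponent = exponent; }
        public DataType dataType() { return DataType.DOUBLE; }
    }

    /** Unresolved function call: its type is unknown until it is resolved. */
    static final class Call implements Expression {
        final String name;
        final List<Expression> args;
        Call(String name, Expression... args) { this.name = name; this.args = Arrays.asList(args); }
        public DataType dataType() { throw new IllegalStateException("unresolved call: " + name); }
    }

    /** Function name -> constructor of the concrete expression. */
    static final Map<String, Function<List<Expression>, Expression>> CATALOG = new HashMap<>();
    static {
        CATALOG.put("add", a -> new Add(a.get(0), a.get(1)));
        CATALOG.put("pow", a -> new Pow(a.get(0), a.get(1)));
    }

    /** Replaces Call nodes bottom-up; non-Call nodes are assumed to be resolved already. */
    static Expression resolve(Expression e) {
        if (!(e instanceof Call)) {
            return e;
        }
        Call call = (Call) e;
        List<Expression> resolvedArgs = new ArrayList<>();
        for (Expression arg : call.args) {
            resolvedArgs.add(resolve(arg)); // children first, then the node itself
        }
        Function<List<Expression>, Expression> constructor = CATALOG.get(call.name);
        if (constructor == null) {
            throw new IllegalArgumentException("unknown function: " + call.name);
        }
        return constructor.apply(resolvedArgs);
    }
}
```

In this sketch an unknown function name fails during `resolve`, before any type is computed, which is one way a single up-front validation phase could surface errors early.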
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253872#comment-15253872 ] ASF GitHub Bot commented on FLINK-2259: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60733883 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala. 
DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fractionProbability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seedRandom number generator seed. + * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex --- End diff -- it should be sufficient to use `zipWithUniqueId` here. 
It is more efficient than `zipWithIndex` > Support training Estimators using a (train, validation, test) split of the > available data > - > > Key: FLINK-2259 > URL: https://issues.apache.org/jira/browse/FLINK-2259 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Theodore Vasiloudis >Assignee: Trevor Grant >Priority: Minor > Labels: ML > > When there is an abundance of data available, a good way to train models is > to split the available data into 3 parts: Train, Validation and Test. > We use the Train data to train the model, the Validation part is used to > estimate the test error and select hyperparameters, and the Test is u
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r60733883 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala.DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fraction Probability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement.
While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set size are forced, + *however this is somewhat less efficient. + * @param seed Random number generator seed. + * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex --- End diff -- It should be sufficient to use `zipWithUniqueId` here. It is more efficient than `zipWithIndex`.
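One way to see why unique IDs can be cheaper than consecutive indices: a consecutive index needs the sizes of all preceding partitions before the first label can be assigned, whereas a unique ID can be derived from a partition's own position and a local counter alone. The sketch below illustrates that difference with plain arrays. `ZipSketch` and both methods are hypothetical, and the ID scheme shown (`localCounter * parallelism + partitionIndex`) is an assumption made for illustration, not necessarily the scheme Flink uses.

```java
// Hypothetical illustration; partitions are described only by their sizes.
final class ZipSketch {

    /** Consecutive indices 0..n-1: requires a counting pass to compute per-partition offsets. */
    static long[][] zipWithIndex(int[] partitionSizes) {
        long[] offsets = new long[partitionSizes.length];
        long running = 0;
        for (int p = 0; p < partitionSizes.length; p++) { // pass 1: prefix sums over all partitions
            offsets[p] = running;
            running += partitionSizes[p];
        }
        long[][] labels = new long[partitionSizes.length][];
        for (int p = 0; p < partitionSizes.length; p++) { // pass 2: assign labels
            labels[p] = new long[partitionSizes[p]];
            for (int i = 0; i < partitionSizes[p]; i++) {
                labels[p][i] = offsets[p] + i;
            }
        }
        return labels;
    }

    /** Unique but non-consecutive IDs: each partition needs only its own index and a counter. */
    static long[][] zipWithUniqueId(int[] partitionSizes) {
        int parallelism = partitionSizes.length;
        long[][] labels = new long[parallelism][];
        for (int p = 0; p < parallelism; p++) {
            labels[p] = new long[partitionSizes[p]];
            for (int i = 0; i < partitionSizes[p]; i++) {
                // Distinct partitions differ modulo parallelism, so IDs never collide.
                labels[p][i] = (long) i * parallelism + p;
            }
        }
        return labels;
    }
}
```

For the splitter, the labels serve only as join keys between the full input and the sampled split, so uniqueness is all that is required and gaps in the numbering do not matter.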
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253871#comment-15253871 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60733759 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * + * + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param the type of data emitted + */ +public class FlinkKinesisConsumer extends RichParallelSourceFunction + implements CheckpointListener, CheckpointedAsynchronously>, ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + // + // Consumer p
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60733759 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * + * + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param the type of data emitted + */ +public class FlinkKinesisConsumer extends RichParallelSourceFunction + implements CheckpointListener, CheckpointedAsynchronously>, ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + // + // Consumer properties + // + + /** The complete list of shards */ + private final List shards; + + /** Properties to parametrize settings such as AWS service regi
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253870#comment-15253870 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60733595 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * + * + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param the type of data emitted + */ +public class FlinkKinesisConsumer extends RichParallelSourceFunction + implements CheckpointListener, CheckpointedAsynchronously>, ResultTypeQueryable { --- End diff -- I don't think we need to implement the `CheckpointListener` currently because we are not committing the offsets anywhere. > Kinesis streaming consumer with integration of Flink's checkpointing mechanics > -
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60733595 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * + * + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param the type of data emitted + */ +public class FlinkKinesisConsumer extends RichParallelSourceFunction + implements CheckpointListener, CheckpointedAsynchronously>, ResultTypeQueryable { --- End diff -- I don't think we need to implement the `CheckpointListener` currently because we are not committing the offsets anywhere.