[GitHub] flink pull request: [FLINK-1819][core]Allow access to RuntimeConte...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/966#issuecomment-130979827 Sure. :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats
[ https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696519#comment-14696519 ] ASF GitHub Bot commented on FLINK-1819: --- Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/966#issuecomment-130979827 Sure. :) > Allow access to RuntimeContext from Input and OutputFormats > --- > > Key: FLINK-1819 > URL: https://issues.apache.org/jira/browse/FLINK-1819 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 0.9, 0.8.1 >Reporter: Fabian Hueske >Assignee: Sachin Goel >Priority: Minor > Fix For: 0.10 > > > User functions that extend a RichFunction can access a {{RuntimeContext}}, > which gives the parallel id of the task and access to Accumulators and > BroadcastVariables. > Right now, Input and OutputFormats cannot access their {{RuntimeContext}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api
[ https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696375#comment-14696375 ] ASF GitHub Bot commented on FLINK-2480: --- Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/991#discussion_r37046489 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}. 
+ */ +public class PrintSinkFunctionTest extends RichSinkFunction { + + private Environment envForPrefixNull = new Environment() { + @Override + public JobID getJobID() { + return null; + } + + @Override + public JobVertexID getJobVertexId() { + return null; + } + + @Override + public ExecutionAttemptID getExecutionId() { + return null; + } + + @Override + public Configuration getTaskConfiguration() { + return null; + } + + @Override + public TaskManagerRuntimeInfo getTaskManagerInfo() { + return null; + } + + @Override + public Configuration getJobConfiguration() { + return null; + } + + @Override + public int getNumberOfSubtasks() { + return 0; + } + + @Override + public int getIndexInSubtaskGroup() { + return 0; + } + + @Override + public InputSplitProvider getInputSplitProvider() { + return null; + } + + @Override + public IOManager getIOManager() { + return null; + } + + @Override + public MemoryManager getMemoryManager() { + return null; + } + + @Override + public String getTaskName() { + return null; + }
[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/991#discussion_r37046489 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}. 
+ */ +public class PrintSinkFunctionTest extends RichSinkFunction { + + private Environment envForPrefixNull = new Environment() { + @Override + public JobID getJobID() { + return null; + } + + @Override + public JobVertexID getJobVertexId() { + return null; + } + + @Override + public ExecutionAttemptID getExecutionId() { + return null; + } + + @Override + public Configuration getTaskConfiguration() { + return null; + } + + @Override + public TaskManagerRuntimeInfo getTaskManagerInfo() { + return null; + } + + @Override + public Configuration getJobConfiguration() { + return null; + } + + @Override + public int getNumberOfSubtasks() { + return 0; + } + + @Override + public int getIndexInSubtaskGroup() { + return 0; + } + + @Override + public InputSplitProvider getInputSplitProvider() { + return null; + } + + @Override + public IOManager getIOManager() { + return null; + } + + @Override + public MemoryManager getMemoryManager() { + return null; + } + + @Override + public String getTaskName() { + return null; + } + + @Override + public String getTaskNameWithSubtasks() { + return null; + } + + @Override + public ClassLoader getUserClassLoader() { +
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696369#comment-14696369 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r37046195 --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.scala.operators + +import java.util.{List => JavaList, Random} + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} +import org.junit.Assert._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.{Before, After, Test} + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SampleITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + private val RNG: Random = new Random + + private var result: JavaList[String] = null; + + @Before + def initiate { +ExecutionEnvironment.getExecutionEnvironment.setParallelism(5) + } + + @After + def after() = { +TestBaseUtils.containsResultAsText(result, getSourceStrings) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithoutReplacement { +verifySamplerWithFractionWithoutReplacement(0d) +verifySamplerWithFractionWithoutReplacement(0.2d) +verifySamplerWithFractionWithoutReplacement(1.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithReplacement { +verifySamplerWithFractionWithReplacement(0d) +verifySamplerWithFractionWithReplacement(0.2d) +verifySamplerWithFractionWithReplacement(1.0d) +verifySamplerWithFractionWithReplacement(2.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithoutReplacement { +verifySamplerWithFixedSizeWithoutReplacement(0) +verifySamplerWithFixedSizeWithoutReplacement(2) +verifySamplerWithFixedSizeWithoutReplacement(21) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithReplacement { +verifySamplerWithFixedSizeWithReplacement(0) +verifySamplerWithFixedSizeWithReplacement(2) +verifySamplerWithFixedSizeWithReplacement(21) + } + + @throws(classOf[Exception]) + private def 
verifySamplerWithFractionWithoutReplacement(fraction: Double) { +verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithoutReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(false, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double) { +verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(true, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFraction(withReplacement: Boolean, fraction: Double, seed: Long) { +val ds = getSourceDataSet() +val sampled = ds.sample(withReplacement, fraction, seed) +result = sampled.collect.asJava --- End diff -- The validity of the sample result is verified in the after() method for each test. As the source data is very small, verifying the fraction does not make much sense, so I didn't verify the fraction valid
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user ChengXiangLi commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r37046195 --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.scala.operators + +import java.util.{List => JavaList, Random} + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} +import org.junit.Assert._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.{Before, After, Test} + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SampleITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + private val RNG: Random = new Random + + private var result: JavaList[String] = null; + + @Before + def initiate { +ExecutionEnvironment.getExecutionEnvironment.setParallelism(5) + } + + @After + def after() = { +TestBaseUtils.containsResultAsText(result, getSourceStrings) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithoutReplacement { +verifySamplerWithFractionWithoutReplacement(0d) +verifySamplerWithFractionWithoutReplacement(0.2d) +verifySamplerWithFractionWithoutReplacement(1.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithReplacement { +verifySamplerWithFractionWithReplacement(0d) +verifySamplerWithFractionWithReplacement(0.2d) +verifySamplerWithFractionWithReplacement(1.0d) +verifySamplerWithFractionWithReplacement(2.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithoutReplacement { +verifySamplerWithFixedSizeWithoutReplacement(0) +verifySamplerWithFixedSizeWithoutReplacement(2) +verifySamplerWithFixedSizeWithoutReplacement(21) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithReplacement { +verifySamplerWithFixedSizeWithReplacement(0) +verifySamplerWithFixedSizeWithReplacement(2) +verifySamplerWithFixedSizeWithReplacement(21) + } + + @throws(classOf[Exception]) + private def 
verifySamplerWithFractionWithoutReplacement(fraction: Double) { +verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithoutReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(false, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double) { +verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(true, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFraction(withReplacement: Boolean, fraction: Double, seed: Long) { +val ds = getSourceDataSet() +val sampled = ds.sample(withReplacement, fraction, seed) +result = sampled.collect.asJava --- End diff -- The validity of the sample result is verified in the after() method for each test. As the source data is very small, verifying the fraction does not make much sense, so I didn't verify the fraction's validity here; it is already verified at the sampler level in RandomSamplerTest.
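The reasoning in the review comment above (check that every sampled element comes from the source, but do not check the fraction on a tiny input) can be illustrated with a small standalone sketch. This is not Flink's sampler: the class `FractionSampleSketch` and its `sample` method are hypothetical, and the sketch assumes plain Bernoulli sampling without replacement, where each element is kept independently with probability `fraction`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical illustration only; not the sampler used by Flink.
final class FractionSampleSketch {

    /**
     * Bernoulli sampling without replacement: each element of the source is
     * kept independently with probability {@code fraction}. The seed makes
     * the result reproducible.
     */
    static <T> List<T> sample(List<T> source, double fraction, long seed) {
        Random rng = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T item : source) {
            // nextDouble() lies in [0.0, 1.0), so fraction 0 keeps nothing
            // and fraction 1.0 keeps everything.
            if (rng.nextDouble() < fraction) {
                out.add(item);
            }
        }
        return out;
    }
}
```

Under this assumption the sample size is only equal to `fraction * n` in expectation, so on a handful of elements the observed fraction can be far from the requested one, while the containment property (every sampled element is in the source) holds for any input size.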
[jira] [Commented] (FLINK-2516) Remove unwanted log.isInfoEnabled check
[ https://issues.apache.org/jira/browse/FLINK-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696321#comment-14696321 ] ASF GitHub Bot commented on FLINK-2516: --- GitHub user ffbin opened a pull request: https://github.com/apache/flink/pull/1012 [FLINK-2516]Remove unwanted log.isInfoEnabled check The function already calls log.info() at its start, so I think the log.isInfoEnabled check that follows the log.info() call is unnecessary. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ffbin/flink FLINK-2516 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1012.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1012 commit 2354d008021b93885b3183fd8becd67c52e18424 Author: ffbin <869218...@qq.com> Date: 2015-08-14T01:37:14Z [FLINK-2516]Remove unwanted log.isInfoEnabled check > Remove unwanted log.isInfoEnabled check > --- > > Key: FLINK-2516 > URL: https://issues.apache.org/jira/browse/FLINK-2516 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 0.8.1 >Reporter: fangfengbin >Assignee: fangfengbin >Priority: Minor > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2516]Remove unwanted log.isInfoEnabled ...
GitHub user ffbin opened a pull request: https://github.com/apache/flink/pull/1012 [FLINK-2516]Remove unwanted log.isInfoEnabled check The function already calls log.info() at its start, so I think the log.isInfoEnabled check that follows the log.info() call is unnecessary. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ffbin/flink FLINK-2516 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1012.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1012 commit 2354d008021b93885b3183fd8becd67c52e18424 Author: ffbin <869218...@qq.com> Date: 2015-08-14T01:37:14Z [FLINK-2516]Remove unwanted log.isInfoEnabled check
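The argument of this pull request can be shown with a minimal sketch. It is not the Flink code touched by the PR: the class `InfoGuardSketch`, its methods, and the message texts are hypothetical, and the sketch uses `java.util.logging` only to stay self-contained, with `isLoggable(Level.INFO)` standing in for the `log.isInfoEnabled()` check named in the PR. It compares a method that guards its second `info` call with one that does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical illustration only; names and messages are not from Flink.
final class InfoGuardSketch {

    /** Collects published messages so both variants can be compared. */
    static final class Capture extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            if (isLoggable(record)) {
                messages.add(record.getMessage());
            }
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    /** Variant with the guard that the PR proposes to remove. */
    static void guarded(Logger log, String task) {
        log.info("Starting " + task);          // unguarded call at the start
        if (log.isLoggable(Level.INFO)) {      // guard after an unguarded call
            log.info("Details for " + task);
        }
    }

    /** Variant without the guard. */
    static void unguarded(Logger log, String task) {
        log.info("Starting " + task);
        log.info("Details for " + task);
    }

    /** Runs one variant at the given level and returns what was logged. */
    static List<String> run(Level level, boolean useGuard) {
        Logger log = Logger.getAnonymousLogger();
        log.setUseParentHandlers(false);       // keep output out of the console
        log.setLevel(level);
        Capture capture = new Capture();
        log.addHandler(capture);
        if (useGuard) {
            guarded(log, "task-1");
        } else {
            unguarded(log, "task-1");
        }
        return capture.messages;
    }
}
```

In this sketch both variants log the same messages at every level, because the `info` call itself already checks the level. A guard of this kind mainly pays off when building the message is expensive, which is a separate consideration from the one raised in the PR.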
[jira] [Created] (FLINK-2516) Remove unwanted log.isInfoEnabled check
fangfengbin created FLINK-2516: -- Summary: Remove unwanted log.isInfoEnabled check Key: FLINK-2516 URL: https://issues.apache.org/jira/browse/FLINK-2516 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 0.8.1 Reporter: fangfengbin Assignee: fangfengbin Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696283#comment-14696283 ] ASF GitHub Bot commented on FLINK-2490: --- Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37042816 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -43,6 +43,7 @@ private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_RETRY_SLEEP = 1000; + protected long retrys; --- End diff -- Sorry for my English. :) > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37042816 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -43,6 +43,7 @@ private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_RETRY_SLEEP = 1000; + protected long retrys; --- End diff -- Sorry for my English. :)
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695941#comment-14695941 ] Sheetal Parade commented on FLINK-2314: ---
I added the split information to the checkpoint state.
{code}
// Skip the records of the checkpointed split that were already read.
if (splitNumber == checkpointedSplit) {
    if (currRecord < checkpointedRecord) {
        currRecord++;
        continue;
    }
}
{code}
The restoreState and snapshotState methods change accordingly.
{code}
@Override
public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
    // Encode the state as "<record>:<split>".
    return currRecord + ":" + splitNumber;
}

@Override
public void restoreState(String state) {
    // Decode the "<record>:<split>" string written by snapshotState.
    String[] res = state.split(":");
    checkpointedRecord = Long.valueOf(res[0]);
    checkpointedSplit = Integer.valueOf(res[1]);
}
{code}
> Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Sheetal Parade > Labels: easyfix, starter > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
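The snapshot, restore, and skip logic from the FLINK-2314 comment above can be exercised in isolation with a small sketch. It is a standalone illustration, not the actual Flink source function: the class `SplitCheckpointSketch` and the methods `openSplit`, `recordEmitted`, and `shouldSkip` are hypothetical, and resetting the record counter when a split is opened is an assumption. Only the field names and the "record:split" string format are taken from the comment.

```java
// Hypothetical illustration only; not the Flink file source.
final class SplitCheckpointSketch {
    private long currRecord = 0;          // records read so far in the current split
    private int splitNumber = 0;          // split currently being read
    private long checkpointedRecord = 0;  // record position restored from a checkpoint
    private int checkpointedSplit = -1;   // split restored from a checkpoint (-1: none)

    /** Encodes the current position as "record:split". */
    String snapshotState() {
        return currRecord + ":" + splitNumber;
    }

    /** Decodes a "record:split" string written by snapshotState. */
    void restoreState(String state) {
        String[] res = state.split(":");
        checkpointedRecord = Long.parseLong(res[0]);
        checkpointedSplit = Integer.parseInt(res[1]);
    }

    /** Assumption: the record counter restarts at zero for every split. */
    void openSplit(int split) {
        splitNumber = split;
        currRecord = 0;
    }

    /** Called after a record has been read and emitted downstream. */
    void recordEmitted() {
        currRecord++;
    }

    /**
     * Returns true (and advances the counter) while the current record of the
     * checkpointed split was already read before the checkpoint.
     */
    boolean shouldSkip() {
        if (splitNumber == checkpointedSplit && currRecord < checkpointedRecord) {
            currRecord++;
            return true;
        }
        return false;
    }
}
```

A reader restored from the state "3:2" would skip the first three records of split 2 and then resume emitting. The sketch says nothing about splits that were completed before the checkpoint; how those are handled is outside what the comment describes.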
[GitHub] flink pull request: Stale Synchronous Parallel Iterations
Github user nltran commented on the pull request: https://github.com/apache/flink/pull/967#issuecomment-130734997 The code now passes all the checks :smiley: I have addressed the comments you made previously. @StephanEwen, @fhueske, could you give it another review? Namely:
 * The SSP slack configuration is now in TaskConfig instead of job-wide. At some point we might want to unify both iteration strategies, since the BSP iteration mode is an edge case of SSP with the slack equal to zero.
 * The parameter server is now completely orthogonal to Flink core. It is up to the user to set it up and call it. I'm preparing a sample job that uses both SSP and calls to a parameter server.
 * Correct license header.
Will I have to fill in a Contributor License Agreement at some point?
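The remark that BSP is the edge case of SSP with zero slack can be made concrete with a short sketch. It is not taken from the pull request: the class `SspSlackSketch` and the method `mayProceed` are hypothetical, and the condition encodes the commonly used SSP rule that a worker may start its next iteration only if it is at most `slack` iterations ahead of the slowest worker.

```java
// Hypothetical illustration only; not the code of the pull request.
final class SspSlackSketch {

    /**
     * Decides whether a worker may begin its next iteration.
     *
     * @param workerClock number of iterations this worker has completed
     * @param allClocks   completed-iteration counts of all workers
     * @param slack       maximum allowed lead over the slowest worker
     */
    static boolean mayProceed(int workerClock, int[] allClocks, int slack) {
        int slowest = Integer.MAX_VALUE;
        for (int clock : allClocks) {
            slowest = Math.min(slowest, clock);
        }
        // With slack == 0 this is a full barrier: nobody advances until every
        // worker has completed the same iteration, which is the BSP behaviour.
        return workerClock - slowest <= slack;
    }
}
```

With clocks {4, 3, 4}, a worker at clock 4 has to wait under slack 0 but may continue under slack 1.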
[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/991#discussion_r36990759 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}. 
+ */ +public class PrintSinkFunctionTest extends RichSinkFunction { + + private Environment envForPrefixNull = new Environment() { + @Override + public JobID getJobID() { + return null; + } + + @Override + public JobVertexID getJobVertexId() { + return null; + } + + @Override + public ExecutionAttemptID getExecutionId() { + return null; + } + + @Override + public Configuration getTaskConfiguration() { + return null; + } + + @Override + public TaskManagerRuntimeInfo getTaskManagerInfo() { + return null; + } + + @Override + public Configuration getJobConfiguration() { + return null; + } + + @Override + public int getNumberOfSubtasks() { + return 0; + } + + @Override + public int getIndexInSubtaskGroup() { + return 0; + } + + @Override + public InputSplitProvider getInputSplitProvider() { + return null; + } + + @Override + public IOManager getIOManager() { + return null; + } + + @Override + public MemoryManager getMemoryManager() { + return null; + } + + @Override + public String getTaskName() { + return null; + } + + @Override + public String getTaskNameWithSubtasks() { + return null; + } + + @Override + public ClassLoader getUserClassLoader() { + return
[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/991#discussion_r36990804 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}. 
+ */ +public class PrintSinkFunctionTest extends RichSinkFunction { + + private Environment envForPrefixNull = new Environment() { + @Override + public JobID getJobID() { + return null; + } + + @Override + public JobVertexID getJobVertexId() { + return null; + } + + @Override + public ExecutionAttemptID getExecutionId() { + return null; + } + + @Override + public Configuration getTaskConfiguration() { + return null; + } + + @Override + public TaskManagerRuntimeInfo getTaskManagerInfo() { + return null; + } + + @Override + public Configuration getJobConfiguration() { + return null; + } + + @Override + public int getNumberOfSubtasks() { + return 0; + } + + @Override + public int getIndexInSubtaskGroup() { + return 0; + } + + @Override + public InputSplitProvider getInputSplitProvider() { + return null; + } + + @Override + public IOManager getIOManager() { + return null; + } + + @Override + public MemoryManager getMemoryManager() { + return null; + } + + @Override + public String getTaskName() { + return null; + } + + @Override + public String getTaskNameWithSubtasks() { + return null; + } + + @Override + public ClassLoader getUserClassLoader() { + return
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695604#comment-14695604 ] Sheetal Parade commented on FLINK-2314: --- Can you provide some guidance? As I understand, the checkpoint information needs input split information too? > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Sheetal Parade > Labels: easyfix, starter > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
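The issue description above asks file sources to track the bytes they have read and to checkpoint that position. A minimal sketch of the idea in plain Java follows. It does not use Flink's API: the class `OffsetTrackingSource` and its methods `snapshotState` and `restoreState` are hypothetical names chosen for illustration, and an in-memory byte array stands in for the file. Regarding the question about input splits, a real source reading several splits would presumably also need to record which split the offset belongs to; that part is left out here.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: a line reader whose only checkpointed state is a byte offset. */
final class OffsetTrackingSource {
    private final byte[] data; // stands in for the file contents
    private long offset;       // bytes consumed so far; this is the state to checkpoint

    OffsetTrackingSource(byte[] data) {
        this.data = data;
    }

    /** Returns the next line without its trailing newline, or null at end of input. */
    String nextLine() {
        if (offset >= data.length) {
            return null;
        }
        int start = (int) offset;
        int end = start;
        while (end < data.length && data[end] != '\n') {
            end++;
        }
        String line = new String(data, start, end - start, StandardCharsets.UTF_8);
        // Skip the newline if there is one; never move past the end of the data.
        offset = Math.min(end + 1, data.length);
        return line;
    }

    /** Called when a checkpoint is taken: the offset is all that needs to be saved. */
    long snapshotState() {
        return offset;
    }

    /** Called on recovery: resume reading from the saved offset. */
    void restoreState(long savedOffset) {
        this.offset = savedOffset;
    }
}
```

After a failure, a fresh instance restored with the saved offset continues with the first line that had not yet been emitted when the snapshot was taken.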
[GitHub] flink pull request: Framesize fix
Github user kl0u commented on the pull request: https://github.com/apache/flink/pull/934#issuecomment-130700589 Hi @mxm , I totally agree that in terms of code clarity and structure, it would be better to let all accumulators pass through the BlobCache. In fact, we also had a brief discussion with @StephanEwen on the matter. The only problem is the latency penalty that MAY result from this design choice. In other words, if for most of the usages akka is enough, then forcing small blobs to pass through the blobCache (i.e. reading and writing accumulators to disk) may be expensive. What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1819][core]Allow access to RuntimeConte...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/966#issuecomment-130677380 Thanks for the contribution @sachingoel0101!
[GitHub] flink pull request: [FLINK-1819][core]Allow access to RuntimeConte...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/966
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36973377 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -43,6 +43,7 @@ private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_RETRY_SLEEP = 1000; + protected long retrys; --- End diff -- Sorry, minor thing but it should be `retries`.
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36973430 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -43,6 +43,7 @@ private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_RETRY_SLEEP = 1000; + protected long retrys; --- End diff -- You should also initialize the variable with 0 in the `open()` method.
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36973279 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. 
+ */ +public class SocketTextStreamFunctionTest{ + + final AtomicReference error = new AtomicReference(); + private final String host = "127.0.0.1"; + + SourceFunction.SourceContext ctx = new SourceFunction.SourceContext() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count < 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail("Error in spawned thread: " + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + source.start(); + + Socket channel; + channel = serverSo.accept(); + channel.close(); + serverSo.close(); +
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36973254 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. 
+ */ +public class SocketTextStreamFunctionTest{ + + final AtomicReference error = new AtomicReference(); + private final String host = "127.0.0.1"; + + SourceFunction.SourceContext ctx = new SourceFunction.SourceContext() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count < 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail("Error in spawned thread: " + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + source.start(); + + Socket channel; + channel = serverSo.accept(); + channel.close(); + serverSo.close(); +
[GitHub] flink pull request: [FLINK-2306] Add support for named streams in ...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/1011#issuecomment-130694237 Yes. It's a weird compile error and it builds locally... The error is: `[ERROR] /home/travis/build/mjsax/flink/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java:[27,64] package org.apache.flink.hadoop.shaded.com.google.common.collect does not exist` I have no clue what's wrong... :/ Any suggestions?
[GitHub] flink pull request: [FLINK-2306] Add support for named streams in ...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1011#issuecomment-130692637 The CI reports 3 failures: 2 failures in storm-compatibility-core and 1 failure in YARN (YARN not responding).
[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api
[ https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695412#comment-14695412 ] ASF GitHub Bot commented on FLINK-2480: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/991#discussion_r36990804 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}. 
+ */ +public class PrintSinkFunctionTest extends RichSinkFunction { + + private Environment envForPrefixNull = new Environment() { + @Override + public JobID getJobID() { + return null; + } + + @Override + public JobVertexID getJobVertexId() { + return null; + } + + @Override + public ExecutionAttemptID getExecutionId() { + return null; + } + + @Override + public Configuration getTaskConfiguration() { + return null; + } + + @Override + public TaskManagerRuntimeInfo getTaskManagerInfo() { + return null; + } + + @Override + public Configuration getJobConfiguration() { + return null; + } + + @Override + public int getNumberOfSubtasks() { + return 0; + } + + @Override + public int getIndexInSubtaskGroup() { + return 0; + } + + @Override + public InputSplitProvider getInputSplitProvider() { + return null; + } + + @Override + public IOManager getIOManager() { + return null; + } + + @Override + public MemoryManager getMemoryManager() { + return null; + } + + @Override + public String getTaskName() { + return null; + } +
[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api
[ https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695410#comment-14695410 ] ASF GitHub Bot commented on FLINK-2480: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/991#discussion_r36990759 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}. 
+ */ +public class PrintSinkFunctionTest extends RichSinkFunction { + + private Environment envForPrefixNull = new Environment() { + @Override + public JobID getJobID() { + return null; + } + + @Override + public JobVertexID getJobVertexId() { + return null; + } + + @Override + public ExecutionAttemptID getExecutionId() { + return null; + } + + @Override + public Configuration getTaskConfiguration() { + return null; + } + + @Override + public TaskManagerRuntimeInfo getTaskManagerInfo() { + return null; + } + + @Override + public Configuration getJobConfiguration() { + return null; + } + + @Override + public int getNumberOfSubtasks() { + return 0; + } + + @Override + public int getIndexInSubtaskGroup() { + return 0; + } + + @Override + public InputSplitProvider getInputSplitProvider() { + return null; + } + + @Override + public IOManager getIOManager() { + return null; + } + + @Override + public MemoryManager getMemoryManager() { + return null; + } + + @Override + public String getTaskName() { + return null; + } +
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130672880 > Otherwise, I found the SocketClientSink didn't have the "retry". Is it necessary to get a "retry"? Yes, that might be an issue, but let's keep it separate from our concern here. If you want, you can open a JIRA issue for the missing retry option in the `SocketClientSink`.
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130663511 OK, I added two cases (retry 10 and 0), since I thought retrying 1 time is just the same as 10. Would you please also take a look at the other two tests (https://github.com/apache/flink/pull/991 and https://github.com/apache/flink/pull/977)?
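The retry counts discussed in this thread (0, 1, 10, and the "forever" case that the test above passes as `-1`) can be illustrated with a small standalone loop. This is only a sketch under stated assumptions, not the code of `SocketTextStreamFunction`: the class `RetrySketch`, the method `connectWithRetries`, and the `BooleanSupplier` standing in for a connection attempt are hypothetical, and the sleep between attempts is omitted.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a bounded retry loop; not Flink code. */
final class RetrySketch {

    /**
     * Calls tryConnect until it succeeds or the retry budget is used up.
     * Assumption for illustration: a negative maxRetries means "retry forever".
     *
     * @return the number of connection attempts that were made
     */
    static int connectWithRetries(BooleanSupplier tryConnect, long maxRetries) {
        int attempts = 0;
        long retries = 0; // starts at 0 on every call
        while (true) {
            attempts++;
            if (tryConnect.getAsBoolean()) {
                return attempts; // connected
            }
            if (maxRetries >= 0 && retries >= maxRetries) {
                return attempts; // retry budget exhausted, give up
            }
            retries++;
        }
    }
}
```

With a connection that always fails, 0 retries gives exactly one attempt, 1 retry gives two, and 10 retries give eleven, which is why 0 and 1 are worth testing separately from 10.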
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36969808 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java --- @@ -1057,7 +1061,68 @@ public long count() throws Exception { public UnionOperator union(DataSet other){ return new UnionOperator(this, other, Utils.getCallLocationName()); } + + // + // Sample + // + + /** +* Generate a sample of DataSet by the probability fraction of each element. +* +* @param withReplacement Whether element can be selected more than once. +* @param fraction Probability that each element is chosen, should be [0,1] without replacement, +*and [0, ∞) with replacement. While fraction is larger than 1, the elements are +*expected to be selected multi times into sample on average. +* @return The sampled DataSet +*/ + public MapPartitionOperator sample(final boolean withReplacement, final double fraction) { + return sample(withReplacement, fraction, Utils.RNG.nextLong()); + } + + /** +* Generate a sample of DataSet by the probability fraction of each element. +* +* @param withReplacement Whether element can be selected more than once. +* @param fraction Probability that each element is chosen, should be [0,1] without replacement, +*and [0, ∞) with replacement. While fraction is larger than 1, the elements are +*expected to be selected multi times into sample on average. +* @param seed random number generator seed. +* @return The sampled DataSet +*/ + public MapPartitionOperator sample(final boolean withReplacement, final double fraction, final long seed) { + return mapPartition(new SampleWithFraction(withReplacement, fraction, seed)); + } + + /** +* Generate a sample of DataSet which contains fixed size elements. +* +* @param withReplacement Whether element can be selected more than once. +* @param numSample The expected sample size. 
+* @return The sampled DataSet +*/ --- End diff -- Maybe we want to include a note that this kind of sampling currently takes 2 passes over the data, and recommend using fraction unless exact precision is necessary.
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130664351 @mxm Otherwise, I found the SocketClientSink didn't have the "retry". Is it necessary to get a "retry"?
[jira] [Created] (FLINK-2515) CheckpointCoordinator triggers checkpoints even if not all sources are running any more
Stephan Ewen created FLINK-2515: --- Summary: CheckpointCoordinator triggers checkpoints even if not all sources are running any more Key: FLINK-2515 URL: https://issues.apache.org/jira/browse/FLINK-2515 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 0.10 Reporter: Stephan Ewen Priority: Blocker Fix For: 0.10 When some sources finish early, they will not emit checkpoint barriers any more. That means that pending checkpoint alignments will never be able to complete, locking the flow.
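The failure mode described in FLINK-2515 can be pictured with a toy model of barrier alignment. This is a deliberately simplified sketch and not Flink's implementation: the class `AlignmentSketch` and its methods are hypothetical, and the only assumption modelled is that an operator finishes aligning once it has seen a barrier on every one of its input channels.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical toy model of checkpoint barrier alignment; not Flink code. */
final class AlignmentSketch {
    private final int numChannels;
    private final Set<Integer> barriersSeen = new HashSet<>();

    AlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Records that the barrier of the pending checkpoint arrived on the given channel. */
    void onBarrier(int channel) {
        barriersSeen.add(channel);
    }

    /** Alignment is complete only when every input channel has delivered its barrier. */
    boolean isAligned() {
        return barriersSeen.size() == numChannels;
    }
}
```

In this model, an operator with two inputs that receives a barrier only on channel 0 stays unaligned for as long as channel 1 is silent. A source that has already finished never sends that second barrier, which matches the "locking the flow" behavior the issue reports.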
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36955412 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.Iterator; +import java.util.Random; + +/** + * A sampler implementation built upon Bernoulli Trail. For sample with fraction and without replacement, + * each element sample choice is just a bernoulli trail. --- End diff -- Do you mean Bernoulli _trial_ here?
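The quoted Javadoc describes sampling with a fraction and without replacement as one Bernoulli trial per element. A minimal standalone sketch of that idea is shown below. It is not the `BernoulliSampler` from the pull request: the class `BernoulliSampleSketch` and its `sample` method are hypothetical, and the result is collected into a list for simplicity instead of being exposed as an iterator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of fraction-based sampling without replacement; not the PR's class. */
final class BernoulliSampleSketch {

    /**
     * Keeps each element independently with probability {@code fraction}.
     * A single pass over the input is enough; the sample size is random.
     */
    static <T> List<T> sample(Iterable<T> input, double fraction, long seed) {
        if (!(fraction >= 0.0 && fraction <= 1.0)) {
            throw new IllegalArgumentException("fraction must be in [0, 1]");
        }
        Random rng = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T element : input) {
            // One Bernoulli trial per element: nextDouble() is uniform in [0, 1).
            if (rng.nextDouble() < fraction) {
                out.add(element);
            }
        }
        return out;
    }
}
```

Because each element is decided on its own, the size of the result varies around `fraction` times the input size, while a fixed seed makes the outcome reproducible.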
[jira] [Commented] (FLINK-2306) Add support for named streams in Storm compatibility layer
[ https://issues.apache.org/jira/browse/FLINK-2306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695292#comment-14695292 ] ASF GitHub Bot commented on FLINK-2306: --- Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/1011#issuecomment-130694237 Yes. It's a weird compile error and it builds locally... The error is: `[ERROR] /home/travis/build/mjsax/flink/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java:[27,64] package org.apache.flink.hadoop.shaded.com.google.common.collect does not exist` I have no clue what's wrong... :/ Any suggestions? > Add support for named streams in Storm compatibility layer > -- > > Key: FLINK-2306 > URL: https://issues.apache.org/jira/browse/FLINK-2306 > Project: Flink > Issue Type: Improvement > Components: flink-contrib >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > > Currently, the layer only works on single stream and ignores stream names, > ie, each stream is treated as "default" stream. The declaration of multiple > output streams is ignored (all tuples are emitted to the same stream). If > multiple input streams are consumed all tuples are merged into a single > stream. > This feature allows operators to declare multiple (named) output streams and > emit tuples to different stream. Furthermore, it enables Bolts to distinguish > incoming tuples from different streams by stream name (Storm tuple meta > information).
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130644950 `StringBuilder` is meant for single-threaded use, while `StringBuffer` is synchronized and therefore safe for multi-threaded access. If you use `StringBuffer` in a single-threaded scenario, it performs worse than `StringBuilder`. Thanks for your changes. In addition to the "infinity" test, can you add a test that checks for a certain number of retries (e.g. 10)? Also, please add checks for 1 and 0 retries. It's always good to test corner cases :)
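The difference between the two classes can be seen in a small standalone sketch. The class `BufferVsBuilderSketch` and its two methods are hypothetical names for illustration; the only library facts relied on are that `StringBuffer`'s methods are synchronized and that `StringBuilder` offers the same operations without synchronization.

```java
/** Hypothetical sketch contrasting StringBuilder and StringBuffer usage. */
final class BufferVsBuilderSketch {

    /** Single-threaded concatenation: StringBuilder is sufficient and avoids locking. */
    static String joinSingleThreaded(String[] parts) {
        StringBuilder sb = new StringBuilder();
        for (String part : parts) {
            sb.append(part);
        }
        return sb.toString();
    }

    /** Several threads append to one shared StringBuffer; returns the final length. */
    static int appendConcurrently(int threads, int appendsPerThread) {
        final StringBuffer shared = new StringBuffer();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < appendsPerThread; j++) {
                    shared.append('x'); // synchronized, so no append is lost
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return shared.length();
    }
}
```

When the buffer is only ever touched by one thread, as a local variable inside a single method would be, the synchronization of `StringBuffer` buys nothing, which is the reason given above for preferring `StringBuilder` there.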
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130626843 Hi Max, I fixed everything as you suggested in your review, and I kept the change from StringBuffer to StringBuilder. One question: as far as I can see, StringBuilder currently does the same thing as StringBuffer. So what is the real difference between the two types in the SocketTextStreamFunction?
[jira] [Commented] (FLINK-2306) Add support for named streams in Storm compatibility layer
[ https://issues.apache.org/jira/browse/FLINK-2306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695280#comment-14695280 ] ASF GitHub Bot commented on FLINK-2306: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1011#issuecomment-130692637 The CI reports 3 failures: 2 failures in storm-compatibility-core and 1 failure in YARN (YARN not responding). > Add support for named streams in Storm compatibility layer > -- > > Key: FLINK-2306 > URL: https://issues.apache.org/jira/browse/FLINK-2306 > Project: Flink > Issue Type: Improvement > Components: flink-contrib >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > > Currently, the layer only works on a single stream and ignores stream names, > i.e., each stream is treated as the "default" stream. The declaration of multiple > output streams is ignored (all tuples are emitted to the same stream). If > multiple input streams are consumed, all tuples are merged into a single > stream. > This feature allows operators to declare multiple (named) output streams and > emit tuples to different streams. Furthermore, it enables Bolts to distinguish > incoming tuples from different streams by stream name (Storm tuple meta > information).
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130524804 @mxm Hi, I fixed the StringBuffer and added the test. Please take a look and check whether it's correct. Thank you!
[GitHub] flink pull request: [Flink-Gelly] [example] added missing assumpti...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/883#issuecomment-130623223 I've actually included this in #1000. Could you close this PR @samk3211? Thanks!
[GitHub] flink pull request: [FLINK-2451] [gelly] examples and library clea...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1000#discussion_r36963490 --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java --- @@ -86,26 +72,35 @@ public void testConnectedComponents() throws Exception { @Test public void testSingleSourceShortestPaths() throws Exception { - GSASingleSourceShortestPaths.main(new String[]{"1", edgesPath, resultPath, "16"}); - expectedResult = "1 0.0\n" + - "2 12.0\n" + - "3 13.0\n" + - "4 47.0\n" + - "5 48.0\n" + - "6 Infinity\n" + - "7 Infinity\n"; + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph inputGraph = Graph.fromDataSet( + SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), + new InitMapperSSSP(), env); + +List> result = inputGraph.run(new GSASingleSourceShortestPaths(1l, 16)) + .getVertices().collect(); --- End diff -- Yes, we do! The idea is to lazily migrate the rest of the tests, too. Take a look at FLINK-2032.
[GitHub] flink pull request: [FLINK-1745] [ml] [WIP] Add exact k-nearest-ne...
Github user kno10 commented on the pull request: https://github.com/apache/flink/pull/696#issuecomment-130469648 R-trees are hard to parallelize. For distributed, gigabyte-sized data, an approximate approach is preferable, like the one we discuss in this article: E. Schubert, A. Zimek, H.-P. Kriegel, Fast and Scalable Outlier Detection with Approximate Nearest Neighbor Ensembles, in Proceedings of the 20th International Conference on Database Systems for Advanced Applications (DASFAA), Hanoi, Vietnam: 19–36, 2015. We discuss an approach that is easy to parallelize. It needs sorting and a sliding window (or blocks), so it is not strict MapReduce, but it should be a good match for Flink. The hardest part is to get the different space-filling curves right and efficient. The other components (random projections to reduce dimensionality, an ensemble to improve quality, and list inversions to also build reverse kNN, which then allow accelerating methods such as LOF) are much easier. The main drawback of most of these kNN-join approaches (including ours) is that they only work with Minkowski norms. There are much more interesting distance functions than that... We also discuss why the space-filling curves appear to give better results for kNN, while LSH etc. work better for radius joins. LSH is another option, but it cannot guarantee to find k neighbors, and parameter tuning is tricky. So you may want to have a look at this recent ensemble approach instead.
[GitHub] flink pull request: [CLEANUP] Add space between quotes and plus si...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1010#issuecomment-130633384 Thanks Henry, please add this rule. :)
[jira] [Commented] (FLINK-2077) Rework Path class and add extend support for Windows paths
[ https://issues.apache.org/jira/browse/FLINK-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695236#comment-14695236 ] Fabian Hueske commented on FLINK-2077: -- Hi Lun, {{//host/dir1/dir2}} is an example of a path to a Windows share, i.e., a path to a directory {{dir1/dir2}} which is shared by a host {{host}}. You can check the [Windows Dev Center|https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247%28v=vs.85%29.aspx#paths] for the Windows path specification. However, the focus of this JIRA is rather to clean up the {{Path}} class, which has existed for a long time and has been changed in several places. By now it is a bit messy and hard to maintain. Adding support for Windows share paths would be nice but is not mandatory. Thanks, Fabian > Rework Path class and add extend support for Windows paths > -- > > Key: FLINK-2077 > URL: https://issues.apache.org/jira/browse/FLINK-2077 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: GaoLun >Priority: Minor > Labels: starter > > The class {{org.apache.flink.core.fs.Path}} handles paths for Flink's > {{FileInputFormat}} and {{FileOutputFormat}}. Over time, this class has > become quite hard to read and modify. > It would benefit from some cleaning and refactoring. Along with the > refactoring, support for Windows paths like {{//host/dir1/dir2}} could be > added.
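As a rough illustration of the share-path form described above, the sketch below splits {{//host/dir1/dir2}} into its host and directory parts. `SharePath` is a hypothetical helper written for this note, not Flink's {{Path}} class, and it deliberately handles only this one form.

```java
// Hypothetical helper, for illustration only; not org.apache.flink.core.fs.Path.
final class SharePath {
    final String host;
    final String directory;

    private SharePath(String host, String directory) {
        this.host = host;
        this.directory = directory;
    }

    // Parses "//host/dir1/dir2". Returns null if the input does not start
    // with "//" followed by a non-empty host name.
    static SharePath parse(String path) {
        if (path == null || !path.startsWith("//")) {
            return null;
        }
        int slash = path.indexOf('/', 2);
        String host = slash < 0 ? path.substring(2) : path.substring(2, slash);
        String directory = slash < 0 ? "" : path.substring(slash + 1);
        if (host.isEmpty()) {
            return null;
        }
        return new SharePath(host, directory);
    }

    public static void main(String[] args) {
        SharePath p = parse("//host/dir1/dir2");
        System.out.println(p.host + " shares " + p.directory);
    }
}
```

A real implementation would also have to deal with backslash separators and drive letters, which this sketch ignores.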
[jira] [Closed] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats
[ https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels closed FLINK-1819. - Resolution: Implemented Fix Version/s: (was: 0.9) 0.10 > Allow access to RuntimeContext from Input and OutputFormats > --- > > Key: FLINK-1819 > URL: https://issues.apache.org/jira/browse/FLINK-1819 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 0.9, 0.8.1 >Reporter: Fabian Hueske >Assignee: Sachin Goel >Priority: Minor > Fix For: 0.10 > > > User function that extend a RichFunction can access a {{RuntimeContext}} > which gives the parallel id of the task and access to Accumulators and > BroadcastVariables. > Right now, Input and OutputFormats cannot access their {{RuntimeContext}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-2508) Confusing sharing of StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-2508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi reassigned FLINK-2508: - Assignee: Márton Balassi > Confusing sharing of StreamExecutionEnvironment > --- > > Key: FLINK-2508 > URL: https://issues.apache.org/jira/browse/FLINK-2508 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Stephan Ewen >Assignee: Márton Balassi > Fix For: 0.10 > > > In the {{StreamExecutionEnvironment}}, the environment is once created and > then shared with a static variable to all successive calls to > {{getExecutionEnvironment()}}. But it can be overridden by calls to > {{createLocalEnvironment()}} and {{createRemoteEnvironment()}}. > This seems a bit un-intuitive, and probably creates confusion when > dispatching multiple streaming jobs from within the same JVM. > Why is it even necessary to cache the "current" execution environment? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
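The sharing behaviour described in this issue can be modelled with a small toy class. This is only a sketch of the pattern as the issue text describes it; `ToyEnvironment` is a hypothetical stand-in and does not reproduce the actual {{StreamExecutionEnvironment}} code.

```java
// Toy model of the pattern described in FLINK-2508; all names are hypothetical.
final class ToyEnvironment {
    // Shared by every caller in the same JVM.
    private static ToyEnvironment current;

    final String kind;

    private ToyEnvironment(String kind) {
        this.kind = kind;
    }

    // Creates the environment once, then hands out the cached instance.
    static ToyEnvironment getExecutionEnvironment() {
        if (current == null) {
            current = new ToyEnvironment("default");
        }
        return current;
    }

    // Silently replaces the cached instance for all later callers.
    static ToyEnvironment createLocalEnvironment() {
        current = new ToyEnvironment("local");
        return current;
    }

    public static void main(String[] args) {
        ToyEnvironment first = getExecutionEnvironment();
        createLocalEnvironment();
        ToyEnvironment second = getExecutionEnvironment();
        // The two lookups no longer refer to the same environment.
        System.out.println(first == second);
    }
}
```

Two jobs built in the same JVM can thus end up on different environments depending on call order, which is the source of confusion the issue raises.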
[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats
[ https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695231#comment-14695231 ] ASF GitHub Bot commented on FLINK-1819: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/966 > Allow access to RuntimeContext from Input and OutputFormats > --- > > Key: FLINK-1819 > URL: https://issues.apache.org/jira/browse/FLINK-1819 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 0.9, 0.8.1 >Reporter: Fabian Hueske >Assignee: Sachin Goel >Priority: Minor > Fix For: 0.9 > > > User function that extend a RichFunction can access a {{RuntimeContext}} > which gives the parallel id of the task and access to Accumulators and > BroadcastVariables. > Right now, Input and OutputFormats cannot access their {{RuntimeContext}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-2514) Local and Remote environment behave differently when re-triggering execution.
[ https://issues.apache.org/jira/browse/FLINK-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi reassigned FLINK-2514: - Assignee: Márton Balassi > Local and Remote environment behave differently when re-triggering execution. > - > > Key: FLINK-2514 > URL: https://issues.apache.org/jira/browse/FLINK-2514 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Stephan Ewen >Assignee: Márton Balassi >Priority: Critical > Fix For: 0.10 > > > The following code behaves differently on the {{LocalStreamEnvironment}} and > the {{RemoteStreamEnvironment}}. > {code} > StreamExecutionEnvironment env = ...; > env.addSource(someSource).addSink(someSink); > env.execute(); > env.addSource(anotherSource).addSink(anotherSink); > env.execute(); > {code} > Locally, only the second source/sink pair is executed. > Remotely, both are re-executed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
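The two behaviours reported above can be pictured with a toy model in which the only difference is whether the list of registered pipelines is cleared after `execute()`. This is an assumption made purely for illustration; `ToyPlan` is hypothetical and does not claim to reflect how either Flink environment is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: ToyPlan is a hypothetical class, not a Flink API.
final class ToyPlan {
    private final List<String> pipelines = new ArrayList<>();
    private final boolean clearAfterExecute;

    ToyPlan(boolean clearAfterExecute) {
        this.clearAfterExecute = clearAfterExecute;
    }

    void add(String pipeline) {
        pipelines.add(pipeline);
    }

    // Returns the pipelines that run in this call.
    List<String> execute() {
        List<String> ran = new ArrayList<>(pipelines);
        if (clearAfterExecute) {
            pipelines.clear();
        }
        return ran;
    }

    public static void main(String[] args) {
        for (boolean clear : new boolean[] {true, false}) {
            ToyPlan plan = new ToyPlan(clear);
            plan.add("source1->sink1");
            plan.execute();
            plan.add("source2->sink2");
            System.out.println("clearAfterExecute=" + clear + ": " + plan.execute());
        }
    }
}
```

With clearing, the second `execute()` runs only the second pair (the behaviour reported locally); without it, both pairs run again (the behaviour reported remotely).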
[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats
[ https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695232#comment-14695232 ] ASF GitHub Bot commented on FLINK-1819: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/966#issuecomment-130677380 Thanks for the contribution @sachingoel0101! > Allow access to RuntimeContext from Input and OutputFormats > --- > > Key: FLINK-1819 > URL: https://issues.apache.org/jira/browse/FLINK-1819 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 0.9, 0.8.1 >Reporter: Fabian Hueske >Assignee: Sachin Goel >Priority: Minor > Fix For: 0.9 > > > User function that extend a RichFunction can access a {{RuntimeContext}} > which gives the parallel id of the task and access to Accumulators and > BroadcastVariables. > Right now, Input and OutputFormats cannot access their {{RuntimeContext}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2514) Local and Remote environment behave differently when re-triggering execution.
[ https://issues.apache.org/jira/browse/FLINK-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi updated FLINK-2514: -- Fix Version/s: 0.10 > Local and Remote environment behave differently when re-triggering execution. > - > > Key: FLINK-2514 > URL: https://issues.apache.org/jira/browse/FLINK-2514 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Stephan Ewen >Priority: Critical > Fix For: 0.10 > > > The following code behaves differently on the {{LocalStreamEnvironment}} and > the {{RemoteStreamEnvironment}}. > {code} > StreamExecutionEnvironment env = ...; > env.addSource(someSource).addSink(someSink); > env.execute(); > env.addSource(anotherSource).addSink(anotherSink); > env.execute(); > {code} > Locally, only the second source/sink pair is executed. > Remotely, both are re-executed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695215#comment-14695215 ] ASF GitHub Bot commented on FLINK-2490: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130672880 > Otherwise, I found the SocketClientSink didn't have the "retry". Is it necessary to get a "retry"? Yes, that might be an issue but let's keep it separate from our concern here. If you want, you can open a JIRA issue for the missing retry option in the `SocketClientSink`. > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h >
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130588652 Thanks for your changes. I think we should use `read()` instead of `readLine()` because we are using a custom delimiter and not necessarily "\n" (the newline symbol). The danger of reading an entire line is that the newline symbol might never arrive, so it might continue to read forever. And even if it manages to find a newline symbol, you have to split your input to find the custom delimiter. That's not very efficient. Can you change the code back to using the `read()` method? I think we had a misunderstanding. For your test case: it's not considered good practice to mix production and test code. You're doing that by introducing the `isRetrying` flag and exposing it. Instead, you have two options: 1. Create a `ServerSocket` and pass its address to the `SocketTextStreamFunction`. Then control the connection to this socket and count how often the function reconnects (e.g. use the `accept()` method). 2. Create your test in the same package as the `SocketTextStreamFunction` class (the package is `org.apache.flink.streaming.api.functions.source`). Then you can access all fields that are protected. So make your `retries` variable a protected field of the `SocketTextStreamFunction` class. I hope that this helps you. If not, feel free to ask more questions.
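The `read()`-based approach suggested above can be sketched as follows. This is a minimal illustration and not the code of `SocketTextStreamFunction`; `DelimitedReader` and its methods are hypothetical names. It reads one character at a time and emits a record whenever the custom delimiter appears, so it never waits for a newline.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink.
final class DelimitedReader {

    // Splits the stream on an arbitrary delimiter using Reader.read().
    static List<String> readRecords(Reader reader, char delimiter) throws IOException {
        List<String> records = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        int data;
        while ((data = reader.read()) != -1) {
            if (data == delimiter) {
                records.add(buffer.toString());
                buffer.setLength(0);
            } else {
                buffer.append((char) data);
            }
        }
        // Emit whatever is left when the stream ends without a final delimiter.
        if (buffer.length() > 0) {
            records.add(buffer.toString());
        }
        return records;
    }

    // Convenience overload for in-memory input.
    static List<String> readRecords(String input, char delimiter) {
        try {
            return readRecords(new StringReader(input), delimiter);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The newline is ordinary data here because the delimiter is ';'.
        System.out.println(readRecords("a;b\nc;d", ';'));
    }
}
```

With a socket, the `Reader` would typically be an `InputStreamReader` wrapped in a `BufferedReader`, so that single-character reads stay cheap.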
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695213#comment-14695213 ] ASF GitHub Bot commented on FLINK-2490: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36973430 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -43,6 +43,7 @@ private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_RETRY_SLEEP = 1000; + protected long retrys; --- End diff -- You should also initialize the variable with 0 in the `open()` method. > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695210#comment-14695210 ] ASF GitHub Bot commented on FLINK-2490: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36973254 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. 
+ */ +public class SocketTextStreamFunctionTest{ + + final AtomicReference error = new AtomicReference(); + private final String host = "127.0.0.1"; + + SourceFunction.SourceContext ctx = new SourceFunction.SourceContext() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count < 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail("Error in spawned thread: " + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); +
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36957769 --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.scala.operators + +import java.util.{List => JavaList, Random} + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} +import org.junit.Assert._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.{Before, After, Test} + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SampleITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + private val RNG: Random = new Random + + private var result: JavaList[String] = null; + + @Before + def initiate { +ExecutionEnvironment.getExecutionEnvironment.setParallelism(5) + } + + @After + def after() = { +TestBaseUtils.containsResultAsText(result, getSourceStrings) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithoutReplacement { +verifySamplerWithFractionWithoutReplacement(0d) +verifySamplerWithFractionWithoutReplacement(0.2d) +verifySamplerWithFractionWithoutReplacement(1.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithReplacement { +verifySamplerWithFractionWithReplacement(0d) +verifySamplerWithFractionWithReplacement(0.2d) +verifySamplerWithFractionWithReplacement(1.0d) +verifySamplerWithFractionWithReplacement(2.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithoutReplacement { +verifySamplerWithFixedSizeWithoutReplacement(0) +verifySamplerWithFixedSizeWithoutReplacement(2) +verifySamplerWithFixedSizeWithoutReplacement(21) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithReplacement { +verifySamplerWithFixedSizeWithReplacement(0) +verifySamplerWithFixedSizeWithReplacement(2) +verifySamplerWithFixedSizeWithReplacement(21) + } + + @throws(classOf[Exception]) + private def 
verifySamplerWithFractionWithoutReplacement(fraction: Double) { +verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithoutReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(false, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double) { +verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(true, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFraction(withReplacement: Boolean, fraction: Double, seed: Long) { +val ds = getSourceDataSet() +val sampled = ds.sample(withReplacement, fraction, seed) +result = sampled.collect.asJava --- End diff -- Is this result checked for validity somewhere?
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695212#comment-14695212 ] ASF GitHub Bot commented on FLINK-2490: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36973377 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -43,6 +43,7 @@ private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_RETRY_SLEEP = 1000; + protected long retrys; --- End diff -- Sorry, minor thing but it should be `retries`. > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695211#comment-14695211 ] ASF GitHub Bot commented on FLINK-2490: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36973279 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. 
+ */ +public class SocketTextStreamFunctionTest{ + + final AtomicReference error = new AtomicReference(); + private final String host = "127.0.0.1"; + + SourceFunction.SourceContext ctx = new SourceFunction.SourceContext() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count < 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail("Error in spawned thread: " + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); +
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695196#comment-14695196 ] ASF GitHub Bot commented on FLINK-2490: --- Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130664351 @mxm Separately, I noticed that SocketClientSink does not have a "retry" option. Is it necessary to add one? > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955334 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -145,4 +152,8 @@ public void cancel() { } } } + + public boolean getIsRetrying() { --- End diff -- Please remove this getter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695192#comment-14695192 ] ASF GitHub Bot commented on FLINK-2490: --- Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130663511 OK, I added two cases (retry 10 and 0), since I thought retrying 1 time is the same as retrying 10 times. Also, would you please take a look at two other test pull requests (https://github.com/apache/flink/pull/991 and https://github.com/apache/flink/pull/977)? > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36958345 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.Iterator; +import java.util.Random; + +/** + * A sampler implementation built upon Bernoulli Trail. For sample with fraction and without replacement, + * each element sample choice is just a bernoulli trail. + * + * @param The type of sample. + */ +public class BernoulliSampler extends RandomSampler { + + private final double fraction; + private final Random random; + + /** +* Create a bernoulli sampler sample fraction and default random number generator. +* +* @param fraction Sample fraction, aka the bernoulli sampler possibility. +*/ + public BernoulliSampler(double fraction) { + this(fraction, new Random()); + } + + /** +* Create a bernoulli sampler sample fraction and random number generator seed. +* +* @param fraction Sample fraction, aka the bernoulli sampler possibility. 
+* @param seed Random number generator seed. +*/ + public BernoulliSampler(double fraction, long seed) { + this(fraction, new Random(seed)); + } + + /** +* Create a bernoulli sampler sample fraction and random number generator. +* +* @param fraction Sample fraction, aka the bernoulli sampler possibility. +* @param random The random number generator. +*/ + public BernoulliSampler(double fraction, Random random) { + Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, "fraction fraction must between [0, 1]."); + this.fraction = fraction; + this.random = random; + } + + /** +* Sample the input elements, for each input element, take a Bernoulli Trail for sample. +* +* @param input Elements to be sampled. +* @return The sampled result which is lazy computed upon input elements. +*/ + @Override + public Iterator sample(final Iterator input) { + if (fraction == 0) { + return EMPTY_ITERABLE; + } + + return new SampledIterator() { + T current; + + @Override + public boolean hasNext() { + if (current == null) { + while (input.hasNext()) { + T element = input.next(); + if (random.nextDouble() <= fraction) { + current = element; + return true; + } + } + current = null; + return false; + } else { + return true; + } + } + + @Override + public T next() { --- End diff -- It feels a bit counterintuitive that the next element is prepared in the `hasNext()` function. Doesn't this mean that `hasNext()` **needs** to be called every time before we call `next()`? Can we protect against that case where we would get a `null` element back that way? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this fea
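The concern raised in this review comment can be illustrated with a small sketch. The class below is hypothetical: the name `SafeBernoulliIterator` and the `hasCurrent` flag are assumptions made for illustration and are not taken from the pull request. It shows one possible way to let `next()` be called without a preceding `hasNext()`, and to fail with `NoSuchElementException` instead of handing back `null` once the input is exhausted.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;

/**
 * Hypothetical sketch (not the code from the pull request): a Bernoulli-style
 * sampling iterator whose next() does not rely on the caller invoking hasNext() first.
 */
public class SafeBernoulliIterator<T> implements Iterator<T> {

    private final Iterator<T> input;
    private final double fraction;
    private final Random random;

    private T current;
    // Explicit flag instead of a null check, so that null input elements stay legal.
    private boolean hasCurrent;

    public SafeBernoulliIterator(Iterator<T> input, double fraction, Random random) {
        this.input = input;
        this.fraction = fraction;
        this.random = random;
    }

    @Override
    public boolean hasNext() {
        // Advance the input until an element passes the Bernoulli trial,
        // unless a selected element is already buffered.
        while (!hasCurrent && input.hasNext()) {
            T element = input.next();
            if (random.nextDouble() <= fraction) {
                current = element;
                hasCurrent = true;
            }
        }
        return hasCurrent;
    }

    @Override
    public T next() {
        // Calling hasNext() here makes next() safe to use on its own.
        if (!hasNext()) {
            throw new NoSuchElementException("no more sampled elements");
        }
        T result = current;
        current = null;
        hasCurrent = false;
        return result;
    }
}
```

With this shape, repeated calls to `hasNext()` are idempotent, and a caller that only ever calls `next()` either receives a sampled element or gets an exception.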
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36950634 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala --- @@ -1182,6 +1184,60 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { getCallLocationName())) // + // Sample + // + /** + * Generate a sample of DataSet by the probability fraction of each element. + * + * @param withReplacement Whether element can be selected more than once. + * @param fraction Probability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. + * @param seed Random number generator seed. + * @return The sampled DataSet + */ + def sample( + withReplacement: Boolean, + fraction: Double, + seed: Long = Utils.RNG.nextLong()): DataSet[T] = { + +wrap(new MapPartitionOperator[T, T](javaSet, + getType(), + new SampleWithFraction(withReplacement, fraction, seed), + getCallLocationName())) + } + + /** + * Generate a sample of DataSet by the probability fraction of each element. --- End diff -- Javadoc is from the fraction function.
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955221 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -69,14 +68,14 @@ public void run(SourceContext ctx) throws Exception { public void streamFromSocket(SourceContext ctx, Socket socket) throws Exception { --- End diff -- I think this method should be private because it is not meant to be used outside this class.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695166#comment-14695166 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36969808 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java --- @@ -1057,7 +1061,68 @@ public long count() throws Exception { public UnionOperator union(DataSet other){ return new UnionOperator(this, other, Utils.getCallLocationName()); } + + // + // Sample + // + + /** +* Generate a sample of DataSet by the probability fraction of each element. +* +* @param withReplacement Whether element can be selected more than once. +* @param fraction Probability that each element is chosen, should be [0,1] without replacement, +*and [0, ∞) with replacement. While fraction is larger than 1, the elements are +*expected to be selected multi times into sample on average. +* @return The sampled DataSet +*/ + public MapPartitionOperator sample(final boolean withReplacement, final double fraction) { + return sample(withReplacement, fraction, Utils.RNG.nextLong()); + } + + /** +* Generate a sample of DataSet by the probability fraction of each element. +* +* @param withReplacement Whether element can be selected more than once. +* @param fraction Probability that each element is chosen, should be [0,1] without replacement, +*and [0, ∞) with replacement. While fraction is larger than 1, the elements are +*expected to be selected multi times into sample on average. +* @param seed random number generator seed. +* @return The sampled DataSet +*/ + public MapPartitionOperator sample(final boolean withReplacement, final double fraction, final long seed) { + return mapPartition(new SampleWithFraction(withReplacement, fraction, seed)); + } + + /** +* Generate a sample of DataSet which contains fixed size elements. 
+* +* @param withReplacement Whether element can be selected more than once. +* @param numSample The expected sample size. +* @return The sampled DataSet +*/ --- End diff -- Maybe we want to include a note that this kind of sampling currently takes 2 passes over the data, and recommend using fraction unless exact precision is necessary. > Create sample operator for Dataset > -- > > Key: FLINK-1901 > URL: https://issues.apache.org/jira/browse/FLINK-1901 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Theodore Vasiloudis >Assignee: Chengxiang Li > > In order to be able to implement Stochastic Gradient Descent and a number of > other machine learning algorithms we need to have a way to take a random > sample from a Dataset. > We need to be able to sample with or without replacement from the Dataset, > choose the relative or exact size of the sample, set a seed for > reproducibility, and support sampling within iterations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2514) Local and Remote environment behave differently when re-triggering execution.
Stephan Ewen created FLINK-2514: --- Summary: Local and Remote environment behave differently when re-triggering execution. Key: FLINK-2514 URL: https://issues.apache.org/jira/browse/FLINK-2514 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.10 Reporter: Stephan Ewen The following code behaves differently on the {{LocalStreamEnvironment}} and the {{RemoteStreamEnvironment}}. {code} StreamExecutionEnvironment env = ...; env.addSource(someSource).addSink(someSink); env.execute(); env.addSource(anotherSource).addSink(anotherSink); env.execute(); {code} Locally, only the second source/sink pair is executed. Remotely, both are re-executed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2514) Local and Remote environment behave differently when re-triggering execution.
[ https://issues.apache.org/jira/browse/FLINK-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-2514: Priority: Critical (was: Major) > Local and Remote environment behave differently when re-triggering execution. > - > > Key: FLINK-2514 > URL: https://issues.apache.org/jira/browse/FLINK-2514 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Stephan Ewen >Priority: Critical > > The following code behaves differently on the {{LocalStreamEnvironment}} and > the {{RemoteStreamEnvironment}}. > {code} > StreamExecutionEnvironment env = ...; > env.addSource(someSource).addSink(someSink); > env.execute(); > env.addSource(anotherSource).addSink(anotherSink); > env.execute(); > {code} > Locally, only the second source/sink pair is executed. > Remotely, both are re-executed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2512]Add client.close() before throw Ru...
Github user ffbin commented on the pull request: https://github.com/apache/flink/pull/1009#issuecomment-130509029 @uce @hsaputra Thanks. I have moved the try up and now rely on finally to close the client.
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695133#comment-14695133 ] ASF GitHub Bot commented on FLINK-2490: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130644950 `StringBuilder` is intended for single-threaded use, while `StringBuffer` is synchronized and can safely be accessed from multiple threads. If you use `StringBuffer` in a single-threaded scenario, it performs worse than `StringBuilder`. Thanks for your changes. In addition to the "infinity" test, can you add a test that checks for a certain number of retries (e.g. 10)? Also please add a check for 1 and 0 retries. It's always good to test corner cases :) > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
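The distinction drawn in this comment can be shown with a short, self-contained sketch. The class and method names below (`BuilderVsBuffer`, `sharedBufferLength`, `joinLines`) are made up for illustration and do not appear in the pull request; only `java.lang.StringBuilder` and `java.lang.StringBuffer` are real. The first method shares one `StringBuffer` across several threads, which is safe because its `append` is synchronized; the second assembles text on a single thread, where `StringBuilder` is the usual choice.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of when each class is appropriate. */
public class BuilderVsBuffer {

    /**
     * Appends one character at a time from several threads into a single shared
     * StringBuffer. Because StringBuffer.append is synchronized, no append is lost.
     */
    static int sharedBufferLength(int threads, int appendsPerThread) {
        final StringBuffer shared = new StringBuffer();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < appendsPerThread; i++) {
                    shared.append('x');
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return shared.length();
    }

    /**
     * Builds a delimited string on the calling thread only. The builder never
     * escapes this method, so the unsynchronized StringBuilder is sufficient.
     */
    static String joinLines(String[] parts, char delimiter) {
        StringBuilder sb = new StringBuilder();
        for (String part : parts) {
            sb.append(part).append(delimiter);
        }
        return sb.toString();
    }
}
```

If the buffer is only ever touched by the thread that reads from the socket, the second pattern applies and the synchronization of `StringBuffer` buys nothing.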
[GitHub] flink pull request: [Flink-Gelly] [example] added missing assumpti...
Github user samk3211 commented on the pull request: https://github.com/apache/flink/pull/883#issuecomment-130632285 closed
[GitHub] flink pull request: [Flink-Gelly] [example] added missing assumpti...
Github user samk3211 closed the pull request at: https://github.com/apache/flink/pull/883
[jira] [Created] (FLINK-2513) Extend state handle provider interface to list all state handles
Ufuk Celebi created FLINK-2513: -- Summary: Extend state handle provider interface to list all state handles Key: FLINK-2513 URL: https://issues.apache.org/jira/browse/FLINK-2513 Project: Flink Issue Type: Improvement Components: Distributed Runtime Reporter: Ufuk Celebi This is a follow-up issue to FLINK-2354. In FLINK-2354 we use ZooKeeper to persist state handles. In certain failure scenarios, there can be lingering state handles, which have already been created but have not been written to ZooKeeper. These can be cleaned up on startup if the state handle provider implementations keep track of their state handles. With the current implementations this would be possible, e.g. by listing the directory (file system), or trivially because no state is persistent (job manager). It would be fair enough to have this as an optional operation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
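A rough sketch of what such an optional listing operation could look like follows. Everything in it is hypothetical: `HandleProvider`, `DirectoryLikeProvider`, `InMemoryProvider`, and `cleanUpLingering` are illustrative names and are not Flink's actual state handle provider interface. The sketch assumes handles are identified by strings and that the set of handles registered in ZooKeeper is available as a plain `Set`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical sketch; none of these types are Flink's real API. */
public class StateHandleCleanupSketch {

    interface HandleProvider {
        String createHandle(String state);

        void discard(String handleId);

        /** Optional operation: providers that cannot enumerate handles keep the default. */
        default Collection<String> listHandles() {
            throw new UnsupportedOperationException("listing is not supported");
        }
    }

    /** Stands in for a file-system-backed provider that can list its directory. */
    static class DirectoryLikeProvider implements HandleProvider {
        private final Set<String> files = new LinkedHashSet<>();
        private int counter = 0;

        @Override
        public String createHandle(String state) {
            String id = "handle-" + counter++;
            files.add(id);
            return id;
        }

        @Override
        public void discard(String handleId) {
            files.remove(handleId);
        }

        @Override
        public Collection<String> listHandles() {
            // Return a copy so callers can discard handles while iterating.
            return new ArrayList<>(files);
        }
    }

    /** Stands in for a provider with nothing persistent to list. */
    static class InMemoryProvider implements HandleProvider {
        @Override
        public String createHandle(String state) {
            return "in-memory";
        }

        @Override
        public void discard(String handleId) {
            // Nothing persistent to remove.
        }
    }

    /** Discards every listed handle that was never registered; returns how many were removed. */
    static int cleanUpLingering(HandleProvider provider, Set<String> registered) {
        Collection<String> all;
        try {
            all = provider.listHandles();
        } catch (UnsupportedOperationException e) {
            return 0; // Provider opted out of listing, so there is nothing to clean up here.
        }
        int removed = 0;
        for (String id : all) {
            if (!registered.contains(id)) {
                provider.discard(id);
                removed++;
            }
        }
        return removed;
    }
}
```

Making the listing a default method that throws is only one way to express "optional"; returning an empty collection would be another, at the cost of not distinguishing "nothing to list" from "cannot list".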
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36960527 --- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java --- @@ -451,6 +454,53 @@ protected static File asFile(String path) { assertEquals(extectedStrings[i], resultStrings[i]); } } + + // + // Comparison methods for tests using sample + // + + public static void containsResultAsTuples(List result, String expected) { + isExpectedContainsResult(result, expected, true); + } + + public static void containsResultAsText(List result, String expected) { + isExpectedContainsResult(result, expected, false); + } + + private static void isExpectedContainsResult(List result, String expected, boolean asTuple) { --- End diff -- Can we get comments explaining the functionality of this and `containsResultAsText`?
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955189 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -40,10 +37,12 @@ private char delimiter; private long maxRetry; private boolean retryForever; + private boolean isRetrying = false; private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_RETRY_SLEEP = 1000; + private volatile boolean isExit = false; --- End diff -- Is this flag necessary? We have `isRunning` already.
[jira] [Commented] (FLINK-2451) Cleanup Gelly examples
[ https://issues.apache.org/jira/browse/FLINK-2451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695067#comment-14695067 ] ASF GitHub Bot commented on FLINK-2451: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1000#discussion_r36963490 --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java --- @@ -86,26 +72,35 @@ public void testConnectedComponents() throws Exception { @Test public void testSingleSourceShortestPaths() throws Exception { - GSASingleSourceShortestPaths.main(new String[]{"1", edgesPath, resultPath, "16"}); - expectedResult = "1 0.0\n" + - "2 12.0\n" + - "3 13.0\n" + - "4 47.0\n" + - "5 48.0\n" + - "6 Infinity\n" + - "7 Infinity\n"; + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph inputGraph = Graph.fromDataSet( + SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), + new InitMapperSSSP(), env); + +List> result = inputGraph.run(new GSASingleSourceShortestPaths(1l, 16)) + .getVertices().collect(); --- End diff -- Yes, we do! The idea is to lazily migrate the rest of the tests, too. Take a look at FLINK-2032. > Cleanup Gelly examples > -- > > Key: FLINK-2451 > URL: https://issues.apache.org/jira/browse/FLINK-2451 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 0.10 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri >Priority: Minor > > As per discussion in the dev@ mailing list, this issue proposes the following > changes to the Gelly examples and library: > 1. Keep the following examples as they are: > EuclideanGraphWeighing, GraphMetrics, IncrementalSSSP, JaccardSimilarity, > MusicProfiles. > 2. Keep only 1 example to show how to use library methods. > 3. Add 1 example for vertex-centric iterations. > 4. Keep 1 example for GSA iterations and move the redundant GSA > implementations to the library. > 5. Improve the examples documentation and refer to the functionality that > each of them demonstrates. > 6. Port and modify existing example tests accordingly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695069#comment-14695069 ] ASF GitHub Bot commented on FLINK-2490: --- Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130626843 Hi Max, I have addressed all of your review comments, and I kept the change from StringBuffer to StringBuilder. One question: as far as I can see, StringBuilder currently does the same thing as StringBuffer here. So what is the real difference between the two types in SocketTextStreamFunction? > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36960080 --- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java --- @@ -451,6 +454,53 @@ protected static File asFile(String path) { assertEquals(extectedStrings[i], resultStrings[i]); } } + + // + // Comparison methods for tests using sample + // + + public static void containsResultAsTuples(List result, String expected) { --- End diff -- Is this used anywhere?
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695034#comment-14695034 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36960527 --- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java --- @@ -451,6 +454,53 @@ protected static File asFile(String path) { assertEquals(extectedStrings[i], resultStrings[i]); } } + + // + // Comparison methods for tests using sample + // + + public static void containsResultAsTuples(List result, String expected) { + isExpectedContainsResult(result, expected, true); + } + + public static void containsResultAsText(List result, String expected) { + isExpectedContainsResult(result, expected, false); + } + + private static void isExpectedContainsResult(List result, String expected, boolean asTuple) { --- End diff -- Can we get comments explaining the functionality of this and `containsResultAsText`? > Create sample operator for Dataset > -- > > Key: FLINK-1901 > URL: https://issues.apache.org/jira/browse/FLINK-1901 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Theodore Vasiloudis >Assignee: Chengxiang Li > > In order to be able to implement Stochastic Gradient Descent and a number of > other machine learning algorithms we need to have a way to take a random > sample from a Dataset. > We need to be able to sample with or without replacement from the Dataset, > choose the relative or exact size of the sample, set a seed for > reproducibility, and support sampling within iterations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695031#comment-14695031 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36960080 --- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java --- @@ -451,6 +454,53 @@ protected static File asFile(String path) { assertEquals(extectedStrings[i], resultStrings[i]); } } + + // + // Comparison methods for tests using sample + // + + public static void containsResultAsTuples(List result, String expected) { --- End diff -- Is this used anywhere? > Create sample operator for Dataset > -- > > Key: FLINK-1901 > URL: https://issues.apache.org/jira/browse/FLINK-1901 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Theodore Vasiloudis >Assignee: Chengxiang Li > > In order to be able to implement Stochastic Gradient Descent and a number of > other machine learning algorithms we need to have a way to take a random > sample from a Dataset. > We need to be able to sample with or without replacement from the Dataset, > choose the relative or exact size of the sample, set a seed for > reproducibility, and support sampling within iterations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695003#comment-14695003 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36958345 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.Iterator; +import java.util.Random; + +/** + * A sampler implementation built upon Bernoulli Trail. For sample with fraction and without replacement, + * each element sample choice is just a bernoulli trail. + * + * @param The type of sample. + */ +public class BernoulliSampler extends RandomSampler { + + private final double fraction; + private final Random random; + + /** +* Create a bernoulli sampler sample fraction and default random number generator. +* +* @param fraction Sample fraction, aka the bernoulli sampler possibility. 
+*/ + public BernoulliSampler(double fraction) { + this(fraction, new Random()); + } + + /** +* Create a bernoulli sampler sample fraction and random number generator seed. +* +* @param fraction Sample fraction, aka the bernoulli sampler possibility. +* @param seed Random number generator seed. +*/ + public BernoulliSampler(double fraction, long seed) { + this(fraction, new Random(seed)); + } + + /** +* Create a bernoulli sampler sample fraction and random number generator. +* +* @param fraction Sample fraction, aka the bernoulli sampler possibility. +* @param random The random number generator. +*/ + public BernoulliSampler(double fraction, Random random) { + Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, "fraction fraction must between [0, 1]."); + this.fraction = fraction; + this.random = random; + } + + /** +* Sample the input elements, for each input element, take a Bernoulli Trail for sample. +* +* @param input Elements to be sampled. +* @return The sampled result which is lazy computed upon input elements. +*/ + @Override + public Iterator sample(final Iterator input) { + if (fraction == 0) { + return EMPTY_ITERABLE; + } + + return new SampledIterator() { + T current; + + @Override + public boolean hasNext() { + if (current == null) { + while (input.hasNext()) { + T element = input.next(); + if (random.nextDouble() <= fraction) { + current = element; + return true; + } + } + current = null; + return false; + } else { + return true; + } + } + + @Override + public T next() { --- End diff -- It feels a bit counterintuitive that the next element is prepared in the `hasNext()` function. Doesn't this mean that `hasNext()` **needs** to be called every time before we call
[GitHub] flink pull request: [FLINK-1745] [ml] [WIP] Add exact k-nearest-ne...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/696#issuecomment-130574663 @kno10 thanks for the pointer. Judging from the abstract it sounds really promising and it seems definitely worth exploring. Unfortunately, I don't have access to the complete paper which makes it hard to fully understand the algorithm.
[jira] [Commented] (FLINK-2373) Add configuration parameter to createRemoteEnvironment method
[ https://issues.apache.org/jira/browse/FLINK-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694989#comment-14694989 ] Ufuk Celebi commented on FLINK-2373: Andreas, did you have a look at this? I have a small change as part of an upcoming pull request to allow this as well. I will keep it simple and just add: {code} createRemoteEnvironment(Configuration flinkConfig, String... jarFiles) {code} It would be nice to think about the "whole" execution environment stack and have it more consistent. > Add configuration parameter to createRemoteEnvironment method > - > > Key: FLINK-2373 > URL: https://issues.apache.org/jira/browse/FLINK-2373 > Project: Flink > Issue Type: Bug > Components: other >Reporter: Andreas Kunft >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > Currently there is no way to provide a custom configuration upon creation of > a remote environment (via ExecutionEnvironment.createRemoteEnvironment(...)). > This leads to errors when the submitted job exceeds the default value for the > max. payload size in Akka, as we can not increase the configuration value > (akka.remote.OversizedPayloadException: Discarding oversized payload...) > Providing an overloaded method with a configuration parameter for the remote > environment fixes that. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694981#comment-14694981 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36957769 --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.scala.operators + +import java.util.{List => JavaList, Random} + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} +import org.junit.Assert._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.{Before, After, Test} + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SampleITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + private val RNG: Random = new Random + + private var result: JavaList[String] = null; + + @Before + def initiate { +ExecutionEnvironment.getExecutionEnvironment.setParallelism(5) + } + + @After + def after() = { +TestBaseUtils.containsResultAsText(result, getSourceStrings) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithoutReplacement { +verifySamplerWithFractionWithoutReplacement(0d) +verifySamplerWithFractionWithoutReplacement(0.2d) +verifySamplerWithFractionWithoutReplacement(1.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithReplacement { +verifySamplerWithFractionWithReplacement(0d) +verifySamplerWithFractionWithReplacement(0.2d) +verifySamplerWithFractionWithReplacement(1.0d) +verifySamplerWithFractionWithReplacement(2.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithoutReplacement { +verifySamplerWithFixedSizeWithoutReplacement(0) +verifySamplerWithFixedSizeWithoutReplacement(2) +verifySamplerWithFixedSizeWithoutReplacement(21) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithReplacement { +verifySamplerWithFixedSizeWithReplacement(0) +verifySamplerWithFixedSizeWithReplacement(2) +verifySamplerWithFixedSizeWithReplacement(21) + } + + @throws(classOf[Exception]) + private def 
verifySamplerWithFractionWithoutReplacement(fraction: Double) { +verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithoutReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(false, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double) { +verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(true, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFraction(withReplacement: Boolean, fraction: Double, seed: Long) { +val ds = getSourceDataSet() +val sampled = ds.sample(withReplacement, fraction, seed) +result = sampled.collect.asJava --- End diff -- Is this result checked for validity somewhere? > Create sample operator for Dataset > -- > > Key: FLINK-1901 > URL: https://issues.a
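On the question of whether the sampled result is checked: in the diff, the `@After` hook appears to pass `result` to `TestBaseUtils.containsResultAsText`. Independently of that helper, the properties one would expect of a sample can be sketched as below. This is an illustrative sketch only; the class `SampleChecks` and both method names are hypothetical and not part of Flink.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

/** Hypothetical helpers expressing what a valid sample should satisfy. */
final class SampleChecks {

    /** Without replacement: no element may occur more often in the sample than in the source. */
    static <T> boolean isSubMultiset(Collection<T> source, Collection<T> sample) {
        Map<T, Integer> remaining = new HashMap<>();
        for (T element : source) {
            remaining.merge(element, 1, Integer::sum);
        }
        for (T element : sample) {
            Integer count = remaining.get(element);
            if (count == null || count == 0) {
                return false;
            }
            remaining.put(element, count - 1);
        }
        return true;
    }

    /** With replacement: repeats are allowed, but every sampled element must exist in the source. */
    static <T> boolean allDrawnFromSource(Collection<T> source, Collection<T> sample) {
        return new HashSet<>(source).containsAll(sample);
    }
}
```

Neither check says anything about the sample size; for fraction-based sampling the size is random, so a size assertion would need a tolerance or a fixed seed.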
[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats
[ https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694974#comment-14694974 ] ASF GitHub Bot commented on FLINK-1819: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/966#issuecomment-130595336 Thank you for your remarks @StephanEwen. >I think you should go ahead and just call them "Rich". It is just a name, and what matters is that the JavaDocs describe what it actually means... The JavaDocs in this pull request properly describe what a "RichInputFormat" is. So we are good to go. Let's stick with "Rich" although I understand your concerns. I'll merge this later if nobody objects in the meantime :) > Allow access to RuntimeContext from Input and OutputFormats > --- > > Key: FLINK-1819 > URL: https://issues.apache.org/jira/browse/FLINK-1819 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 0.9, 0.8.1 >Reporter: Fabian Hueske >Assignee: Sachin Goel >Priority: Minor > Fix For: 0.9 > > > User function that extend a RichFunction can access a {{RuntimeContext}} > which gives the parallel id of the task and access to Accumulators and > BroadcastVariables. > Right now, Input and OutputFormats cannot access their {{RuntimeContext}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36955515 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.Iterator; +import java.util.Random; + +/** + * A sampler implementation built upon Bernoulli Trail. For sample with fraction and without replacement, + * each element sample choice is just a bernoulli trail. + * + * @param The type of sample. + */ +public class BernoulliSampler extends RandomSampler { + + private final double fraction; + private final Random random; + + /** +* Create a bernoulli sampler sample fraction and default random number generator. --- End diff -- *B*ernoulli should be capitalized for all mentions
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955316 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -85,11 +84,12 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex } } - if (data == -1) { + if (data == null) { socket.close(); long retry = 0; boolean success = false; - while (retry < maxRetry && !success) { + while ((retry < maxRetry || (retryForever && !isExit)) && !success) { + isRetrying = true; --- End diff -- This flag is only necessary for your test and thus should be removed.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694943#comment-14694943 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36955515 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.Iterator; +import java.util.Random; + +/** + * A sampler implementation built upon Bernoulli Trail. For sample with fraction and without replacement, + * each element sample choice is just a bernoulli trail. + * + * @param The type of sample. + */ +public class BernoulliSampler extends RandomSampler { + + private final double fraction; + private final Random random; + + /** +* Create a bernoulli sampler sample fraction and default random number generator. 
--- End diff -- *B*ernoulli should be capitalized for all mentions > Create sample operator for Dataset > -- > > Key: FLINK-1901 > URL: https://issues.apache.org/jira/browse/FLINK-1901 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Theodore Vasiloudis >Assignee: Chengxiang Li > > In order to be able to implement Stochastic Gradient Descent and a number of > other machine learning algorithms we need to have a way to take a random > sample from a Dataset. > We need to be able to sample with or without replacement from the Dataset, > choose the relative or exact size of the sample, set a seed for > reproducibility, and support sampling within iterations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694940#comment-14694940 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36955412 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.Iterator; +import java.util.Random; + +/** + * A sampler implementation built upon Bernoulli Trail. For sample with fraction and without replacement, + * each element sample choice is just a bernoulli trail. --- End diff -- Do you mean Bernoulli _trial_ here? 
> Create sample operator for Dataset > -- > > Key: FLINK-1901 > URL: https://issues.apache.org/jira/browse/FLINK-1901 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Theodore Vasiloudis >Assignee: Chengxiang Li > > In order to be able to implement Stochastic Gradient Descent and a number of > other machine learning algorithms we need to have a way to take a random > sample from a Dataset. > We need to be able to sample with or without replacement from the Dataset, > choose the relative or exact size of the sample, set a seed for > reproducibility, and support sampling within iterations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694938#comment-14694938 ] ASF GitHub Bot commented on FLINK-2490: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955316 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -85,11 +84,12 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex } } - if (data == -1) { + if (data == null) { socket.close(); long retry = 0; boolean success = false; - while (retry < maxRetry && !success) { + while ((retry < maxRetry || (retryForever && !isExit)) && !success) { + isRetrying = true; --- End diff -- This flag is only necessary for your test and thus should be removed. > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694939#comment-14694939 ] ASF GitHub Bot commented on FLINK-2490: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955334 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -145,4 +152,8 @@ public void cancel() { } } } + + public boolean getIsRetrying() { --- End diff -- Please remove this getter. > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955259 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -69,14 +68,14 @@ public void run(SourceContext ctx) throws Exception { public void streamFromSocket(SourceContext ctx, Socket socket) throws Exception { try { - StringBuffer buffer = new StringBuffer(); + StringBuilder buffer = new StringBuilder(); BufferedReader reader = new BufferedReader(new InputStreamReader( socket.getInputStream())); while (isRunning) { - int data; + String data; try { - data = reader.read(); + data = reader.readLine(); --- End diff -- Please use `read()` because of the custom delimiter.
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694935#comment-14694935 ] ASF GitHub Bot commented on FLINK-2490: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955221 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -69,14 +68,14 @@ public void run(SourceContext ctx) throws Exception { public void streamFromSocket(SourceContext ctx, Socket socket) throws Exception { --- End diff -- I think this method should be private because it is not meant to be used outside this class. > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694936#comment-14694936 ] ASF GitHub Bot commented on FLINK-2490: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955259 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -69,14 +68,14 @@ public void run(SourceContext ctx) throws Exception { public void streamFromSocket(SourceContext ctx, Socket socket) throws Exception { try { - StringBuffer buffer = new StringBuffer(); + StringBuilder buffer = new StringBuilder(); BufferedReader reader = new BufferedReader(new InputStreamReader( socket.getInputStream())); while (isRunning) { - int data; + String data; try { - data = reader.read(); + data = reader.readLine(); --- End diff -- Please use `read()` because of the custom delimiter. > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694932#comment-14694932 ] ASF GitHub Bot commented on FLINK-2490: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130588652

Thanks for your changes. I think we should use `read()` instead of `readLine()` because we are using a custom delimiter and not necessarily "\n" (newline symbol). The danger of reading an entire line is that the newline symbol might never arrive, so the call might continue to read forever. And even if it manages to find a newline symbol, you have to truncate your input to find the custom delimiter. That's not very efficient. Can you change the code back to using the `read()` method? I think we had a misunderstanding.

For your test case: It's not considered good practice to mix production and test code. You're doing that by introducing the `isRetrying` flag and exposing it. Alternatively, you have two options:

1. Create a `ServerSocket` and pass its address to the `SocketTextStreamFunction`. Then control the connection to this socket and count how often the function reconnects (e.g. use the `accept()` method).
2. Create your test in the same package as the `SocketTextStreamFunction` function (package is `org.apache.flink.streaming.api.functions.source`). Then you can access all field variables which are protected. So make your `retries` variable a protected field variable of the `SocketTextStreamFunction` class.

I hope that this helps you. If not, feel free to ask more questions.
> Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
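To illustrate the `read()` versus `readLine()` point made in the comment above, here is a minimal sketch of character-wise reading with a custom delimiter. It is not the `SocketTextStreamFunction` code: the class `DelimiterReadSketch` and the method `readRecords` are hypothetical, and the sketch simply drops a trailing record that is not terminated by the delimiter.

```java
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper showing delimiter-based splitting with Reader.read(). */
final class DelimiterReadSketch {

    /**
     * Reads characters until end of stream and returns every record that was
     * terminated by the delimiter. Newlines get no special treatment.
     */
    static List<String> readRecords(Reader in, char delimiter) throws IOException {
        List<String> records = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        int data;
        // read() returns -1 at end of stream, otherwise a single character.
        while ((data = in.read()) != -1) {
            if ((char) data == delimiter) {
                records.add(buffer.toString());
                buffer.setLength(0);
            } else {
                buffer.append((char) data);
            }
        }
        // Anything left in the buffer is an unterminated record and is not emitted here.
        return records;
    }
}
```

For the input `a;b\nc;tail` with delimiter `;`, this yields the records `a` and `b\nc`: the embedded newline is ordinary data, whereas `readLine()` would have split on it and would block on a stream that never sends one. In practice the `Reader` would be a `BufferedReader` around the socket's input stream so that single-character reads stay cheap.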
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694934#comment-14694934 ] ASF GitHub Bot commented on FLINK-2490: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955189 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -40,10 +37,12 @@ private char delimiter; private long maxRetry; private boolean retryForever; + private boolean isRetrying = false; private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_RETRY_SLEEP = 1000; + private volatile boolean isExit = false; --- End diff -- Is this flag necessary? We have `isRunning` already. > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > --- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei >Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2306] Add support for named streams in ...
GitHub user mjsax opened a pull request: https://github.com/apache/flink/pull/1011 [FLINK-2306] Add support for named streams in Storm compatibility layer - enabled .declareStream() and connect via stream name - enabled multiple output streams - added .split() / .select() / strip pattern - added helpers in new package utils - adapted and extended JUnit tests - adapted examples You can merge this pull request into a Git repository by running: $ git pull https://github.com/mjsax/flink flink-2306-storm-namedStreams Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1011.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1011 commit 0b7f47fed18fbe9c9b353960b337e2d8454e11b7 Author: mjsax Date: 2015-08-13T06:56:47Z [FLINK-2306] Add support for named streams in Storm compatibility layer - enabled .declareStream() and connect via stream name - enabled multiple output streams - added .split() / .select() / strip pattern - added helpers in new package utils - adapted and extended JUnit tests - adapted examples some minor improvements (FlinkClient, integration of Tuple0) commit c48ad0f516ee7583f8a6c4e564213fcd614c74e2 Author: mjsax Date: 2015-08-12T18:56:58Z Added split examples - example for embedded Spout/Bolt - two test-examples for complete topologies Additionally: - updated README.md - extended web documentation - add comments to pom.xml to explain examples better commit 71eeb4f2b77b643e0bbc0af4ed9fe212a4a306b7 Author: mjsax Date: 2015-08-12T18:57:21Z TO BE DELETED - this PR depends on FLINK-2457 (Integrate Tuple0), i.e., PR #983 - these changes are hotfixes to make the branch work and are not needed after PR #983 is merged and this branch is rebased
[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694894#comment-14694894 ] ASF GitHub Bot commented on FLINK-1745: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/696#issuecomment-130574663 @kno10 thanks for the pointer. Judging from the abstract it sounds really promising and it seems definitely worth exploring. Unfortunately, I don't have access to the complete paper which makes it hard to fully understand the algorithm. > Add exact k-nearest-neighbours algorithm to machine learning library > > > Key: FLINK-1745 > URL: https://issues.apache.org/jira/browse/FLINK-1745 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Till Rohrmann > Labels: ML, Starter > > Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial > it is still used as a mean to classify data and to do regression. This issue > focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as > proposed in [2]. > Could be a starter task. > Resources: > [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm] > [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694886#comment-14694886 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36950634 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala --- @@ -1182,6 +1184,60 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { getCallLocationName())) // + // Sample + // + /** + * Generate a sample of DataSet by the probability fraction of each element. + * + * @param withReplacement Whether element can be selected more than once. + * @param fraction Probability that each element is chosen, should be [0,1] without + * replacement, and [0, ∞) with replacement. While fraction is larger + * than 1, the elements are expected to be selected multi times into + * sample on average. + * @param seed Random number generator seed. + * @return The sampled DataSet + */ + def sample( + withReplacement: Boolean, + fraction: Double, + seed: Long = Utils.RNG.nextLong()): DataSet[T] = { + + wrap(new MapPartitionOperator[T, T](javaSet, + getType(), + new SampleWithFraction(withReplacement, fraction, seed), + getCallLocationName())) + } + + /** + * Generate a sample of DataSet by the probability fraction of each element. --- End diff -- Javadoc is from the fraction function. > Create sample operator for Dataset > -- > > Key: FLINK-1901 > URL: https://issues.apache.org/jira/browse/FLINK-1901 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Theodore Vasiloudis >Assignee: Chengxiang Li > > In order to be able to implement Stochastic Gradient Descent and a number of > other machine learning algorithms we need to have a way to take a random > sample from a Dataset. 
> We need to be able to sample with or without replacement from the Dataset, > choose the relative or exact size of the sample, set a seed for > reproducibility, and support sampling within iterations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-1745: - Assignee: (was: Till Rohrmann) > Add exact k-nearest-neighbours algorithm to machine learning library > > > Key: FLINK-1745 > URL: https://issues.apache.org/jira/browse/FLINK-1745 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Till Rohrmann > Labels: ML, Starter > > Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial > it is still used as a mean to classify data and to do regression. This issue > focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as > proposed in [2]. > Could be a starter task. > Resources: > [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm] > [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2493) Simplify names of example program JARs
[ https://issues.apache.org/jira/browse/FLINK-2493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694854#comment-14694854 ]

Stephan Ewen commented on FLINK-2493:
-

Still, seeking consensus on the dev list is a good idea. Let's do that!

> Simplify names of example program JARs
> --
>
>                 Key: FLINK-2493
>                 URL: https://issues.apache.org/jira/browse/FLINK-2493
>             Project: Flink
>          Issue Type: Improvement
>          Components: Examples
>    Affects Versions: 0.10
>            Reporter: Stephan Ewen
>            Assignee: chenliang
>            Priority: Minor
>              Labels: easyfix, starter
>
> I find the names of the example JARs a bit annoying.
> Why not name the file {{examples/ConnectedComponents.jar}} rather than
> {{examples/flink-java-examples-0.10-SNAPSHOT-ConnectedComponents.jar}}?
> And combine the "flink-java-examples" and "flink-scala-examples" projects into one
> examples project.
[jira] [Commented] (FLINK-2493) Simplify names of example program JARs
[ https://issues.apache.org/jira/browse/FLINK-2493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694852#comment-14694852 ]

Márton Balassi commented on FLINK-2493:
---

Ok, fair enough then.

> Simplify names of example program JARs
> --
>
>                 Key: FLINK-2493
>                 URL: https://issues.apache.org/jira/browse/FLINK-2493
>             Project: Flink
>          Issue Type: Improvement
>          Components: Examples
>    Affects Versions: 0.10
>            Reporter: Stephan Ewen
>            Assignee: chenliang
>            Priority: Minor
>              Labels: easyfix, starter
>
> I find the names of the example JARs a bit annoying.
> Why not name the file {{examples/ConnectedComponents.jar}} rather than
> {{examples/flink-java-examples-0.10-SNAPSHOT-ConnectedComponents.jar}}?
> And combine the "flink-java-examples" and "flink-scala-examples" projects into one
> examples project.
[jira] [Commented] (FLINK-2493) Simplify names of example program JARs
[ https://issues.apache.org/jira/browse/FLINK-2493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694851#comment-14694851 ]

Stephan Ewen commented on FLINK-2493:
-

I think there was a discussion about combining the examples into one project a while back, in the context of also re-organizing the Gelly examples.

I actually like this idea. It will help us reduce the jungle of projects a bit.

Problems occur mostly when referencing Scala from Java code, and the two sets of code are completely separate here. There should be no harm in combining the example projects.

> Simplify names of example program JARs
> --
>
>                 Key: FLINK-2493
>                 URL: https://issues.apache.org/jira/browse/FLINK-2493
>             Project: Flink
>          Issue Type: Improvement
>          Components: Examples
>    Affects Versions: 0.10
>            Reporter: Stephan Ewen
>            Assignee: chenliang
>            Priority: Minor
>              Labels: easyfix, starter
>
> I find the names of the example JARs a bit annoying.
> Why not name the file {{examples/ConnectedComponents.jar}} rather than
> {{examples/flink-java-examples-0.10-SNAPSHOT-ConnectedComponents.jar}}?
> And combine the "flink-java-examples" and "flink-scala-examples" projects into one
> examples project.