[GitHub] flink pull request: [FLINK-1819][core]Allow access to RuntimeConte...

2015-08-13 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/966#issuecomment-130979827
  
Sure. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696519#comment-14696519
 ] 

ASF GitHub Bot commented on FLINK-1819:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/966#issuecomment-130979827
  
Sure. :)


> Allow access to RuntimeContext from Input and OutputFormats
> ---
>
> Key: FLINK-1819
> URL: https://issues.apache.org/jira/browse/FLINK-1819
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 0.9, 0.8.1
>Reporter: Fabian Hueske
>Assignee: Sachin Goel
>Priority: Minor
> Fix For: 0.10
>
>
> User functions that extend a RichFunction can access a {{RuntimeContext}} 
> which gives the parallel id of the task and access to Accumulators and 
> BroadcastVariables. 
> Right now, Input and OutputFormats cannot access their {{RuntimeContext}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696375#comment-14696375
 ] 

ASF GitHub Bot commented on FLINK-2480:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r37046489
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   private Environment envForPrefixNull = new Environment() {
+   @Override
+   public JobID getJobID() {
+   return null;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return null;
+   }
+
+   @Override
+   public ExecutionAttemptID getExecutionId() {
+   return null;
+   }
+
+   @Override
+   public Configuration getTaskConfiguration() {
+   return null;
+   }
+
+   @Override
+   public TaskManagerRuntimeInfo getTaskManagerInfo() {
+   return null;
+   }
+
+   @Override
+   public Configuration getJobConfiguration() {
+   return null;
+   }
+
+   @Override
+   public int getNumberOfSubtasks() {
+   return 0;
+   }
+
+   @Override
+   public int getIndexInSubtaskGroup() {
+   return 0;
+   }
+
+   @Override
+   public InputSplitProvider getInputSplitProvider() {
+   return null;
+   }
+
+   @Override
+   public IOManager getIOManager() {
+   return null;
+   }
+
+   @Override
+   public MemoryManager getMemoryManager() {
+   return null;
+   }
+
+   @Override
+   public String getTaskName() {
+   return null;
+   }
   

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-13 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r37046489
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   private Environment envForPrefixNull = new Environment() {
+   @Override
+   public JobID getJobID() {
+   return null;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return null;
+   }
+
+   @Override
+   public ExecutionAttemptID getExecutionId() {
+   return null;
+   }
+
+   @Override
+   public Configuration getTaskConfiguration() {
+   return null;
+   }
+
+   @Override
+   public TaskManagerRuntimeInfo getTaskManagerInfo() {
+   return null;
+   }
+
+   @Override
+   public Configuration getJobConfiguration() {
+   return null;
+   }
+
+   @Override
+   public int getNumberOfSubtasks() {
+   return 0;
+   }
+
+   @Override
+   public int getIndexInSubtaskGroup() {
+   return 0;
+   }
+
+   @Override
+   public InputSplitProvider getInputSplitProvider() {
+   return null;
+   }
+
+   @Override
+   public IOManager getIOManager() {
+   return null;
+   }
+
+   @Override
+   public MemoryManager getMemoryManager() {
+   return null;
+   }
+
+   @Override
+   public String getTaskName() {
+   return null;
+   }
+
+   @Override
+   public String getTaskNameWithSubtasks() {
+   return null;
+   }
+
+   @Override
+   public ClassLoader getUserClassLoader() {
+   

[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696369#comment-14696369
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r37046195
  
--- Diff: 
flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import java.util.{List => JavaList, Random}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, After, Test}
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SampleITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode) {
+  private val RNG: Random = new Random
+
+  private var result: JavaList[String] = null;
+
+  @Before
+  def initiate {
+ExecutionEnvironment.getExecutionEnvironment.setParallelism(5)
+  }
+
+  @After
+  def after() = {
+TestBaseUtils.containsResultAsText(result, getSourceStrings)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithoutReplacement {
+verifySamplerWithFractionWithoutReplacement(0d)
+verifySamplerWithFractionWithoutReplacement(0.2d)
+verifySamplerWithFractionWithoutReplacement(1.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithReplacement {
+verifySamplerWithFractionWithReplacement(0d)
+verifySamplerWithFractionWithReplacement(0.2d)
+verifySamplerWithFractionWithReplacement(1.0d)
+verifySamplerWithFractionWithReplacement(2.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithoutReplacement {
+verifySamplerWithFixedSizeWithoutReplacement(0)
+verifySamplerWithFixedSizeWithoutReplacement(2)
+verifySamplerWithFixedSizeWithoutReplacement(21)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithReplacement {
+verifySamplerWithFixedSizeWithReplacement(0)
+verifySamplerWithFixedSizeWithReplacement(2)
+verifySamplerWithFixedSizeWithReplacement(21)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double) {
+verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double, seed: Long) {
+verifySamplerWithFraction(false, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double) {
+verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double, 
seed: Long) {
+verifySamplerWithFraction(true, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFraction(withReplacement: Boolean, 
fraction: Double, seed: Long) {
+val ds = getSourceDataSet()
+val sampled = ds.sample(withReplacement, fraction, seed)
+result = sampled.collect.asJava
--- End diff --

The validity of the sample result is verified in the after() method for each test. 
As the source data is very small, verifying the fraction does not make much sense, 
so I didn't verify the fraction's validity here; it is verified at the sampler 
level in RandomSamplerTest.

[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-13 Thread ChengXiangLi
Github user ChengXiangLi commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r37046195
  
--- Diff: 
flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import java.util.{List => JavaList, Random}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, After, Test}
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SampleITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode) {
+  private val RNG: Random = new Random
+
+  private var result: JavaList[String] = null;
+
+  @Before
+  def initiate {
+ExecutionEnvironment.getExecutionEnvironment.setParallelism(5)
+  }
+
+  @After
+  def after() = {
+TestBaseUtils.containsResultAsText(result, getSourceStrings)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithoutReplacement {
+verifySamplerWithFractionWithoutReplacement(0d)
+verifySamplerWithFractionWithoutReplacement(0.2d)
+verifySamplerWithFractionWithoutReplacement(1.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithReplacement {
+verifySamplerWithFractionWithReplacement(0d)
+verifySamplerWithFractionWithReplacement(0.2d)
+verifySamplerWithFractionWithReplacement(1.0d)
+verifySamplerWithFractionWithReplacement(2.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithoutReplacement {
+verifySamplerWithFixedSizeWithoutReplacement(0)
+verifySamplerWithFixedSizeWithoutReplacement(2)
+verifySamplerWithFixedSizeWithoutReplacement(21)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithReplacement {
+verifySamplerWithFixedSizeWithReplacement(0)
+verifySamplerWithFixedSizeWithReplacement(2)
+verifySamplerWithFixedSizeWithReplacement(21)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double) {
+verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double, seed: Long) {
+verifySamplerWithFraction(false, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double) {
+verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double, 
seed: Long) {
+verifySamplerWithFraction(true, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFraction(withReplacement: Boolean, 
fraction: Double, seed: Long) {
+val ds = getSourceDataSet()
+val sampled = ds.sample(withReplacement, fraction, seed)
+result = sampled.collect.asJava
--- End diff --

The validity of the sample result is verified in the after() method for each test. 
As the source data is very small, verifying the fraction does not make much sense, 
so I didn't verify the fraction's validity here; it is verified at the sampler 
level in RandomSamplerTest.
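
The check described in this comment can be sketched in plain Java. This is only an illustration under stated assumptions: a simple Bernoulli-style sampler stands in for the operator under test, and the names `SampleCheck`, `sample`, and `containedIn` are hypothetical, not part of the pull request. The point is that the result is only checked for containment in the source, not for its size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for the sample operator and the after() check; not Flink code.
class SampleCheck {
    // Keeps each element independently with probability `fraction`
    // (sampling without replacement), using a fixed seed.
    static <T> List<T> sample(List<T> source, double fraction, long seed) {
        Random rng = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (rng.nextDouble() < fraction) {
                out.add(item);
            }
        }
        return out;
    }

    // The validity check: every sampled element must come from the source.
    static <T> boolean containedIn(List<T> result, List<T> source) {
        return source.containsAll(result);
    }
}
```

With a source of only a few elements, the number of sampled elements varies too much for a fraction assertion to be meaningful, which is why only containment is checked here.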



[jira] [Commented] (FLINK-2516) Remove unwanted log.isInfoEnabled check

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696321#comment-14696321
 ] 

ASF GitHub Bot commented on FLINK-2516:
---

GitHub user ffbin opened a pull request:

https://github.com/apache/flink/pull/1012

[FLINK-2516]Remove unwanted log.isInfoEnabled check

The function already calls log.info() at its head, so I think the 
log.isInfoEnabled check after that call is unnecessary.
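
A minimal sketch of the argument, assuming a fixed log message: `java.util.logging` stands in here for the project's own logger, and `RedundantGuardDemo` with its methods is a hypothetical illustration rather than the patched code. Both variants produce the same records at every level, because `info()` performs the level check itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical demo; java.util.logging is an assumption, not the project's logger.
class RedundantGuardDemo {
    // Collects published messages so the two variants can be compared.
    static class Collector extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            if (isLoggable(record)) {
                messages.add(record.getMessage());
            }
        }

        @Override
        public void flush() {}

        @Override
        public void close() {}
    }

    // Variant with the extra guard after an unconditional info() call.
    static void guarded(Logger log) {
        log.info("starting");
        if (log.isLoggable(Level.INFO)) {
            log.info("done");
        }
    }

    // Variant without the guard.
    static void unguarded(Logger log) {
        log.info("starting");
        log.info("done");
    }

    // Runs one variant against a fresh logger at the given level.
    static List<String> run(Level level, boolean guard) {
        Logger log = Logger.getAnonymousLogger();
        log.setUseParentHandlers(false); // keep the demo off the console
        log.setLevel(level);
        Collector collector = new Collector();
        log.addHandler(collector);
        if (guard) {
            guarded(log);
        } else {
            unguarded(log);
        }
        return collector.messages;
    }
}
```

A guard can still be worthwhile when building the message itself is expensive; the sketch only covers the constant-message case.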

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ffbin/flink FLINK-2516

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1012.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1012


commit 2354d008021b93885b3183fd8becd67c52e18424
Author: ffbin <869218...@qq.com>
Date:   2015-08-14T01:37:14Z

[FLINK-2516]Remove unwanted log.isInfoEnabled check




> Remove unwanted log.isInfoEnabled check
> ---
>
> Key: FLINK-2516
> URL: https://issues.apache.org/jira/browse/FLINK-2516
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 0.8.1
>Reporter: fangfengbin
>Assignee: fangfengbin
>Priority: Minor
>






[GitHub] flink pull request: [FLINK-2516]Remove unwanted log.isInfoEnabled ...

2015-08-13 Thread ffbin
GitHub user ffbin opened a pull request:

https://github.com/apache/flink/pull/1012

[FLINK-2516]Remove unwanted log.isInfoEnabled check

The function already calls log.info() at its head, so I think the 
log.isInfoEnabled check after that call is unnecessary.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ffbin/flink FLINK-2516

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1012.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1012


commit 2354d008021b93885b3183fd8becd67c52e18424
Author: ffbin <869218...@qq.com>
Date:   2015-08-14T01:37:14Z

[FLINK-2516]Remove unwanted log.isInfoEnabled check






[jira] [Created] (FLINK-2516) Remove unwanted log.isInfoEnabled check

2015-08-13 Thread fangfengbin (JIRA)
fangfengbin created FLINK-2516:
--

 Summary: Remove unwanted log.isInfoEnabled check
 Key: FLINK-2516
 URL: https://issues.apache.org/jira/browse/FLINK-2516
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor








[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696283#comment-14696283
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37042816
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -43,6 +43,7 @@
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retrys;
--- End diff --


Sorry for my English. :)


> Remove unwanted boolean check in function 
> SocketTextStreamFunction.streamFromSocket
> ---
>
> Key: FLINK-2490
> URL: https://issues.apache.org/jira/browse/FLINK-2490
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
>Priority: Minor
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>






[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37042816
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -43,6 +43,7 @@
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retrys;
--- End diff --


Sorry for my English. :)




[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent

2015-08-13 Thread Sheetal Parade (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695941#comment-14695941
 ] 

Sheetal Parade commented on FLINK-2314:
---

I added split information to the checkpoint state. 

{code}
if (splitNumber == checkpointedSplit) {
    if (currRecord < checkpointedRecord) {
        currRecord++;
        continue;
    }
}
{code}

The restore-state and snapshot methods change accordingly:
{code}
@Override
public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
    return currRecord + ":" + splitNumber;
}

@Override
public void restoreState(String state) {
    String[] res = state.split(":");
    checkpointedRecord = Long.valueOf(res[0]);
    checkpointedSplit = Integer.valueOf(res[1]);
}
{code}
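
The skip-on-restore idea can be sketched as a small self-contained Java class. This is not the actual patch: it assumes splits are in-memory lists of strings, that splits before the checkpointed one were read completely, and the names `CheckpointedSplitReader` and `read` are hypothetical. The state is encoded as `currRecord:splitNumber`, as in the snippets above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a checkpointed file source; not Flink API.
class CheckpointedSplitReader {
    private long currRecord = 0;         // records consumed from the current split
    private int splitNumber = 0;         // index of the split being read
    private long checkpointedRecord = 0; // restored record offset
    private int checkpointedSplit = 0;   // restored split index

    // Encodes the current position as "record:split".
    String snapshotState() {
        return currRecord + ":" + splitNumber;
    }

    // Decodes a position produced by snapshotState().
    void restoreState(String state) {
        String[] res = state.split(":");
        checkpointedRecord = Long.parseLong(res[0]);
        checkpointedSplit = Integer.parseInt(res[1]);
    }

    // Emits at most `limit` records, skipping everything before the restored position.
    List<String> read(List<List<String>> splits, int limit) {
        List<String> out = new ArrayList<>();
        for (splitNumber = 0; splitNumber < splits.size(); splitNumber++) {
            if (splitNumber < checkpointedSplit) {
                continue; // assumption: earlier splits were fully read before the checkpoint
            }
            currRecord = 0;
            for (String record : splits.get(splitNumber)) {
                if (splitNumber == checkpointedSplit && currRecord < checkpointedRecord) {
                    currRecord++; // already emitted before the checkpoint
                    continue;
                }
                if (out.size() == limit) {
                    return out; // stop here; snapshotState() now reports this position
                }
                out.add(record);
                currRecord++;
            }
        }
        return out;
    }
}
```

A second reader that restores the state string of the first resumes with the first record that was not yet emitted.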



> Make Streaming File Sources Persistent
> --
>
> Key: FLINK-2314
> URL: https://issues.apache.org/jira/browse/FLINK-2314
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Sheetal Parade
>  Labels: easyfix, starter
>
> Streaming File sources should participate in the checkpointing. They should 
> track the bytes they read from the file and checkpoint it.
> One can look at the sequence generating source function for an example of a 
> checkpointed source.





[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-13 Thread nltran
Github user nltran commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-130734997
  
The code now passes all the checks :smiley: I have addressed the comments you 
made previously. @StephanEwen, @fhueske, could you give it another review? 
Namely:
* The SSP slack configuration is now in TaskConfig instead of job-wide. At 
some point we might want to unify both iteration strategies, since the BSP 
iteration mode is an edge case of SSP with the slack equal to zero.

* The parameter server is now completely orthogonal to Flink core. It is up 
to the user to set it up and call it. I'm preparing a sample job that uses both 
SSP and calls to a parameter server.

* Correct license header. Will I have to fill in a Contributor License 
Agreement at some point?
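
The relation between SSP and BSP mentioned in the first point can be illustrated with a short sketch. It assumes the usual staleness rule (a worker may run at most `slack` iterations ahead of the slowest worker); the class `SspClock` and its methods are hypothetical and not taken from the pull request.

```java
import java.util.Arrays;

// Hypothetical illustration of the SSP staleness rule; not code from the pull request.
class SspClock {
    private final int[] clocks; // completed iterations per worker
    private final int slack;    // maximum allowed lead over the slowest worker

    SspClock(int workers, int slack) {
        this.clocks = new int[workers];
        this.slack = slack;
    }

    // A worker may start its next iteration only if it is at most
    // `slack` iterations ahead of the slowest worker.
    boolean canProceed(int worker) {
        int slowest = Arrays.stream(clocks).min().getAsInt();
        return clocks[worker] - slowest <= slack;
    }

    // Marks one iteration of the given worker as finished.
    void tick(int worker) {
        clocks[worker]++;
    }
}
```

With `slack` set to zero, a worker that has finished an iteration must wait until every other worker has finished it too, which is the BSP barrier.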


---


[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r36990759
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   private Environment envForPrefixNull = new Environment() {
+   @Override
+   public JobID getJobID() {
+   return null;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return null;
+   }
+
+   @Override
+   public ExecutionAttemptID getExecutionId() {
+   return null;
+   }
+
+   @Override
+   public Configuration getTaskConfiguration() {
+   return null;
+   }
+
+   @Override
+   public TaskManagerRuntimeInfo getTaskManagerInfo() {
+   return null;
+   }
+
+   @Override
+   public Configuration getJobConfiguration() {
+   return null;
+   }
+
+   @Override
+   public int getNumberOfSubtasks() {
+   return 0;
+   }
+
+   @Override
+   public int getIndexInSubtaskGroup() {
+   return 0;
+   }
+
+   @Override
+   public InputSplitProvider getInputSplitProvider() {
+   return null;
+   }
+
+   @Override
+   public IOManager getIOManager() {
+   return null;
+   }
+
+   @Override
+   public MemoryManager getMemoryManager() {
+   return null;
+   }
+
+   @Override
+   public String getTaskName() {
+   return null;
+   }
+
+   @Override
+   public String getTaskNameWithSubtasks() {
+   return null;
+   }
+
+   @Override
+   public ClassLoader getUserClassLoader() {
+   return 

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r36990804
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   private Environment envForPrefixNull = new Environment() {
+   @Override
+   public JobID getJobID() {
+   return null;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return null;
+   }
+
+   @Override
+   public ExecutionAttemptID getExecutionId() {
+   return null;
+   }
+
+   @Override
+   public Configuration getTaskConfiguration() {
+   return null;
+   }
+
+   @Override
+   public TaskManagerRuntimeInfo getTaskManagerInfo() {
+   return null;
+   }
+
+   @Override
+   public Configuration getJobConfiguration() {
+   return null;
+   }
+
+   @Override
+   public int getNumberOfSubtasks() {
+   return 0;
+   }
+
+   @Override
+   public int getIndexInSubtaskGroup() {
+   return 0;
+   }
+
+   @Override
+   public InputSplitProvider getInputSplitProvider() {
+   return null;
+   }
+
+   @Override
+   public IOManager getIOManager() {
+   return null;
+   }
+
+   @Override
+   public MemoryManager getMemoryManager() {
+   return null;
+   }
+
+   @Override
+   public String getTaskName() {
+   return null;
+   }
+
+   @Override
+   public String getTaskNameWithSubtasks() {
+   return null;
+   }
+
+   @Override
+   public ClassLoader getUserClassLoader() {
+   return 

[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent

2015-08-13 Thread Sheetal Parade (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695604#comment-14695604
 ] 

Sheetal Parade commented on FLINK-2314:
---

Can you provide some guidance?

As I understand it, the checkpoint information needs to include the input split information too?

> Make Streaming File Sources Persistent
> --
>
> Key: FLINK-2314
> URL: https://issues.apache.org/jira/browse/FLINK-2314
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Sheetal Parade
>  Labels: easyfix, starter
>
> Streaming File sources should participate in the checkpointing. They should 
> track the bytes they read from the file and checkpoint it.
> One can look at the sequence generating source function for an example of a 
> checkpointed source.
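
A minimal sketch of the idea in the description above (track the bytes read and restore from that position), assuming a plain local file read with `java.io.RandomAccessFile`. The class `OffsetTrackingLineReader` and its methods are hypothetical names for illustration only; they are not part of Flink's source or checkpointing API.

```java
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical illustration: a line reader whose only checkpointed state
// is the byte offset it has reached in the file.
class OffsetTrackingLineReader implements AutoCloseable {
    private final RandomAccessFile file;

    // startOffset is 0 for a fresh start, or a previously snapshotted offset on recovery.
    OffsetTrackingLineReader(String path, long startOffset) throws IOException {
        this.file = new RandomAccessFile(path, "r");
        this.file.seek(startOffset);
    }

    // Returns the next line, or null at end of file.
    String nextLine() throws IOException {
        return file.readLine();
    }

    // The value a checkpoint would store: the number of bytes consumed so far.
    long snapshotOffset() throws IOException {
        return file.getFilePointer();
    }

    @Override
    public void close() throws IOException {
        file.close();
    }
}
```

On recovery, a new reader would be opened with the last snapshotted offset, so lines before that position are not emitted again. Whether the input split also has to be part of the checkpoint is exactly the open question in the comment above; this sketch does not answer it.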



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Framesize fix

2015-08-13 Thread kl0u
Github user kl0u commented on the pull request:

https://github.com/apache/flink/pull/934#issuecomment-130700589
  
Hi @mxm ,

I totally agree that in terms of code clarity and structure, it would be 
better to let all accumulators pass through the BlobCache. In fact, we also had 
a brief discussion with @StephanEwen on the matter.

The only problem is the latency penalty that may result from this design 
choice. In other words, if Akka is sufficient for most use cases, then forcing 
small blobs through the BlobCache (i.e. writing accumulators to disk and 
reading them back) may be expensive.

What do you think?
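
To make the trade-off concrete, here is a rough sketch of a size-based split, assuming small payloads go directly in the message and only large ones take the blob path. `AccumulatorTransport`, `Route`, and `maxInlineBytes` are hypothetical names, and the in-memory map is only a stand-in for a blob store; none of this is the PR's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of routing serialized accumulators by size.
class AccumulatorTransport {
    enum Route { DIRECT_MESSAGE, BLOB_STORE }

    private final int maxInlineBytes;
    // Stand-in for a blob store; a real one would write to disk.
    private final Map<String, byte[]> blobStore = new HashMap<>();

    AccumulatorTransport(int maxInlineBytes) {
        this.maxInlineBytes = maxInlineBytes;
    }

    // Small payloads stay on the fast path; large ones are offloaded.
    Route send(String name, byte[] serialized) {
        if (serialized.length <= maxInlineBytes) {
            return Route.DIRECT_MESSAGE;
        }
        blobStore.put(name, serialized);
        return Route.BLOB_STORE;
    }

    boolean isStored(String name) {
        return blobStore.containsKey(name);
    }
}
```

With this shape, only the oversized accumulators pay the disk round trip, at the cost of having two code paths to maintain.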


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1819][core]Allow access to RuntimeConte...

2015-08-13 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/966#issuecomment-130677380
  
Thanks for the contribution @sachingoel0101!




[GitHub] flink pull request: [FLINK-1819][core]Allow access to RuntimeConte...

2015-08-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/966




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36973377
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -43,6 +43,7 @@
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retrys;
--- End diff --

Sorry, minor thing but it should be `retries`.




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36973430
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -43,6 +43,7 @@
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retrys;
--- End diff --

You should also initialize the variable with 0 in the `open()` method.
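
A small sketch of what this could look like, assuming a counter field named `retries` that is reset whenever `open()` is called. `RetryingSource` and `shouldRetry()` are hypothetical stand-ins for illustration, not the actual `SocketTextStreamFunction` code.

```java
// Hypothetical stand-in for a source with a bounded number of reconnect attempts.
class RetryingSource {
    private final long maxRetries; // a negative value means "retry forever"
    protected long retries;        // plural of "retry"

    RetryingSource(long maxRetries) {
        this.maxRetries = maxRetries;
    }

    // Resets per-run state so a reopened instance starts counting from zero.
    void open() {
        retries = 0;
    }

    // Records one more attempt and reports whether it is still allowed.
    boolean shouldRetry() {
        if (maxRetries >= 0 && retries >= maxRetries) {
            return false;
        }
        retries++;
        return true;
    }
}
```

Resetting in `open()` rather than relying on the field default means the count does not carry over if the same instance is opened again.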




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36973279
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count < 100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail("Error in spawned thread: " + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+   source.start();
+
+   Socket channel;
+   channel = serverSo.accept();
+   channel.close();
+   serverSo.close();
+   

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36973254
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count < 100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail("Error in spawned thread: " + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+   source.start();
+
+   Socket channel;
+   channel = serverSo.accept();
+   channel.close();
+   serverSo.close();
+   

[GitHub] flink pull request: [FLINK-2306] Add support for named streams in ...

2015-08-13 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1011#issuecomment-130694237
  
Yes. It's a weird compile error, and it builds locally... The error is:
`[ERROR] 
/home/travis/build/mjsax/flink/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java:[27,64]
 package org.apache.flink.hadoop.shaded.com.google.common.collect does not 
exist`

I have no clue what's wrong... :/ Any suggestions?




[GitHub] flink pull request: [FLINK-2306] Add support for named streams in ...

2015-08-13 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1011#issuecomment-130692637
  
The CI reports 3 failures:

2 failures in storm-compatibility-core
1 failure in YARN (YARN not responding)





[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695412#comment-14695412
 ] 

ASF GitHub Bot commented on FLINK-2480:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r36990804
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   private Environment envForPrefixNull = new Environment() {
+   @Override
+   public JobID getJobID() {
+   return null;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return null;
+   }
+
+   @Override
+   public ExecutionAttemptID getExecutionId() {
+   return null;
+   }
+
+   @Override
+   public Configuration getTaskConfiguration() {
+   return null;
+   }
+
+   @Override
+   public TaskManagerRuntimeInfo getTaskManagerInfo() {
+   return null;
+   }
+
+   @Override
+   public Configuration getJobConfiguration() {
+   return null;
+   }
+
+   @Override
+   public int getNumberOfSubtasks() {
+   return 0;
+   }
+
+   @Override
+   public int getIndexInSubtaskGroup() {
+   return 0;
+   }
+
+   @Override
+   public InputSplitProvider getInputSplitProvider() {
+   return null;
+   }
+
+   @Override
+   public IOManager getIOManager() {
+   return null;
+   }
+
+   @Override
+   public MemoryManager getMemoryManager() {
+   return null;
+   }
+
+   @Override
+   public String getTaskName() {
+   return null;
+   }
+

[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695410#comment-14695410
 ] 

ASF GitHub Bot commented on FLINK-2480:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r36990759
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   private Environment envForPrefixNull = new Environment() {
+   @Override
+   public JobID getJobID() {
+   return null;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return null;
+   }
+
+   @Override
+   public ExecutionAttemptID getExecutionId() {
+   return null;
+   }
+
+   @Override
+   public Configuration getTaskConfiguration() {
+   return null;
+   }
+
+   @Override
+   public TaskManagerRuntimeInfo getTaskManagerInfo() {
+   return null;
+   }
+
+   @Override
+   public Configuration getJobConfiguration() {
+   return null;
+   }
+
+   @Override
+   public int getNumberOfSubtasks() {
+   return 0;
+   }
+
+   @Override
+   public int getIndexInSubtaskGroup() {
+   return 0;
+   }
+
+   @Override
+   public InputSplitProvider getInputSplitProvider() {
+   return null;
+   }
+
+   @Override
+   public IOManager getIOManager() {
+   return null;
+   }
+
+   @Override
+   public MemoryManager getMemoryManager() {
+   return null;
+   }
+
+   @Override
+   public String getTaskName() {
+   return null;
+   }
+

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130672880
  
> Otherwise, I found the SocketClientSink didn't have the "retry".
> Is it necessary to get a "retry"?

Yes, that might be an issue but let's keep it separate from our concern 
here. If you want, you can open a JIRA issue for the missing retry option in 
the `SocketClientSink`.




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130663511
  
OK, I added two cases (retry 10 times and retry 0 times), since I thought 
retrying once is the same as retrying 10 times.
Would you also please take a look at the other two test PRs 
(https://github.com/apache/flink/pull/991 and 
https://github.com/apache/flink/pull/977)?




[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-13 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36969808
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -1057,7 +1061,68 @@ public long count() throws Exception {
public UnionOperator union(DataSet other){
return new UnionOperator(this, other, Utils.getCallLocationName());
}
+
+   // 

+   //  Sample
+   // 

+   
+   /**
+* Generate a sample of DataSet by the probability fraction of each element.
+*
+* @param withReplacement Whether element can be selected more than once.
+* @param fraction        Probability that each element is chosen, should be [0,1] without replacement,
+*                        and [0, ∞) with replacement. While fraction is larger than 1, the elements are
+*                        expected to be selected multi times into sample on average.
+* @return The sampled DataSet
+*/
+   public MapPartitionOperator sample(final boolean withReplacement, final double fraction) {
+   return sample(withReplacement, fraction, Utils.RNG.nextLong());
+   }
+   
+   /**
+* Generate a sample of DataSet by the probability fraction of each element.
+*
+* @param withReplacement Whether element can be selected more than once.
+* @param fraction        Probability that each element is chosen, should be [0,1] without replacement,
+*                        and [0, ∞) with replacement. While fraction is larger than 1, the elements are
+*                        expected to be selected multi times into sample on average.
+* @param seed            random number generator seed.
+* @return The sampled DataSet
+*/
+   public MapPartitionOperator sample(final boolean withReplacement, final double fraction, final long seed) {
+   return mapPartition(new SampleWithFraction(withReplacement, fraction, seed));
+   }
+   
+   /**
+* Generate a sample of DataSet which contains fixed size elements.
+*
+* @param withReplacement Whether element can be selected more than 
once.
+* @param numSample   The expected sample size.
+* @return The sampled DataSet
+*/
--- End diff --

Maybe we want to include a note that this kind of sampling currently takes 
2 passes over the data, and recommend using fraction unless exact precision is 
necessary.
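
To illustrate why an exact sample size tends to need two passes, here is a sketch of classic selection sampling without replacement: the first pass counts the elements, the second selects each one with probability (still needed) / (still remaining). `FixedSizeSampler` is a hypothetical name, and this is a generic textbook technique on a single in-memory collection, not necessarily the algorithm used in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical illustration: exact-size sampling without replacement in two passes.
class FixedSizeSampler {
    static <T> List<T> sample(Iterable<T> data, int numSamples, long seed) {
        // Pass 1: count the elements, since the selection probability depends on the total.
        long total = 0;
        for (T ignored : data) {
            total++;
        }

        // Pass 2: pick each element with probability needed / remaining.
        Random rng = new Random(seed);
        List<T> result = new ArrayList<>();
        long seen = 0;
        for (T element : data) {
            long needed = numSamples - result.size();
            if (needed <= 0) {
                break;
            }
            long remaining = total - seen;
            if (rng.nextDouble() * remaining < needed) {
                result.add(element);
            }
            seen++;
        }
        return result;
    }
}
```

The result always has exactly `min(numSamples, total)` elements, which is the precision that fraction-based sampling gives up in exchange for a single pass.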




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130664351
  
@mxm 
Also, I found that the SocketClientSink doesn't have a "retry" option.
Is it necessary to add one?




[jira] [Created] (FLINK-2515) CheckpointCoordinator triggers checkpoints even if not all sources are running any more

2015-08-13 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2515:
---

 Summary: CheckpointCoordinator triggers checkpoints even if not 
all sources are running any more
 Key: FLINK-2515
 URL: https://issues.apache.org/jira/browse/FLINK-2515
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 0.10
Reporter: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


When some sources finish early, they will not emit checkpoint barriers any 
more. That means that pending checkpoint alignments will never be able to 
complete, locking the flow.
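
One possible shape of a guard against this, sketched under the assumption that the coordinator can see the execution state of every source task before triggering. `CheckpointTriggerGuard` and `TaskState` are hypothetical names; this is not the actual `CheckpointCoordinator` code or the eventual fix.

```java
import java.util.List;

// Hypothetical illustration: refuse to trigger a checkpoint unless every source is still running.
class CheckpointTriggerGuard {
    enum TaskState { RUNNING, FINISHED, FAILED }

    // A finished or failed source will never emit a barrier, so alignment downstream could not complete.
    static boolean canTrigger(List<TaskState> sourceStates) {
        for (TaskState state : sourceStates) {
            if (state != TaskState.RUNNING) {
                return false;
            }
        }
        return true;
    }
}
```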





[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-13 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36955412
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli Trail. For sample with 
fraction and without replacement,
+ * each element sample choice is just a bernoulli trail.
--- End diff --

Do you mean Bernoulli _trial_ here?
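
For reference, a Bernoulli trial per element can be sketched as follows. This is a minimal illustration, not the `BernoulliSampler` from the pull request: the class name `BernoulliSketch` and the method `sample` are hypothetical, and the seed parameter is an assumption added to make runs repeatable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch: sampling without replacement via independent Bernoulli trials.
class BernoulliSketch {

    // Keeps each element independently with probability `fraction`.
    static <T> List<T> sample(List<T> input, double fraction, long seed) {
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0, 1]");
        }
        Random rng = new Random(seed);
        List<T> selected = new ArrayList<>();
        for (T element : input) {
            // nextDouble() is in [0, 1), so fraction 0 keeps nothing and 1 keeps everything.
            if (rng.nextDouble() < fraction) {
                selected.add(element);
            }
        }
        return selected;
    }
}
```

The sample size is random; only its expected value is `fraction * input.size()`.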




[jira] [Commented] (FLINK-2306) Add support for named streams in Storm compatibility layer

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695292#comment-14695292
 ] 

ASF GitHub Bot commented on FLINK-2306:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1011#issuecomment-130694237
  
Yes. It's a weird compile error, and it builds locally... The error is:
`[ERROR] 
/home/travis/build/mjsax/flink/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java:[27,64]
 package org.apache.flink.hadoop.shaded.com.google.common.collect does not 
exist`

I have no clue what's wrong... :/ Any suggestions?


> Add support for named streams in Storm compatibility layer
> --
>
> Key: FLINK-2306
> URL: https://issues.apache.org/jira/browse/FLINK-2306
> Project: Flink
>  Issue Type: Improvement
>  Components: flink-contrib
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> Currently, the layer only works on a single stream and ignores stream names, 
> i.e., each stream is treated as the "default" stream. The declaration of 
> multiple output streams is ignored (all tuples are emitted to the same 
> stream). If multiple input streams are consumed, all tuples are merged into a 
> single stream.
> This feature allows operators to declare multiple (named) output streams and 
> emit tuples to different streams. Furthermore, it enables Bolts to distinguish 
> incoming tuples from different streams by stream name (Storm tuple meta 
> information).





[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130644950
  
`StringBuilder` is only for single-threaded use, while `StringBuffer` supports 
access from multiple threads. If you use `StringBuffer` in a single-threaded 
scenario, it performs worse than `StringBuilder`.

Thanks for your changes. In addition to the "infinity" test, can you add a 
test that checks for a certain number of retries (e.g. 10)? Also please add a 
check for 1 and 0 retries. It's always good to test corner cases :)
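
To make the `StringBuilder`/`StringBuffer` distinction concrete, here is a small sketch. The class `BuilderVsBuffer` and both method names are made up for illustration and are not part of the pull request.

```java
// Hypothetical illustration of when each class is appropriate.
class BuilderVsBuffer {

    // Single-threaded use: StringBuilder avoids the synchronization overhead.
    static String joinSingleThreaded(String[] parts, char delimiter) {
        StringBuilder sb = new StringBuilder();
        for (String part : parts) {
            sb.append(part).append(delimiter);
        }
        return sb.toString();
    }

    // Shared between threads: StringBuffer's append is synchronized,
    // so no appended character is lost.
    static int appendConcurrently(int threads, int appendsPerThread) {
        StringBuffer shared = new StringBuffer();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < appendsPerThread; j++) {
                    shared.append('x');
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return shared.length();
    }
}
```

A buffer that is only ever touched by the one thread running the source function falls into the first case.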




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130626843
  
Hi Max,
I addressed all of your review comments.
I also kept the change from StringBuffer to StringBuilder.
One question: as far as I can see, StringBuilder currently does the same 
thing as StringBuffer.
So what is the real difference between the two types in the 
SocketTextStreamFunction?




[jira] [Commented] (FLINK-2306) Add support for named streams in Storm compatibility layer

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695280#comment-14695280
 ] 

ASF GitHub Bot commented on FLINK-2306:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1011#issuecomment-130692637
  
The CI reports 3 failures.

2 failures in storm-compatibility-core
1 failure in YARN (yarn not responding)



> Add support for named streams in Storm compatibility layer
> --
>
> Key: FLINK-2306
> URL: https://issues.apache.org/jira/browse/FLINK-2306
> Project: Flink
>  Issue Type: Improvement
>  Components: flink-contrib
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> Currently, the layer only works on a single stream and ignores stream names, 
> i.e., each stream is treated as the "default" stream. The declaration of 
> multiple output streams is ignored (all tuples are emitted to the same 
> stream). If multiple input streams are consumed, all tuples are merged into a 
> single stream.
> This feature allows operators to declare multiple (named) output streams and 
> emit tuples to different streams. Furthermore, it enables Bolts to distinguish 
> incoming tuples from different streams by stream name (Storm tuple meta 
> information).





[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130524804
  
@mxm 
Hi, I fixed the StringBuffer and added the test.
Please take a look to see whether it's correct.
Thank you!




[GitHub] flink pull request: [Flink-Gelly] [example] added missing assumpti...

2015-08-13 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/883#issuecomment-130623223
  
I've actually included this in #1000. Could you close this PR @samk3211? 
Thanks!




[GitHub] flink pull request: [FLINK-2451] [gelly] examples and library clea...

2015-08-13 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1000#discussion_r36963490
  
--- Diff: 
flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
 ---
@@ -86,26 +72,35 @@ public void testConnectedComponents() throws Exception {
 
@Test
public void testSingleSourceShortestPaths() throws Exception {
-   GSASingleSourceShortestPaths.main(new String[]{"1", edgesPath, 
resultPath, "16"});
-   expectedResult = "1 0.0\n" +
-   "2 12.0\n" +
-   "3 13.0\n" +
-   "4 47.0\n" +
-   "5 48.0\n" +
-   "6 Infinity\n" +
-   "7 Infinity\n";
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   Graph inputGraph = Graph.fromDataSet(
+   
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
+   new InitMapperSSSP(), env);
+
+List> result = inputGraph.run(new 
GSASingleSourceShortestPaths(1l, 16))
+   .getVertices().collect();
--- End diff --

Yes, we do! The idea is to lazily migrate the rest of the tests, too. Take 
a look at FLINK-2032.




[GitHub] flink pull request: [FLINK-1745] [ml] [WIP] Add exact k-nearest-ne...

2015-08-13 Thread kno10
Github user kno10 commented on the pull request:

https://github.com/apache/flink/pull/696#issuecomment-130469648
  
R-trees are hard to parallelize.
For distributed, gigabyte-sized data, an approximate approach is 
preferable, like the one we discuss in this article:

E. Schubert, A. Zimek, H.-P. Kriegel
Fast and Scalable Outlier Detection with Approximate Nearest Neighbor 
Ensembles
In Proceedings of the 20th International Conference on Database Systems for 
Advanced Applications (DASFAA), Hanoi, Vietnam: 19–36, 2015. 

We discuss an approach that is easy to parallelize. It needs sorting and a 
sliding window (or blocks), so it is not strict MapReduce, but it should be a 
good match for Flink. The hardest part is to get the different space-filling 
curves right and efficient. The other components (random projections to reduce 
dimensionality, an ensemble to improve quality, and list inversions to also 
build reverse kNN, which then allow accelerating methods such as LOF) are much 
easier.

The main drawback of most of these kNN-join approaches (including ours) is 
that they only work with Minkowski norms. There are many more interesting 
distance functions than that...

We also discuss why the space-filling curves appear to give better results 
for kNN, while LSH etc. work better for radius joins. LSH is another option, 
but it cannot guarantee finding k neighbors, and parameter tuning is tricky. So 
you may want to have a look at this recent ensemble approach instead.
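
The "sort along a space-filling curve, then scan a sliding window" idea can be sketched as below. This is a heavily simplified illustration and not the method from the article: it uses a single Z-order (Morton) curve on 2D integer points, with no random projections and no ensemble. The class `ZOrderKnnSketch` and all its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of approximate kNN via one Z-order curve and a window.
class ZOrderKnnSketch {

    // Interleave the lower 16 bits of x and y into one Morton (Z-order) code.
    static long morton(int x, int y) {
        long code = 0L;
        for (int bit = 0; bit < 16; bit++) {
            code |= ((long) ((x >> bit) & 1)) << (2 * bit);
            code |= ((long) ((y >> bit) & 1)) << (2 * bit + 1);
        }
        return code;
    }

    static long squaredDistance(int[] a, int[] b) {
        long dx = a[0] - b[0];
        long dy = a[1] - b[1];
        return dx * dx + dy * dy;
    }

    // Returns indices of up to k approximate nearest neighbors of points[query].
    static List<Integer> approxKnn(int[][] points, int query, int k, int window) {
        // Sort point indices by their position on the curve.
        Integer[] order = new Integer[points.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        Arrays.sort(order, Comparator.comparingLong(
                (Integer i) -> morton(points[i][0], points[i][1])));

        // Locate the query point in the sorted order.
        int pos = 0;
        for (int i = 0; i < order.length; i++) {
            if (order[i] == query) {
                pos = i;
            }
        }

        // Only points within `window` positions on the curve are candidates.
        List<Integer> candidates = new ArrayList<>();
        int from = Math.max(0, pos - window);
        int to = Math.min(order.length - 1, pos + window);
        for (int i = from; i <= to; i++) {
            if (i != pos) {
                candidates.add(order[i]);
            }
        }

        // Rank candidates by true distance and keep the best k.
        candidates.sort(Comparator.comparingLong(
                (Integer i) -> squaredDistance(points[i], points[query])));
        return new ArrayList<>(candidates.subList(0, Math.min(k, candidates.size())));
    }
}
```

A single curve can miss true neighbors that lie across a curve discontinuity, which is presumably why an ensemble of differently projected curves helps.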




[GitHub] flink pull request: [CLEANUP] Add space between quotes and plus si...

2015-08-13 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1010#issuecomment-130633384
  
Thanks Henry, please add this rule. :)




[jira] [Commented] (FLINK-2077) Rework Path class and add extend support for Windows paths

2015-08-13 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695236#comment-14695236
 ] 

Fabian Hueske commented on FLINK-2077:
--

Hi Lun,

{{//host/dir1/dir2}} is an example of a path to a Windows share, i.e., a path 
to a directory {{dir1/dir2}} which is shared by a host {{host}}. You can check 
the [Windows Dev 
Center|https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247%28v=vs.85%29.aspx#paths]
 for the Windows path specification.
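
As a rough illustration of the host/directory split, the JDK's `java.net.URI` already parses such a string into an authority and a path. This sketch only uses the JDK class and says nothing about how Flink's {{Path}} should implement it; the class name `SharePathSketch` is hypothetical.

```java
import java.net.URI;

// Hypothetical sketch: splitting a share-style path with java.net.URI.
class SharePathSketch {

    // Returns { authority, path } for a string such as "//host/dir1/dir2".
    static String[] splitShare(String sharePath) {
        URI uri = URI.create(sharePath);
        return new String[] { uri.getAuthority(), uri.getPath() };
    }
}
```

For {{//host/dir1/dir2}}, the authority component is the host and the remainder is the shared directory path.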

However, the focus of this JIRA is rather to clean up the {{Path}} class, 
which has existed for a long time and has been changed in several places. It 
is now a bit messy and hard to maintain.
Adding support for Windows share paths would be nice but is not mandatory.

Thanks, Fabian

> Rework Path class and add extend support for Windows paths
> --
>
> Key: FLINK-2077
> URL: https://issues.apache.org/jira/browse/FLINK-2077
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: GaoLun
>Priority: Minor
>  Labels: starter
>
> The class {{org.apache.flink.core.fs.Path}} handles paths for Flink's 
> {{FileInputFormat}} and {{FileOutputFormat}}. Over time, this class has 
> become quite hard to read and modify. 
> It would benefit from some cleaning and refactoring. Along with the 
> refactoring, support for Windows paths like {{//host/dir1/dir2}} could be 
> added.





[jira] [Closed] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats

2015-08-13 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed FLINK-1819.
-
   Resolution: Implemented
Fix Version/s: (was: 0.9)
   0.10

> Allow access to RuntimeContext from Input and OutputFormats
> ---
>
> Key: FLINK-1819
> URL: https://issues.apache.org/jira/browse/FLINK-1819
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 0.9, 0.8.1
>Reporter: Fabian Hueske
>Assignee: Sachin Goel
>Priority: Minor
> Fix For: 0.10
>
>
> User functions that extend a RichFunction can access a {{RuntimeContext}} 
> which gives the parallel id of the task and access to Accumulators and 
> BroadcastVariables. 
> Right now, Input and OutputFormats cannot access their {{RuntimeContext}}.





[jira] [Assigned] (FLINK-2508) Confusing sharing of StreamExecutionEnvironment

2015-08-13 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-2508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi reassigned FLINK-2508:
-

Assignee: Márton Balassi

> Confusing sharing of StreamExecutionEnvironment
> ---
>
> Key: FLINK-2508
> URL: https://issues.apache.org/jira/browse/FLINK-2508
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Márton Balassi
> Fix For: 0.10
>
>
> In the {{StreamExecutionEnvironment}}, the environment is once created and 
> then shared with a static variable to all successive calls to 
> {{getExecutionEnvironment()}}. But it can be overridden by calls to 
> {{createLocalEnvironment()}} and {{createRemoteEnvironment()}}.
> This seems a bit un-intuitive, and probably creates confusion when 
> dispatching multiple streaming jobs from within the same JVM.
> Why is it even necessary to cache the "current" execution environment?





[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695231#comment-14695231
 ] 

ASF GitHub Bot commented on FLINK-1819:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/966


> Allow access to RuntimeContext from Input and OutputFormats
> ---
>
> Key: FLINK-1819
> URL: https://issues.apache.org/jira/browse/FLINK-1819
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 0.9, 0.8.1
>Reporter: Fabian Hueske
>Assignee: Sachin Goel
>Priority: Minor
> Fix For: 0.9
>
>
> User functions that extend a RichFunction can access a {{RuntimeContext}} 
> which gives the parallel id of the task and access to Accumulators and 
> BroadcastVariables. 
> Right now, Input and OutputFormats cannot access their {{RuntimeContext}}.





[jira] [Assigned] (FLINK-2514) Local and Remote environment behave differently when re-triggering execution.

2015-08-13 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi reassigned FLINK-2514:
-

Assignee: Márton Balassi

> Local and Remote environment behave differently when re-triggering execution.
> -
>
> Key: FLINK-2514
> URL: https://issues.apache.org/jira/browse/FLINK-2514
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Márton Balassi
>Priority: Critical
> Fix For: 0.10
>
>
> The following code behaves differently on the {{LocalStreamEnvironment}} and 
> the {{RemoteStreamEnvironment}}.
> {code}
> StreamExecutionEnvironment env = ...;
> env.addSource(someSource).addSink(someSink);
> env.execute();
> env.addSource(anotherSource).addSink(anotherSink);
> env.execute();
> {code}
> Locally, only the second source/sink pair is executed.
> Remotely, both are re-executed.





[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695232#comment-14695232
 ] 

ASF GitHub Bot commented on FLINK-1819:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/966#issuecomment-130677380
  
Thanks for the contribution @sachingoel0101!


> Allow access to RuntimeContext from Input and OutputFormats
> ---
>
> Key: FLINK-1819
> URL: https://issues.apache.org/jira/browse/FLINK-1819
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 0.9, 0.8.1
>Reporter: Fabian Hueske
>Assignee: Sachin Goel
>Priority: Minor
> Fix For: 0.9
>
>
> User functions that extend a RichFunction can access a {{RuntimeContext}} 
> which gives the parallel id of the task and access to Accumulators and 
> BroadcastVariables. 
> Right now, Input and OutputFormats cannot access their {{RuntimeContext}}.





[jira] [Updated] (FLINK-2514) Local and Remote environment behave differently when re-triggering execution.

2015-08-13 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi updated FLINK-2514:
--
Fix Version/s: 0.10

> Local and Remote environment behave differently when re-triggering execution.
> -
>
> Key: FLINK-2514
> URL: https://issues.apache.org/jira/browse/FLINK-2514
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Priority: Critical
> Fix For: 0.10
>
>
> The following code behaves differently on the {{LocalStreamEnvironment}} and 
> the {{RemoteStreamEnvironment}}.
> {code}
> StreamExecutionEnvironment env = ...;
> env.addSource(someSource).addSink(someSink);
> env.execute();
> env.addSource(anotherSource).addSink(anotherSink);
> env.execute();
> {code}
> Locally, only the second source/sink pair is executed.
> Remotely, both are re-executed.





[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695215#comment-14695215
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130672880
  
> Also, I found that the SocketClientSink doesn't have a "retry" option.
> Is it necessary to add one?

Yes, that might be an issue but let's keep it separate from our concern 
here. If you want, you can open a JIRA issue for the missing retry option in 
the `SocketClientSink`.


> Remove unwanted boolean check in function 
> SocketTextStreamFunction.streamFromSocket
> ---
>
> Key: FLINK-2490
> URL: https://issues.apache.org/jira/browse/FLINK-2490
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
>Priority: Minor
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>






[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130588652
  
Thanks for your changes. I think we should use `read()` instead of 
`readLine()` because we are using a custom delimiter and not necessarily "\n" 
(newline symbol). The danger of reading an entire line is that the newline 
symbol might never arrive. So it might continue to read forever. And even if it 
manages to find a newline symbol, you have to truncate your input to find the 
custom delimiter. That's not very efficient. Can you change the code back to 
using the `read()` method? I think we had a misunderstanding.
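
A minimal sketch of delimiter-based reading with `read()` might look like the following. It is not the code of `SocketTextStreamFunction`; the class `DelimiterReadSketch` and the method `readRecords` are hypothetical, and a `StringReader` stands in for the socket stream.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split a character stream on a custom delimiter.
class DelimiterReadSketch {

    // Reads one character at a time and emits a record at each delimiter,
    // so it never waits for a newline that may not arrive.
    static List<String> readRecords(Reader in, char delimiter) throws IOException {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int data;
        while ((data = in.read()) != -1) {
            if (data == delimiter) {
                records.add(current.toString());
                current.setLength(0);
            } else {
                current.append((char) data);
            }
        }
        // Emit whatever is left when the stream ends (an assumption of this sketch).
        if (current.length() > 0) {
            records.add(current.toString());
        }
        return records;
    }

    // Convenience overload for in-memory input.
    static List<String> readRecords(String text, char delimiter) {
        try {
            return readRecords(new StringReader(text), delimiter);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With `readLine()`, the same input would first have to be split on newlines and then searched again for the custom delimiter.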

For your test case: It's not considered good practice to mix production and 
test code. You're doing that by introducing the `isRetrying` flag and exposing 
it. Alternatively, you have two options:

1. Create a `ServerSocket` and pass its address to the 
`SocketTextStreamFunction`. Then control the connection to this socket and 
count how often the function reconnects (e.g. use the `accept()` method).
2. Create your test in the same package as the `SocketTextStreamFunction` 
function (package is `org.apache.flink.streaming.api.functions.source`). Then 
you can access all field variables which are protected. So make your `retries` 
variable a protected field variable of the `SocketTextStreamFunction` class.

I hope that this helps you. If not, feel free to ask more questions.
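
Option 1 above could be sketched roughly as follows. Everything here is an assumption for illustration: `ReconnectCountSketch` is a made-up class, the client thread is a stand-in for the source function rather than `SocketTextStreamFunction` itself, and it assumes "maxRetries" means one initial connection plus that many reconnects.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical sketch: count reconnects on the server side with accept().
class ReconnectCountSketch {

    // Stand-in client: connects, reads until the peer closes, then reconnects.
    static Thread startClient(int port, int maxRetries) {
        Thread client = new Thread(() -> {
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                try (Socket socket = new Socket("127.0.0.1", port)) {
                    while (socket.getInputStream().read() != -1) {
                        // discard data until the server closes the connection
                    }
                } catch (IOException e) {
                    return;
                }
            }
        });
        client.start();
        return client;
    }

    // Accepts and immediately closes connections; returns how many arrived.
    static int countConnections(int maxRetries) {
        try (ServerSocket server = new ServerSocket(0)) {
            server.setSoTimeout(1000); // stop waiting once reconnects cease
            Thread client = startClient(server.getLocalPort(), maxRetries);
            int accepted = 0;
            try {
                while (true) {
                    Socket channel = server.accept();
                    accepted++;
                    channel.close(); // forces the client to reconnect
                }
            } catch (SocketTimeoutException timeout) {
                // no further connection attempts within the timeout
            }
            client.join();
            return accepted;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

This keeps the counting entirely in test code, so no flag or counter needs to be exposed by the production class.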





[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695213#comment-14695213
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36973430
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -43,6 +43,7 @@
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retrys;
--- End diff --

You should also initialize the variable with 0 in the `open()` method.


> Remove unwanted boolean check in function 
> SocketTextStreamFunction.streamFromSocket
> ---
>
> Key: FLINK-2490
> URL: https://issues.apache.org/jira/browse/FLINK-2490
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
>Priority: Minor
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>






[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695210#comment-14695210
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36973254
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count < 100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail("Error in spawned thread: " + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   

[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-13 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36957769
  
--- Diff: 
flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import java.util.{List => JavaList, Random}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, After, Test}
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SampleITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode) {
+  private val RNG: Random = new Random
+
+  private var result: JavaList[String] = null;
+
+  @Before
+  def initiate {
+ExecutionEnvironment.getExecutionEnvironment.setParallelism(5)
+  }
+
+  @After
+  def after() = {
+TestBaseUtils.containsResultAsText(result, getSourceStrings)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithoutReplacement {
+verifySamplerWithFractionWithoutReplacement(0d)
+verifySamplerWithFractionWithoutReplacement(0.2d)
+verifySamplerWithFractionWithoutReplacement(1.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithReplacement {
+verifySamplerWithFractionWithReplacement(0d)
+verifySamplerWithFractionWithReplacement(0.2d)
+verifySamplerWithFractionWithReplacement(1.0d)
+verifySamplerWithFractionWithReplacement(2.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithoutReplacement {
+verifySamplerWithFixedSizeWithoutReplacement(0)
+verifySamplerWithFixedSizeWithoutReplacement(2)
+verifySamplerWithFixedSizeWithoutReplacement(21)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithReplacement {
+verifySamplerWithFixedSizeWithReplacement(0)
+verifySamplerWithFixedSizeWithReplacement(2)
+verifySamplerWithFixedSizeWithReplacement(21)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double) {
+verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double, seed: Long) {
+verifySamplerWithFraction(false, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double) {
+verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double, 
seed: Long) {
+verifySamplerWithFraction(true, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFraction(withReplacement: Boolean, 
fraction: Double, seed: Long) {
+val ds = getSourceDataSet()
+val sampled = ds.sample(withReplacement, fraction, seed)
+result = sampled.collect.asJava
--- End diff --

Is this result checked for validity somewhere?




[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695212#comment-14695212
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36973377
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -43,6 +43,7 @@
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retrys;
--- End diff --

Sorry, minor thing but it should be `retries`.


> Remove unwanted boolean check in function 
> SocketTextStreamFunction.streamFromSocket
> ---
>
> Key: FLINK-2490
> URL: https://issues.apache.org/jira/browse/FLINK-2490
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
>Priority: Minor
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>






[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695211#comment-14695211
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36973279
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference<Throwable> error = new 
AtomicReference<Throwable>();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext<String> ctx = new 
SourceFunction.SourceContext<String>() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count < 100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail("Error in spawned thread: " + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   

[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695196#comment-14695196
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130664351
  
@mxm 
Also, I found that SocketClientSink doesn't have a "retry".
Is it necessary to add one?




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36955334
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -145,4 +152,8 @@ public void cancel() {
}
}
}
+
+   public boolean getIsRetrying() {
--- End diff --

Please remove this getter.




[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695192#comment-14695192
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130663511
  
OK, I added two cases (retry 10 and 0), since I thought retrying 1 time is 
just the same as 10.
And would you please take a look at two other tests 
(https://github.com/apache/flink/pull/991 and 
https://github.com/apache/flink/pull/977)?




[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-13 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36958345
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli Trail. For sample with 
fraction and without replacement,
+ * each element sample choice is just a bernoulli trail.
+ *
+ * @param <T> The type of sample.
+ */
+public class BernoulliSampler<T> extends RandomSampler<T> {
+   
+   private final double fraction;
+   private final Random random;
+   
+   /**
+* Create a bernoulli sampler sample fraction and default random number 
generator.
+*
+* @param fraction Sample fraction, aka the bernoulli sampler 
possibility.
+*/
+   public BernoulliSampler(double fraction) {
+   this(fraction, new Random());
+   }
+   
+   /**
+* Create a bernoulli sampler sample fraction and random number 
generator seed.
+*
+* @param fraction Sample fraction, aka the bernoulli sampler 
possibility.
+* @param seed Random number generator seed.
+*/
+   public BernoulliSampler(double fraction, long seed) {
+   this(fraction, new Random(seed));
+   }
+   
+   /**
+* Create a bernoulli sampler sample fraction and random number 
generator.
+*
+* @param fraction Sample fraction, aka the bernoulli sampler 
possibility.
+* @param random   The random number generator.
+*/
+   public BernoulliSampler(double fraction, Random random) {
+   Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, 
"fraction fraction must between [0, 1].");
+   this.fraction = fraction;
+   this.random = random;
+   }
+   
+   /**
+* Sample the input elements, for each input element, take a Bernoulli 
Trail for sample.
+*
+* @param input Elements to be sampled.
+* @return The sampled result which is lazy computed upon input 
elements.
+*/
+   @Override
+   public Iterator<T> sample(final Iterator<T> input) {
+   if (fraction == 0) {
+   return EMPTY_ITERABLE;
+   }
+   
+   return new SampledIterator<T>() {
+   T current;
+   
+   @Override
+   public boolean hasNext() {
+   if (current == null) {
+   while (input.hasNext()) {
+   T element = input.next();
+   if (random.nextDouble() <= 
fraction) {
+   current = element;
+   return true;
+   }
+   }
+   current = null;
+   return false;
+   } else {
+   return true;
+   }
+   }
+   
+   @Override
+   public T next() {
--- End diff --

It feels a bit counterintuitive that the next element is prepared in the 
`hasNext()` function. Doesn't this mean that `hasNext()` **needs** to be called 
every time before we call `next()`?

Can we protect against that case where we would get a `null` element back 
that way?
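
One way to address this concern is sketched below, assuming a look-ahead iterator in which `next()` itself calls `hasNext()` and throws `NoSuchElementException` when nothing is left. The class name `LookAheadBernoulliIterator` and the `hasCurrent` flag are hypothetical and are not taken from the pull request; the flag replaces the `null` check so that a `null` input element cannot be mistaken for "nothing prepared". The sketch also uses `<` rather than `<=` in the trial so that a fraction of 0 selects nothing.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;

/** Hypothetical look-ahead iterator; a sketch, not the code under review. */
class LookAheadBernoulliIterator<T> implements Iterator<T> {
    private final Iterator<T> input;
    private final double fraction;
    private final Random random;

    private T current;          // element prepared by hasNext()
    private boolean hasCurrent; // true while 'current' holds an unconsumed element

    LookAheadBernoulliIterator(Iterator<T> input, double fraction, Random random) {
        this.input = input;
        this.fraction = fraction;
        this.random = random;
    }

    @Override
    public boolean hasNext() {
        // Advance the input until one element passes the Bernoulli trial.
        while (!hasCurrent && input.hasNext()) {
            T element = input.next();
            if (random.nextDouble() < fraction) {
                current = element;
                hasCurrent = true;
            }
        }
        return hasCurrent;
    }

    @Override
    public T next() {
        // Calling hasNext() here makes next() safe without a preceding hasNext() call.
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = current;
        current = null;
        hasCurrent = false;
        return result;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
```

With this shape, repeated calls to `hasNext()` are idempotent, and a caller that only calls `next()` gets either a sampled element or an exception, never a stale or `null` value.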



[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-13 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36950634
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1182,6 +1184,60 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
 getCallLocationName()))
 
   // 

+  //  Sample
+  // 

+  /**
+   * Generate a sample of DataSet by the probability fraction of each 
element.
+   *
+   * @param withReplacement Whether element can be selected more than once.
+   * @param fractionProbability that each element is chosen, 
should be [0,1] without
+   *replacement, and [0, ∞) with replacement. 
While fraction is larger
+   *than 1, the elements are expected to be 
selected multi times into
+   *sample on average.
+   * @param seedRandom number generator seed.
+   * @return The sampled DataSet
+   */
+  def sample(
+  withReplacement: Boolean,
+  fraction: Double,
+  seed: Long = Utils.RNG.nextLong()): DataSet[T] = {
+
+wrap(new MapPartitionOperator[T, T](javaSet,
+  getType(),
+  new SampleWithFraction(withReplacement, fraction, seed),
+  getCallLocationName()))
+  }
+
+  /**
+   * Generate a sample of DataSet by the probability fraction of each 
element.
--- End diff --

This Javadoc appears to be copied from the fraction-based function.




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36955221
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -69,14 +68,14 @@ public void run(SourceContext ctx) throws 
Exception {
 
public void streamFromSocket(SourceContext ctx, Socket socket) 
throws Exception {
--- End diff --

I think this method should be private because it is not meant to be used 
outside this class.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695166#comment-14695166
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36969808
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -1057,7 +1061,68 @@ public long count() throws Exception {
public UnionOperator union(DataSet other){
return new UnionOperator(this, other, 
Utils.getCallLocationName());
}
+
+   // 

+   //  Sample
+   // 

+   
+   /**
+* Generate a sample of DataSet by the probability fraction of each 
element.
+*
+* @param withReplacement Whether element can be selected more than 
once.
+* @param fractionProbability that each element is chosen, 
should be [0,1] without replacement,
+*and [0, ∞) with replacement. While fraction 
is larger than 1, the elements are 
+*expected to be selected multi times into 
sample on average.
+* @return The sampled DataSet
+*/
+   public MapPartitionOperator sample(final boolean withReplacement, 
final double fraction) {
+   return sample(withReplacement, fraction, Utils.RNG.nextLong());
+   }
+   
+   /**
+* Generate a sample of DataSet by the probability fraction of each 
element.
+*
+* @param withReplacement Whether element can be selected more than 
once.
+* @param fractionProbability that each element is chosen, 
should be [0,1] without replacement,
+*and [0, ∞) with replacement. While fraction 
is larger than 1, the elements are 
+*expected to be selected multi times into 
sample on average.
+* @param seedrandom number generator seed.
+* @return The sampled DataSet
+*/
+   public MapPartitionOperator sample(final boolean withReplacement, 
final double fraction, final long seed) {
+   return mapPartition(new SampleWithFraction(withReplacement, 
fraction, seed));
+   }
+   
+   /**
+* Generate a sample of DataSet which contains fixed size elements.
+*
+* @param withReplacement Whether element can be selected more than 
once.
+* @param numSample   The expected sample size.
+* @return The sampled DataSet
+*/
--- End diff --

Maybe we want to include a note that this kind of sampling currently takes 
2 passes over the data, and recommend using fraction unless exact precision is 
necessary.


> Create sample operator for Dataset
> --
>
> Key: FLINK-1901
> URL: https://issues.apache.org/jira/browse/FLINK-1901
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Theodore Vasiloudis
>Assignee: Chengxiang Li
>
> In order to be able to implement Stochastic Gradient Descent and a number of 
> other machine learning algorithms we need to have a way to take a random 
> sample from a Dataset.
> We need to be able to sample with or without replacement from the Dataset, 
> choose the relative or exact size of the sample, set a seed for 
> reproducibility, and support sampling within iterations.





[jira] [Created] (FLINK-2514) Local and Remote environment behave differently when re-triggering execution.

2015-08-13 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2514:
---

 Summary: Local and Remote environment behave differently when 
re-triggering execution.
 Key: FLINK-2514
 URL: https://issues.apache.org/jira/browse/FLINK-2514
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen


The following code behaves differently on the {{LocalStreamEnvironment}} and 
the {{RemoteStreamEnvironment}}.

{code}
StreamExecutionEnvironment env = ...;

env.addSource(someSource).addSink(someSink);
env.execute();

env.addSource(anotherSource).addSink(anotherSink);
env.execute();
{code}

Locally, only the second source/sink pair is executed.
Remotely, both are re-executed.





[jira] [Updated] (FLINK-2514) Local and Remote environment behave differently when re-triggering execution.

2015-08-13 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-2514:

Priority: Critical  (was: Major)

> Local and Remote environment behave differently when re-triggering execution.
> -
>
> Key: FLINK-2514
> URL: https://issues.apache.org/jira/browse/FLINK-2514
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Priority: Critical
>
> The following code behaves differently on the {{LocalStreamEnvironment}} and 
> the {{RemoteStreamEnvironment}}.
> {code}
> StreamExecutionEnvironment env = ...;
> env.addSource(someSource).addSink(someSink);
> env.execute();
> env.addSource(anotherSource).addSink(anotherSink);
> env.execute();
> {code}
> Locally, only the second source/sink pair is executed.
> Remotely, both are re-executed.





[GitHub] flink pull request: [FLINK-2512]Add client.close() before throw Ru...

2015-08-13 Thread ffbin
Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1009#issuecomment-130509029
  
@uce @hsaputra  Thanks. I have moved the try up and now rely on finally to 
close the client.
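
The pattern described above can be sketched as follows. This is only an illustration under stated assumptions: `RecordingClient` and `ClientCleanup` are hypothetical stand-ins, not the Flink client class touched by the pull request. The point is that the `finally` block closes the client on both the normal path and the exceptional path, so no explicit `close()` is needed before the `throw`.

```java
import java.io.Closeable;

/** Hypothetical stand-in for a client that must be closed after use. */
class RecordingClient implements Closeable {
    boolean closed = false;

    void run(boolean fail) {
        if (fail) {
            throw new RuntimeException("simulated failure");
        }
    }

    @Override
    public void close() {
        closed = true;
    }
}

class ClientCleanup {
    /** Runs the client and closes it whether or not run() throws. */
    static void runAndClose(RecordingClient client, boolean fail) {
        try {
            client.run(fail);
        } finally {
            // Executed on success and on failure; the exception still propagates.
            client.close();
        }
    }
}
```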




[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695133#comment-14695133
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130644950
  
`StringBuilder` is only for single-threaded use, while `StringBuffer` is 
synchronized and allows multi-threaded access. If you use `StringBuffer` in a 
single-threaded scenario, it performs worse than `StringBuilder`.
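
As a small illustration of the single-threaded case, the sketch below assembles delimiter-separated records with a method-local `StringBuilder`. The class `DelimiterSplitter` is hypothetical and is not the code in `SocketTextStreamFunction`; it only assumes that the buffer is confined to one thread, which is the situation where the synchronization in `StringBuffer` buys nothing.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; shows a thread-confined StringBuilder. */
class DelimiterSplitter {
    static List<String> split(CharSequence input, char delimiter) {
        List<String> records = new ArrayList<String>();
        // Local to this call, so only one thread can ever touch it.
        StringBuilder buffer = new StringBuilder();
        for (int i = 0; i < input.length(); i++) {
            char c = input.charAt(i);
            if (c == delimiter) {
                records.add(buffer.toString());
                buffer.setLength(0); // reuse the same builder for the next record
            } else {
                buffer.append(c);
            }
        }
        // A trailing record without a closing delimiter is dropped in this sketch.
        return records;
    }
}
```

Swapping `StringBuilder` for `StringBuffer` here would compile unchanged, since both expose the same `append`, `setLength`, and `toString` methods.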

Thanks for your changes. In addition to the "infinity" test, can you add a 
test that checks for a certain number of retries (e.g. 10)? Also please add a 
check for 1 and 0 retries. It's always good to test corner cases :)
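
The corner cases named above can be made concrete with a small model of the retry decision. This is a sketch under assumptions: `RetryPolicy`, `shouldRetry`, and `countRetries` are hypothetical names, and the only detail taken from the pull request is that the test passes `-1` as the maximum to mean "retry forever".

```java
/** Hypothetical model of a bounded retry decision; not the Flink implementation. */
class RetryPolicy {
    /** Assumed sentinel for unlimited retries, mirroring the -1 used in the test. */
    static final long RETRY_FOREVER = -1L;

    static boolean shouldRetry(long retriesSoFar, long maxRetry) {
        return maxRetry == RETRY_FOREVER || retriesSoFar < maxRetry;
    }

    /** Counts how many retries happen when the connection fails 'failures' times. */
    static long countRetries(long maxRetry, long failures) {
        long retries = 0;
        long failuresLeft = failures;
        while (failuresLeft > 0 && shouldRetry(retries, maxRetry)) {
            retries++;
            failuresLeft--;
        }
        return retries;
    }
}
```

Under this model, a maximum of 0 never retries, a maximum of 1 retries exactly once, and a maximum of 10 stops after ten retries even if the connection keeps failing.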




[GitHub] flink pull request: [Flink-Gelly] [example] added missing assumpti...

2015-08-13 Thread samk3211
Github user samk3211 commented on the pull request:

https://github.com/apache/flink/pull/883#issuecomment-130632285
  
closed





[GitHub] flink pull request: [Flink-Gelly] [example] added missing assumpti...

2015-08-13 Thread samk3211
Github user samk3211 closed the pull request at:

https://github.com/apache/flink/pull/883




[jira] [Created] (FLINK-2513) Extend state handle provider interface to list all state handles

2015-08-13 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-2513:
--

 Summary: Extend state handle provider interface to list all state 
handles
 Key: FLINK-2513
 URL: https://issues.apache.org/jira/browse/FLINK-2513
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Reporter: Ufuk Celebi


This is a follow-up issue to FLINK-2354.

In FLINK-2354 we use ZooKeeper to persist state handles. In certain failure 
scenarios, there can be lingering state handles that were already created but 
have not yet been written to ZooKeeper.

These can be cleaned up on startup if the state handle provider implementations 
keep track of their state handles. With the current implementations this would 
be possible, e.g. by listing the directory (file system), or trivially because 
no state is persistent (job manager).

It would be fair enough to have this as an optional operation.





[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-13 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36960527
  
--- Diff: 
flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---
@@ -451,6 +454,53 @@ protected static File asFile(String path) {
assertEquals(extectedStrings[i], resultStrings[i]);
}
}
+   
+   // 

+   // Comparison methods for tests using sample
+   // 

+   
+   public static  void containsResultAsTuples(List result, String 
expected) {
+   isExpectedContainsResult(result, expected, true);
+   }
+   
+   public static  void containsResultAsText(List result, String 
expected) {
+   isExpectedContainsResult(result, expected, false);
+   }
+   
+   private static  void isExpectedContainsResult(List result, String 
expected, boolean asTuple) {
--- End diff --

Can we get comments explaining the functionality of this and 
`containsResultAsText`?




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36955189
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -40,10 +37,12 @@
private char delimiter;
private long maxRetry;
private boolean retryForever;
+   private boolean isRetrying = false;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
 
+   private volatile boolean isExit = false;
--- End diff --

Is this flag necessary? We have `isRunning` already.




[jira] [Commented] (FLINK-2451) Cleanup Gelly examples

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695067#comment-14695067
 ] 

ASF GitHub Bot commented on FLINK-2451:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1000#discussion_r36963490
  
--- Diff: 
flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
 ---
@@ -86,26 +72,35 @@ public void testConnectedComponents() throws Exception {
 
@Test
public void testSingleSourceShortestPaths() throws Exception {
-   GSASingleSourceShortestPaths.main(new String[]{"1", edgesPath, 
resultPath, "16"});
-   expectedResult = "1 0.0\n" +
-   "2 12.0\n" +
-   "3 13.0\n" +
-   "4 47.0\n" +
-   "5 48.0\n" +
-   "6 Infinity\n" +
-   "7 Infinity\n";
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   Graph inputGraph = Graph.fromDataSet(
+   
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
+   new InitMapperSSSP(), env);
+
+List> result = inputGraph.run(new 
GSASingleSourceShortestPaths(1l, 16))
+   .getVertices().collect();
--- End diff --

Yes, we do! The idea is to lazily migrate the rest of the tests, too. Take 
a look at FLINK-2032.


> Cleanup Gelly examples
> --
>
> Key: FLINK-2451
> URL: https://issues.apache.org/jira/browse/FLINK-2451
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 0.10
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>Priority: Minor
>
> As per discussion in the dev@ mailing list, this issue proposes the following 
> changes to the Gelly examples and library:
> 1. Keep the following examples as they are:
> EuclideanGraphWeighing, GraphMetrics, IncrementalSSSP, JaccardSimilarity,  
> MusicProfiles.
> 2. Keep only 1 example to show how to use library methods.
> 3. Add 1 example for vertex-centric iterations.
> 4. Keep 1 example for GSA iterations and move the redundant GSA 
> implementations to the library.
> 5. Improve the examples documentation and refer to the functionality that 
> each of them demonstrates.
> 6. Port and modify existing example tests accordingly.





[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695069#comment-14695069
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130626843
  
Hi Max,
I fixed everything according to your reviews.
I also kept the change from StringBuffer to StringBuilder.
One question: as far as I can see, StringBuilder currently does the same 
thing as StringBuffer.
So what is the real difference between the two types in 
SocketTextStreamFunction?




[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-13 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36960080
  
--- Diff: 
flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---
@@ -451,6 +454,53 @@ protected static File asFile(String path) {
assertEquals(extectedStrings[i], resultStrings[i]);
}
}
+   
+   // 

+   // Comparison methods for tests using sample
+   // 

+   
+   public static  void containsResultAsTuples(List result, String 
expected) {
--- End diff --

Is this used anywhere?




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695034#comment-14695034
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36960527
  
--- Diff: 
flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---
@@ -451,6 +454,53 @@ protected static File asFile(String path) {
assertEquals(extectedStrings[i], resultStrings[i]);
}
}
+   
+   // 

+   // Comparison methods for tests using sample
+   // 

+   
+   public static  void containsResultAsTuples(List result, String 
expected) {
+   isExpectedContainsResult(result, expected, true);
+   }
+   
+   public static  void containsResultAsText(List result, String 
expected) {
+   isExpectedContainsResult(result, expected, false);
+   }
+   
+   private static  void isExpectedContainsResult(List result, String 
expected, boolean asTuple) {
--- End diff --

Can we get comments explaining the functionality of this and 
`containsResultAsText`?


> Create sample operator for Dataset
> --
>
> Key: FLINK-1901
> URL: https://issues.apache.org/jira/browse/FLINK-1901
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Theodore Vasiloudis
>Assignee: Chengxiang Li
>
> In order to be able to implement Stochastic Gradient Descent and a number of 
> other machine learning algorithms we need to have a way to take a random 
> sample from a Dataset.
> We need to be able to sample with or without replacement from the Dataset, 
> choose the relative or exact size of the sample, set a seed for 
> reproducibility, and support sampling within iterations.





[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695003#comment-14695003
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36958345
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli Trail. For sample with 
fraction and without replacement,
+ * each element sample choice is just a bernoulli trail.
+ *
+ * @param  The type of sample.
+ */
+public class BernoulliSampler extends RandomSampler {
+   
+   private final double fraction;
+   private final Random random;
+   
+   /**
+* Create a bernoulli sampler sample fraction and default random number 
generator.
+*
+* @param fraction Sample fraction, aka the bernoulli sampler 
possibility.
+*/
+   public BernoulliSampler(double fraction) {
+   this(fraction, new Random());
+   }
+   
+   /**
+* Create a bernoulli sampler sample fraction and random number 
generator seed.
+*
+* @param fraction Sample fraction, aka the bernoulli sampler 
possibility.
+* @param seed Random number generator seed.
+*/
+   public BernoulliSampler(double fraction, long seed) {
+   this(fraction, new Random(seed));
+   }
+   
+   /**
+* Create a bernoulli sampler sample fraction and random number 
generator.
+*
+* @param fraction Sample fraction, aka the bernoulli sampler 
possibility.
+* @param random   The random number generator.
+*/
+   public BernoulliSampler(double fraction, Random random) {
+   Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, 
"fraction fraction must between [0, 1].");
+   this.fraction = fraction;
+   this.random = random;
+   }
+   
+   /**
+* Sample the input elements, for each input element, take a Bernoulli 
Trail for sample.
+*
+* @param input Elements to be sampled.
+* @return The sampled result which is lazy computed upon input 
elements.
+*/
+   @Override
+   public Iterator sample(final Iterator input) {
+   if (fraction == 0) {
+   return EMPTY_ITERABLE;
+   }
+   
+   return new SampledIterator() {
+   T current;
+   
+   @Override
+   public boolean hasNext() {
+   if (current == null) {
+   while (input.hasNext()) {
+   T element = input.next();
+   if (random.nextDouble() <= 
fraction) {
+   current = element;
+   return true;
+   }
+   }
+   current = null;
+   return false;
+   } else {
+   return true;
+   }
+   }
+   
+   @Override
+   public T next() {
--- End diff --

It feels a bit counterintuitive that the next element is prepared in the 
`hasNext()` function. Doesn't this mean that `hasNext()` **needs** to be called 
every time before we call 
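
One common way to avoid depending on the call order is to let `next()` invoke `hasNext()` itself and to track the prepared element with a flag instead of a null check. The following is only a sketch under that assumption; the class `FilteringIterator` and its predicate parameter are hypothetical and not part of the pull request:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

public class FilteringIterator<T> implements Iterator<T> {

    private final Iterator<T> input;
    private final Predicate<T> keep;
    private T lookahead;
    private boolean ready; // true while lookahead holds an element not yet returned

    public FilteringIterator(Iterator<T> input, Predicate<T> keep) {
        this.input = input;
        this.keep = keep;
    }

    @Override
    public boolean hasNext() {
        // Advance the input until an element passes the filter or the input ends.
        while (!ready && input.hasNext()) {
            T candidate = input.next();
            if (keep.test(candidate)) {
                lookahead = candidate;
                ready = true;
            }
        }
        return ready;
    }

    @Override
    public T next() {
        // Calling hasNext() here makes next() safe to call on its own.
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        ready = false;
        T result = lookahead;
        lookahead = null;
        return result;
    }

    public static void main(String[] args) {
        Iterator<Integer> evens =
                new FilteringIterator<>(Arrays.asList(1, 2, 3, 4).iterator(), x -> x % 2 == 0);
        System.out.println(evens.next()); // no prior hasNext() call needed
        System.out.println(evens.next());
        System.out.println(evens.hasNext());
    }
}
```

Using a separate `ready` flag also keeps the iterator correct when the input legitimately contains null elements, which a `current == null` check cannot distinguish from "nothing prepared yet".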

[GitHub] flink pull request: [FLINK-1745] [ml] [WIP] Add exact k-nearest-ne...

2015-08-13 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/696#issuecomment-130574663
  
@kno10 thanks for the pointer. Judging from the abstract it sounds really 
promising and it seems definitely worth exploring. Unfortunately, I don't have 
access to the complete paper which makes it hard to fully understand the 
algorithm.




[jira] [Commented] (FLINK-2373) Add configuration parameter to createRemoteEnvironment method

2015-08-13 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694989#comment-14694989
 ] 

Ufuk Celebi commented on FLINK-2373:


Andreas, did you have a look at this? I have a small change as part of an 
upcoming pull request to allow this as well. I will keep it simple and just add:

{code}
createRemoteEnvironment(Configuration flinkConfig, String... jarFiles)
{code}

It would be nice to think about the "whole" execution environment stack and 
have it more consistent.

> Add configuration parameter to createRemoteEnvironment method
> -
>
> Key: FLINK-2373
> URL: https://issues.apache.org/jira/browse/FLINK-2373
> Project: Flink
>  Issue Type: Bug
>  Components: other
>Reporter: Andreas Kunft
>Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently there is no way to provide a custom configuration upon creation of 
> a remote environment (via ExecutionEnvironment.createRemoteEnvironment(...)).
> This leads to errors when the submitted job exceeds the default value for the 
> max. payload size in Akka, as we can not increase the configuration value 
> (akka.remote.OversizedPayloadException: Discarding oversized payload...)
> Providing an overloaded method with a configuration parameter for the remote 
> environment fixes that.





[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694981#comment-14694981
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36957769
  
--- Diff: 
flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import java.util.{List => JavaList, Random}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, After, Test}
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SampleITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode) {
+  private val RNG: Random = new Random
+
+  private var result: JavaList[String] = null;
+
+  @Before
+  def initiate {
+ExecutionEnvironment.getExecutionEnvironment.setParallelism(5)
+  }
+
+  @After
+  def after() = {
+TestBaseUtils.containsResultAsText(result, getSourceStrings)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithoutReplacement {
+verifySamplerWithFractionWithoutReplacement(0d)
+verifySamplerWithFractionWithoutReplacement(0.2d)
+verifySamplerWithFractionWithoutReplacement(1.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithReplacement {
+verifySamplerWithFractionWithReplacement(0d)
+verifySamplerWithFractionWithReplacement(0.2d)
+verifySamplerWithFractionWithReplacement(1.0d)
+verifySamplerWithFractionWithReplacement(2.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithoutReplacement {
+verifySamplerWithFixedSizeWithoutReplacement(0)
+verifySamplerWithFixedSizeWithoutReplacement(2)
+verifySamplerWithFixedSizeWithoutReplacement(21)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithReplacement {
+verifySamplerWithFixedSizeWithReplacement(0)
+verifySamplerWithFixedSizeWithReplacement(2)
+verifySamplerWithFixedSizeWithReplacement(21)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double) {
+verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double, seed: Long) {
+verifySamplerWithFraction(false, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double) {
+verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double, 
seed: Long) {
+verifySamplerWithFraction(true, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFraction(withReplacement: Boolean, 
fraction: Double, seed: Long) {
+val ds = getSourceDataSet()
+val sampled = ds.sample(withReplacement, fraction, seed)
+result = sampled.collect.asJava
--- End diff --

Is this result checked for validity somewhere?


> Create sample operator for Dataset
> --
>
> Key: FLINK-1901
> URL: https://issues.a

[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694974#comment-14694974
 ] 

ASF GitHub Bot commented on FLINK-1819:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/966#issuecomment-130595336
  
Thank you for your remarks @StephanEwen.

>I think you should go ahead and just call them "Rich". It is just a name, 
and what matters is that the JavaDocs describe what it actually means...

The JavaDocs in this pull request properly describe what a 
"RichInputFormat" is. So we are good to go. Let's stick with "Rich" although I 
understand your concerns.

I'll merge this later if nobody objects in the meantime :)


> Allow access to RuntimeContext from Input and OutputFormats
> ---
>
> Key: FLINK-1819
> URL: https://issues.apache.org/jira/browse/FLINK-1819
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 0.9, 0.8.1
>Reporter: Fabian Hueske
>Assignee: Sachin Goel
>Priority: Minor
> Fix For: 0.9
>
>
> User function that extend a RichFunction can access a {{RuntimeContext}} 
> which gives the parallel id of the task and access to Accumulators and 
> BroadcastVariables. 
> Right now, Input and OutputFormats cannot access their {{RuntimeContext}}.





[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-13 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36955515
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli Trail. For sample with 
fraction and without replacement,
+ * each element sample choice is just a bernoulli trail.
+ *
+ * @param  The type of sample.
+ */
+public class BernoulliSampler extends RandomSampler {
+   
+   private final double fraction;
+   private final Random random;
+   
+   /**
+* Create a bernoulli sampler sample fraction and default random number 
generator.
--- End diff --

*B*ernoulli should be capitalized in all mentions




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36955316
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -85,11 +84,12 @@ public void streamFromSocket(SourceContext ctx, 
Socket socket) throws Ex
}
}
 
-   if (data == -1) {
+   if (data == null) {
socket.close();
long retry = 0;
boolean success = false;
-   while (retry < maxRetry && !success) {
+   while ((retry < maxRetry || 
(retryForever && !isExit)) && !success) {
+   isRetrying = true;
--- End diff --

This flag is only necessary for your test and thus should be removed.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694940#comment-14694940
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36955412
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli Trail. For sample with 
fraction and without replacement,
+ * each element sample choice is just a bernoulli trail.
--- End diff --

Do you mean Bernoulli _trial_ here?


> Create sample operator for Dataset
> --
>
> Key: FLINK-1901
> URL: https://issues.apache.org/jira/browse/FLINK-1901
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Theodore Vasiloudis
>Assignee: Chengxiang Li
>
> In order to be able to implement Stochastic Gradient Descent and a number of 
> other machine learning algorithms we need to have a way to take a random 
> sample from a Dataset.
> We need to be able to sample with or without replacement from the Dataset, 
> choose the relative or exact size of the sample, set a seed for 
> reproducibility, and support sampling within iterations.





[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694939#comment-14694939
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36955334
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -145,4 +152,8 @@ public void cancel() {
}
}
}
+
+   public boolean getIsRetrying() {
--- End diff --

Please remove this getter.


> Remove unwanted boolean check in function 
> SocketTextStreamFunction.streamFromSocket
> ---
>
> Key: FLINK-2490
> URL: https://issues.apache.org/jira/browse/FLINK-2490
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
>Priority: Minor
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>






[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36955259
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -69,14 +68,14 @@ public void run(SourceContext ctx) throws 
Exception {
 
public void streamFromSocket(SourceContext ctx, Socket socket) 
throws Exception {
try {
-   StringBuffer buffer = new StringBuffer();
+   StringBuilder buffer = new StringBuilder();
BufferedReader reader = new BufferedReader(new 
InputStreamReader(
socket.getInputStream()));
 
while (isRunning) {
-   int data;
+   String data;
try {
-   data = reader.read();
+   data = reader.readLine();
--- End diff --

Please use `read()` because of the custom delimiter.




[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694935#comment-14694935
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36955221
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -69,14 +68,14 @@ public void run(SourceContext ctx) throws 
Exception {
 
public void streamFromSocket(SourceContext ctx, Socket socket) 
throws Exception {
--- End diff --

I think this method should be private because it is not meant to be used 
outside this class.


> Remove unwanted boolean check in function 
> SocketTextStreamFunction.streamFromSocket
> ---
>
> Key: FLINK-2490
> URL: https://issues.apache.org/jira/browse/FLINK-2490
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
>Priority: Minor
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>






[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694932#comment-14694932
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-130588652
  
Thanks for your changes. I think we should use `read()` instead of 
`readLine()` because we are using a custom delimiter and not necessarily "\n" 
(newline symbol). The danger of reading an entire line is that the newline 
symbol might never arrive. So it might continue to read forever. And even if it 
manages to find a newline symbol, you have to truncate your input to find the 
custom delimiter. That's not very efficient. Can you change the code back to 
using the `read()` method? I think we had a misunderstanding.
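To illustrate the point about `read()` and a custom delimiter, here is a minimal sketch. It is not the actual `SocketTextStreamFunction` code: the class name `DelimiterSplitSketch`, the method `readRecords`, and the choice to emit a trailing partial record at end of stream are assumptions made for this example only.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: splits a character stream on a
// custom delimiter by reading one character at a time.
class DelimiterSplitSketch {

    static List<String> readRecords(Reader in, char delimiter) throws IOException {
        List<String> records = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        BufferedReader reader = new BufferedReader(in);

        int data;
        // read() returns -1 once the stream is exhausted.
        while ((data = reader.read()) != -1) {
            if (data == delimiter) {
                // Delimiter seen: emit the buffered record and start a new one.
                records.add(buffer.toString());
                buffer.setLength(0);
            } else {
                // Any other character, including '\n', is ordinary payload.
                buffer.append((char) data);
            }
        }
        // Assumption for this sketch: a trailing record without a closing
        // delimiter is still emitted.
        if (buffer.length() > 0) {
            records.add(buffer.toString());
        }
        return records;
    }
}
```

With this approach a record is available as soon as its delimiter arrives, and the code never waits for a newline that may not be sent.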

For your test case: It's not considered good practice to mix production and 
test code. You're doing that by introducing the `isRetrying` flag and exposing 
it. Alternatively, you have two options:

1. Create a `ServerSocket` and pass its address to the 
`SocketTextStreamFunction`. Then control the connection to this socket and 
count how often the function reconnects (e.g. use the `accept()` method).
2. Create your test in the same package as the `SocketTextStreamFunction` 
function (package is `org.apache.flink.streaming.api.functions.source`). Then 
you can access all field variables which are protected. So make your `retries` 
variable a protected field variable of the `SocketTextStreamFunction` class.

I hope that this helps you. If not, feel free to ask more questions.
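Option 1 could look roughly like the following sketch. Everything here is hypothetical: `connectWithRetries` is only a stand-in for the function under test (it is not the real `SocketTextStreamFunction`), and the class and method names are invented for illustration. The test side only uses `ServerSocket.accept()` to count connections.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: count reconnects from the server side instead of
// exposing a flag in production code.
class ReconnectCountSketch {

    // Stand-in for the source under test: connects, reads until the server
    // closes the connection, then reconnects up to maxRetry times.
    static void connectWithRetries(InetAddress host, int port, int maxRetry) throws IOException {
        for (int attempt = 0; attempt <= maxRetry; attempt++) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 1000);
                // The server closes right after accept(), so read() returns -1.
                while (socket.getInputStream().read() != -1) {
                    // discard any data
                }
            }
        }
    }

    // Starts a local server, runs the client, and returns how many
    // connections the server accepted.
    static int countConnections(int maxRetry) throws Exception {
        AtomicInteger accepted = new AtomicInteger();
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) {
            Thread acceptor = new Thread(() -> {
                try {
                    while (true) {
                        Socket s = server.accept();
                        accepted.incrementAndGet();
                        s.close(); // force the client to reconnect
                    }
                } catch (IOException e) {
                    // Server socket was closed: stop accepting.
                }
            });
            acceptor.start();
            connectWithRetries(loopback, server.getLocalPort(), maxRetry);
            server.close();
            acceptor.join();
        }
        return accepted.get();
    }
}
```

A test would then assert that `countConnections(n)` equals the initial connection plus `n` retries, without touching any field of the production class.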



> Remove unwanted boolean check in function 
> SocketTextStreamFunction.streamFromSocket
> ---
>
> Key: FLINK-2490
> URL: https://issues.apache.org/jira/browse/FLINK-2490
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
>Priority: Minor
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>






[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694934#comment-14694934
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r36955189
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -40,10 +37,12 @@
private char delimiter;
private long maxRetry;
private boolean retryForever;
+   private boolean isRetrying = false;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
 
+   private volatile boolean isExit = false;
--- End diff --

Is this flag necessary? We have `isRunning` already.


> Remove unwanted boolean check in function 
> SocketTextStreamFunction.streamFromSocket
> ---
>
> Key: FLINK-2490
> URL: https://issues.apache.org/jira/browse/FLINK-2490
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
>Priority: Minor
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>






[GitHub] flink pull request: [FLINK-2306] Add support for named streams in ...

2015-08-13 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/flink/pull/1011

[FLINK-2306] Add support for named streams in Storm compatibility layer

 - enabled .declareStream() and connect via stream name
 - enabled multiple output streams
 - added .split() / .select() / strip pattern
 - added helpers in new package utils
 - adapted and extended JUnit tests
 - adapted examples


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/flink flink-2306-storm-namedStreams

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1011.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1011


commit 0b7f47fed18fbe9c9b353960b337e2d8454e11b7
Author: mjsax 
Date:   2015-08-13T06:56:47Z

[FLINK-2306] Add support for named streams in Storm compatibility layer
 - enabled .declareStream() and connect via stream name
 - enabled multiple output streams
 - added .split() / .select() / strip pattern
 - added helpers in new package utils
 - adapted and extended JUnit tests
 - adapted examples
some minor improvements (FlinkClient, integration of Tuple0)

commit c48ad0f516ee7583f8a6c4e564213fcd614c74e2
Author: mjsax 
Date:   2015-08-12T18:56:58Z

Added split examples
 - example for embedded Spout/Bolt
 - two test-examples for complete topologies
Additionally:
 - updated README.md
 - extended web documentation
 - add comments to pom.xml to explain examples better

commit 71eeb4f2b77b643e0bbc0af4ed9fe212a4a306b7
Author: mjsax 
Date:   2015-08-12T18:57:21Z

TO BE DELETED
 - this PR depends on FLINK-2457 (Integrate Tuple0), i.e., PR #983
 - these changes are hotfixes to make the branch work and are not needed 
after PR #983 is merged and this branch is rebased






[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694894#comment-14694894
 ] 

ASF GitHub Bot commented on FLINK-1745:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/696#issuecomment-130574663
  
@kno10 thanks for the pointer. Judging from the abstract it sounds really 
promising and it seems definitely worth exploring. Unfortunately, I don't have 
access to the complete paper which makes it hard to fully understand the 
algorithm.


> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial 
> it is still used as a means to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]





[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694886#comment-14694886
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36950634
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1182,6 +1184,60 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
 getCallLocationName()))
 
   // 

+  //  Sample
+  // 

+  /**
+   * Generate a sample of DataSet by the probability fraction of each element.
+   *
+   * @param withReplacement Whether element can be selected more than once.
+   * @param fraction Probability that each element is chosen, should be [0,1] without
+   *                 replacement, and [0, ∞) with replacement. While fraction is larger
+   *                 than 1, the elements are expected to be selected multi times into
+   *                 sample on average.
+   * @param seed Random number generator seed.
+   * @return The sampled DataSet
+   */
+  def sample(
+  withReplacement: Boolean,
+  fraction: Double,
+  seed: Long = Utils.RNG.nextLong()): DataSet[T] = {
+
+wrap(new MapPartitionOperator[T, T](javaSet,
+  getType(),
+  new SampleWithFraction(withReplacement, fraction, seed),
+  getCallLocationName()))
+  }
+
+  /**
+   * Generate a sample of DataSet by the probability fraction of each 
element.
--- End diff --

Javadoc is from the fraction function.


> Create sample operator for Dataset
> --
>
> Key: FLINK-1901
> URL: https://issues.apache.org/jira/browse/FLINK-1901
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Theodore Vasiloudis
>Assignee: Chengxiang Li
>
> In order to be able to implement Stochastic Gradient Descent and a number of 
> other machine learning algorithms we need to have a way to take a random 
> sample from a Dataset.
> We need to be able to sample with or without replacement from the Dataset, 
> choose the relative or exact size of the sample, set a seed for 
> reproducibility, and support sampling within iterations.
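As a rough illustration of the "relative size, without replacement, with a seed" case described above, here is a minimal Bernoulli-sampling sketch over an in-memory list. It is not the PR's `SampleWithFraction` implementation; the class `SampleSketch` and its `sample` method are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch, not Flink code: fraction-based sampling without
// replacement, where each element is kept independently with the given
// probability.
class SampleSketch {

    static <T> List<T> sample(List<T> input, double fraction, long seed) {
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0, 1] without replacement");
        }
        // A fixed seed makes the selection reproducible across runs.
        Random rng = new Random(seed);
        List<T> result = new ArrayList<>();
        for (T element : input) {
            // nextDouble() is uniform in [0, 1), so the element is kept
            // with probability equal to fraction.
            if (rng.nextDouble() < fraction) {
                result.add(element);
            }
        }
        return result;
    }
}
```

The result size is only equal to `fraction * input.size()` on average; an exact-size sample needs a different technique, such as reservoir sampling.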





[jira] [Updated] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2015-08-13 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-1745:
-
Assignee: (was: Till Rohrmann)

> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial 
> it is still used as a means to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]





[jira] [Commented] (FLINK-2493) Simplify names of example program JARs

2015-08-13 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694854#comment-14694854
 ] 

Stephan Ewen commented on FLINK-2493:
-

Still, seeking consensus on the dev list is a good idea, let's do that!

> Simplify names of example program JARs
> --
>
> Key: FLINK-2493
> URL: https://issues.apache.org/jira/browse/FLINK-2493
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: chenliang
>Priority: Minor
>  Labels: easyfix, starter
>
> I find the names of the example JARs a bit annoying.
> Why not name the file {{examples/ConnectedComponents.jar}} rather than 
> {{examples/flink-java-examples-0.10-SNAPSHOT-ConnectedComponents.jar}}
> And combine the "flink-java-examples" and "flink-scala-examples" projects into one 
> examples project.





[jira] [Commented] (FLINK-2493) Simplify names of example program JARs

2015-08-13 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-2493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694852#comment-14694852
 ] 

Márton Balassi commented on FLINK-2493:
---

Ok, fair enough then.

> Simplify names of example program JARs
> --
>
> Key: FLINK-2493
> URL: https://issues.apache.org/jira/browse/FLINK-2493
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: chenliang
>Priority: Minor
>  Labels: easyfix, starter
>
> I find the names of the example JARs a bit annoying.
> Why not name the file {{examples/ConnectedComponents.jar}} rather than 
> {{examples/flink-java-examples-0.10-SNAPSHOT-ConnectedComponents.jar}}
> And combine the "flink-java-examples" and "flink-scala-examples" projects into one 
> examples project.





[jira] [Commented] (FLINK-2493) Simplify names of example program JARs

2015-08-13 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694851#comment-14694851
 ] 

Stephan Ewen commented on FLINK-2493:
-

I think there was a discussion about combining the examples into one project a 
bit back, in the context of also re-organizing the Gelly examples.

I actually like this idea. It will help us reduce the jungle of projects a bit.

The problems occur mostly when referencing Scala from Java code, and the two 
sets of code are completely separate here. There should be no harm in combining 
the example projects.

> Simplify names of example program JARs
> --
>
> Key: FLINK-2493
> URL: https://issues.apache.org/jira/browse/FLINK-2493
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: chenliang
>Priority: Minor
>  Labels: easyfix, starter
>
> I find the names of the example JARs a bit annoying.
> Why not name the file {{examples/ConnectedComponents.jar}} rather than 
> {{examples/flink-java-examples-0.10-SNAPSHOT-ConnectedComponents.jar}}
> And combine the "flink-java-examples" and "flink-scala-examples" projects into one 
> examples project.




