[jira] [Created] (BEAM-816) Aggregators are not properly named when reported to Graphite

2016-10-25 Thread Stas Levin (JIRA)
Stas Levin created BEAM-816:
---

 Summary: Aggregators are not properly named when reported to 
Graphite
 Key: BEAM-816
 URL: https://issues.apache.org/jira/browse/BEAM-816
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Stas Levin
Assignee: Stas Levin


Named aggregators are reported at the top level of the Graphite tree instead 
of being nested under the appropriate 
applicationId/[driver|executor]/etc. path.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-891) Flake in Spark metrics library?

2016-11-03 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633346#comment-15633346
 ] 

Stas Levin commented on BEAM-891:
-

Sure.
I looked at it a bit this morning; my first impression is that it is a 
concurrency issue.

Do you have more details on how this suite is run in terms of parallelism?

> Flake in Spark metrics library?
> ---
>
> Key: BEAM-891
> URL: https://issues.apache.org/jira/browse/BEAM-891
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Daniel Halperin
>Assignee: Stas Levin
>
> [~staslev] I think you implemented this functionality originally? Want to 
> take a look? CC [~amitsela]
> Run: 
> https://builds.apache.org/job/beam_PostCommit_RunnableOnService_SparkLocal/org.apache.beam$beam-runners-spark/43/testReport/junit/org.apache.beam.sdk.transforms/FilterTest/testFilterGreaterThan/
> Error:
> {code}
> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: 5
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:169)
>   at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:77)
>   at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:53)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
>   at 
> org.apache.beam.sdk.transforms.FilterTest.testFilterGreaterThan(FilterTest.java:122)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at 
> org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:393)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: 5
>   at 
> scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
>   at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
>   at 
> scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
>   at 
> scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
>   at 
> scala.collection.IndexedSeqOptimized$class.indexWhere(IndexedSeqOptimized.scala:198)
>   at scala.collection.mutable.ArrayBuffer.indexWhere(ArrayBuffer.scala:47)
>   at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:144)
>   at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>   at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:128)
>   at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>   at 
> scala.collection.mutable.BufferLike$class.$minus$eq(BufferLike.scala:126)
>   at scala.collection.mutable.AbstractBuffer.$minus$eq(Buffer.scala:48)
>   at 
> org.apache.spark.metrics.MetricsSystem.removeSource(MetricsSystem.scala:159)
>   at 
> org.apache.beam.runners.spark.translation.SparkRuntimeContext.registerMetrics(SparkRuntimeContext.java:94)
>   at 
> org.apache.beam.runners.spark.translation.SparkRuntimeContext.<init>(SparkRuntimeContext.java:66)
>   at 
> org.apache.beam.runners.spark.translation.EvaluationContext.<init>(EvaluationContext.java:73)
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:146)
> {code}





[jira] [Commented] (BEAM-891) Flake in Spark metrics library?

2016-11-09 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650533#comment-15650533
 ] 

Stas Levin commented on BEAM-891:
-

We can indeed try syncing the following block in {{SparkRuntimeContext}}:

{code:java}
metricsSystem.removeSource(aggregatorMetricSource);
metricsSystem.registerSource(aggregatorMetricSource);
{code}

But since other calls to {{MetricsSystem#removeSource}} and 
{{MetricsSystem#registerSource}} are not synced (e.g., those made by Spark 
itself), this kind of synchronisation will only help if the so-called "race" 
lies in our Beam code. If the race is with Spark's internal calls, syncing the 
above block on our side will not help much.
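
As a rough illustration of the syncing option, the sketch below guards the remove/register pair with a shared lock. {{Registry}}, {{reregister}} and {{LOCK}} are hypothetical stand-ins, not Spark or Beam APIs; a plain list only mimics the metrics system here. The lock serialises callers that go through {{reregister}} and nobody else, which is exactly the limitation described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: every name in this file is hypothetical. */
class SyncedReregistration {

  /** Stand-in for a metrics system whose source list is not thread-safe. */
  static class Registry {
    private final List<String> sources = new ArrayList<>();

    void registerSource(String source) {
      sources.add(source);
    }

    void removeSource(String source) {
      sources.remove(source); // removes the first matching entry, if any
    }

    int count(String source) {
      int n = 0;
      for (String s : sources) {
        if (s.equals(source)) {
          n++;
        }
      }
      return n;
    }
  }

  /** Shared by all callers on our side; code outside this class never takes it. */
  private static final Object LOCK = new Object();

  /** Runs remove + register as one atomic step with respect to other callers of this method. */
  static void reregister(Registry registry, String source) {
    synchronized (LOCK) {
      registry.removeSource(source);
      registry.registerSource(source);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Registry registry = new Registry();
    Thread[] threads = new Thread[8];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(() -> {
        for (int j = 0; j < 1000; j++) {
          reregister(registry, "aggregators");
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    // With the lock in place the source ends up registered exactly once.
    System.out.println(registry.count("aggregators"));
  }
}
```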

Another option would be to replace the above block with this one (which 
requires some minor changes to {{AggregatorMetricSource}}):
{code:java}
if (metricsSystem.getSourcesByName(AggregatorMetricSource.NAME).isEmpty()) {
  metricsSystem.registerSource(aggregatorMetricSource);
}
{code}

This eliminates {{SparkRuntimeContext}}'s call to 
{{MetricsSystem#removeSource}} altogether.


[jira] [Comment Edited] (BEAM-891) Flake in Spark metrics library?

2016-11-09 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650533#comment-15650533
 ] 

Stas Levin edited comment on BEAM-891 at 11/9/16 12:28 PM:
---

We can indeed try syncing the following block in {{SparkRuntimeContext}}:

{code:java}
metricsSystem.removeSource(aggregatorMetricSource);
metricsSystem.registerSource(aggregatorMetricSource);
{code}

But since other calls to {{MetricsSystem#removeSource}} and 
{{MetricsSystem#registerSource}} are not synced (e.g., those made by Spark 
itself), this kind of synchronisation will only help if the so-called "race" 
lies in our Beam code. If the race is with Spark's internal calls, syncing the 
above block on our side will not help much.

Another option would be to replace the above block with this one (which 
requires some minor changes to {{AggregatorMetricSource}}):
{code:java}
if (metricsSystem.getSourcesByName(AggregatorMetricSource.NAME).isEmpty()) {
  metricsSystem.registerSource(aggregatorMetricSource);
}
{code}

This eliminates {{SparkRuntimeContext}}'s call to 
{{MetricsSystem#removeSource}} altogether.
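
A minimal sketch of the "register only if missing" idea, for illustration. {{Registry}} and {{registerIfAbsent}} are hypothetical names standing in for the metrics system and for the code in {{SparkRuntimeContext}}; the sketch also locks around the check and the registration, which is an assumption of this example and not something the real code is known to do.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: every name in this file is hypothetical. */
class RegisterIfAbsent {

  /** Stand-in for a metrics system that can look up sources by name. */
  static class Registry {
    private final List<String> sources = new ArrayList<>();

    synchronized List<String> getSourcesByName(String name) {
      List<String> matches = new ArrayList<>();
      for (String s : sources) {
        if (s.equals(name)) {
          matches.add(s);
        }
      }
      return matches;
    }

    synchronized void registerSource(String name) {
      sources.add(name);
    }
  }

  /**
   * Registers the source only when no source with that name exists yet.
   * Returns true if this call performed the registration.
   */
  static boolean registerIfAbsent(Registry registry, String name) {
    // Check and registration happen under one lock so two callers cannot both pass the check.
    synchronized (registry) {
      if (registry.getSourcesByName(name).isEmpty()) {
        registry.registerSource(name);
        return true;
      }
      return false;
    }
  }

  public static void main(String[] args) {
    Registry registry = new Registry();
    System.out.println(registerIfAbsent(registry, "aggregators")); // first call registers
    System.out.println(registerIfAbsent(registry, "aggregators")); // second call is a no-op
  }
}
```

Since nothing is ever removed in this variant, there is no remove/register window for another thread to observe.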


was (Author: staslev):
We can indeed try syncing the following block in {{SparkRuntimeContext}}:

{code:java}
metricsSystem.removeSource(aggregatorMetricSource);
metricsSystem.registerSource(aggregatorMetricSource);
{code}

But since other calls to {{MetricsSystem#removeSource}} and 
{{MetricsSystem#registerSource}} are not synced (e.g., from Spark itself), this 
kind of synchronisation will only help if the so called "race" lies in our Beam 
code. If the race is with Spark's internal calls, us syncing the above block 
will not be very helpful.

Another option would be to replace the above block with this one (which 
requires some minor changes to {{AggregatorMetricSource}}):
{code:java}
if(metricsSystem.getSourcesByName(AggregatorMetricSource.NAME).nonEmpty()) {
  metricsSystem.registerSource(aggregatorMetricSource);
}
{code}

Which eliminates {{SparkRuntimeContext}}'s call to 
{{MetricsSystem#removeSource}} altogether.


[jira] [Commented] (BEAM-891) Flake in Spark metrics library?

2016-11-09 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650807#comment-15650807
 ] 

Stas Levin commented on BEAM-891:
-

Perhaps it would be safe to go the other way around, i.e., "only add this 
source if it's missing". This way we won't have to deal with when it should be 
removed at all.



[jira] [Commented] (BEAM-891) Flake in Spark metrics library?

2016-11-13 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15661083#comment-15661083
 ] 

Stas Levin commented on BEAM-891:
-

[~amitsela], as far as the name goes, we can try to make it clearer, say by 
renaming to {{enableSparkMetricSinks}} (or something).

I do believe {{enableSparkSinks}} should be {{true}} by default, since metrics 
sound useful enough for users to have on by default, unless otherwise 
specified. Moreover, if your {{metrics.properties}} is empty or missing, the 
behaviour should be as if you had {{enableSparkSinks}} set to {{false}} anyway, 
so ideally you would never have to actually set {{enableSparkSinks}} to 
{{false}}. I would be reluctant to remove a piece of useful functionality just 
as a workaround, unless we absolutely must.

Btw, is disabling context reuse something you have considered?



[jira] [Comment Edited] (BEAM-891) Flake in Spark metrics library?

2016-11-13 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15661083#comment-15661083
 ] 

Stas Levin edited comment on BEAM-891 at 11/13/16 8:21 AM:
---

[~amitsela], as far as the name goes, we can try to make it clearer, say by 
renaming to {{enableSparkMetricSinks}} (or something).

I do believe {{enableSparkSinks}} should be {{true}} by default, since metrics 
sound useful enough for users to have on by default, unless otherwise 
specified. Moreover, if your {{metrics.properties}} is empty or missing, the 
behaviour should be as if you had {{enableSparkSinks}} set to {{false}} anyway, 
so ideally you would never have to actually set {{enableSparkSinks}} to 
{{false}}. I would be reluctant to remove a piece of useful functionality just 
as a workaround, unless we absolutely must.
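
To make the intended default concrete, here is a small sketch of the rule described above. {{SinkDefaults}} and {{sinksEffectivelyEnabled}} are hypothetical names, and treating "empty or missing" as a null or empty {{Properties}} object is an assumption of this example, not how the runner is known to read {{metrics.properties}}.

```java
import java.util.Properties;

/** Sketch only: every name in this file is hypothetical. */
class SinkDefaults {

  /**
   * Sinks report only when the flag is on AND there is some sink configuration.
   * A null argument models a missing metrics.properties file.
   */
  static boolean sinksEffectivelyEnabled(boolean enableSparkSinks, Properties metricsProperties) {
    boolean hasSinkConfig = metricsProperties != null && !metricsProperties.isEmpty();
    return enableSparkSinks && hasSinkConfig;
  }

  public static void main(String[] args) {
    Properties empty = new Properties();
    Properties graphite = new Properties();
    graphite.setProperty("*.sink.graphite.class", "org.apache.spark.metrics.sink.GraphiteSink");

    System.out.println(sinksEffectivelyEnabled(true, empty));     // flag on, nothing configured
    System.out.println(sinksEffectivelyEnabled(true, graphite));  // flag on, sink configured
    System.out.println(sinksEffectivelyEnabled(false, graphite)); // flag explicitly off
  }
}
```

Under this rule a {{true}} default costs nothing for users without sink configuration, which is why the flag would rarely need to be set to {{false}} by hand.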

Btw, is disabling context reuse something you have considered?



was (Author: staslev):
[~amitsela], as far as the name goes, we can try to make it clearer, say by 
renaming to enableSparkMetricSinks}} (or something).

I do believe {{enableSparkSinks}} should be {{true}} by default, since metrics 
sound useful enough for users to have on by default, unless otherwise 
specified. Moreover, if your {{metrics.properties}} is empty or missing, the 
behaviour should be as if you had {{enableSparkSinks}} set to {{false}} anyway, 
so ideally you would never have to actually set {{enableSparkSinks}} to 
{{false}}. I would be reluctant to remove a piece of useful functionality just 
as a workaround, unless we absolutely must.

Btw, is disabling context reuse something you have considered?



[jira] [Issue Comment Deleted] (BEAM-891) Flake in Spark metrics library?

2016-11-13 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-891:

Comment: was deleted

(was: [~amitsela], as far as the name goes, we can try to make it clearer, say 
by renaming to {{enableSparkMetricSinks}} (or something).

I do believe {{enableSparkSinks}} should be {{true}} by default, since metrics 
sound useful enough for users to have on by default, unless otherwise 
specified. Moreover, if your {{metrics.properties}} is empty or missing, the 
behaviour should be as if you had {{enableSparkSinks}} set to {{false}} anyway, 
so ideally you would never have to actually set {{enableSparkSinks}} to 
{{false}}. I would be reluctant to remove a piece of useful functionality just 
as a workaround, unless we absolutely must.

Btw, is disabling context reuse something you have considered?
)


[jira] [Commented] (BEAM-891) Flake in Spark metrics library?

2016-11-13 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15661096#comment-15661096
 ] 

Stas Levin commented on BEAM-891:
-

[~amitsela], as far as the name goes, we can try to make it clearer, say by 
renaming it to {{enableSparkMetricSinks}} (or something similar).

I do believe {{enableSparkSinks}} should be {{true}} by default, since metrics 
sound useful enough for users to have on by default, unless otherwise 
specified. Moreover, if your {{metrics.properties}} is empty or missing, the 
behaviour should be as if you had {{enableSparkSinks}} set to {{false}} anyway, 
so ideally you would never have to actually set {{enableSparkSinks}} to 
{{false}}. I would be reluctant to remove a piece of useful functionality just 
as a workaround, unless we absolutely must.
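
To make the "empty or missing {{metrics.properties}}" argument concrete, here is a minimal sketch of the intended behaviour. The class and method names are hypothetical and are not part of Beam or Spark; the only thing taken from Spark is the documented key layout of {{metrics.properties}}, i.e. {{[instance].sink.[name].[option]}}.

```java
import java.util.Properties;

final class SinkToggleSketch {

  /**
   * Hypothetical helper (an assumption, not Beam or Spark API): sinks are
   * effectively on only if the flag is set AND at least one sink is configured.
   */
  static boolean sinksEffectivelyEnabled(boolean enableSparkSinks, Properties metricsProperties) {
    if (!enableSparkSinks || metricsProperties == null) {
      // Flag turned off, or no metrics.properties at all.
      return false;
    }
    for (String key : metricsProperties.stringPropertyNames()) {
      // Spark's metrics.properties keys look like "<instance>.sink.<name>.<option>".
      if (key.contains(".sink.")) {
        return true;
      }
    }
    // File present but no sink configured: same outcome as the flag being false.
    return false;
  }

  public static void main(String[] args) {
    Properties configured = new Properties();
    configured.setProperty("*.sink.graphite.class", "org.apache.spark.metrics.sink.GraphiteSink");
    System.out.println(sinksEffectivelyEnabled(true, new Properties()));
    System.out.println(sinksEffectivelyEnabled(true, configured));
  }
}
```

Under this reading a default of {{true}} costs nothing for users who never configure a sink.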

Btw, is disabling context reuse something you have considered? It has a 
somewhat problematic history in terms of tests.

> Flake in Spark metrics library?
> ---
>
> Key: BEAM-891
> URL: https://issues.apache.org/jira/browse/BEAM-891
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Daniel Halperin
>Assignee: Stas Levin
>
> [~staslev] I think you implemented this functionality originally? Want to 
> take a look? CC [~amitsela]
> Run: 
> https://builds.apache.org/job/beam_PostCommit_RunnableOnService_SparkLocal/org.apache.beam$beam-runners-spark/43/testReport/junit/org.apache.beam.sdk.transforms/FilterTest/testFilterGreaterThan/
> Error:
> {code}
> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: 5
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:169)
>   at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:77)
>   at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:53)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
>   at 
> org.apache.beam.sdk.transforms.FilterTest.testFilterGreaterThan(FilterTest.java:122)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at 
> org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:393)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: 5
>   at 
> scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
>   at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
>   at 
> scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
>   at 
> scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
>   at 
> scala.collection.IndexedSeqOptimized$class.indexWhere(IndexedSeqOptimized.scala:198)
>   at scala.collection.mutable.ArrayBuffer.indexWhere(ArrayBuffer.scala:47)
>   at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:144)
>   at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>   at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:128)
>   at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>   at 
> scala.collection.mutable.BufferLike$class.$minus$eq(BufferLike.scala:126)
>   at scala.collection.mutable.AbstractBuffer.$minus$eq(Buffer.scala:48)
>   at 
> org.apache.spark.metrics.MetricsSystem.removeSource(MetricsSystem.scala:159)
>   at 
> org.apache.beam.runners.spark.translation.SparkRuntimeContext.registerMetrics(SparkRuntimeContext

[jira] [Commented] (BEAM-891) Flake in Spark metrics library?

2016-11-13 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15661227#comment-15661227
 ] 

Stas Levin commented on BEAM-891:
-

Cool, turning it off in relevant tests sounds reasonable.

We'll need to see whether there is a common way for tests to obtain 
{{PipelineOptions}} and, if so, make the change there, I guess?
It can then be overridden in {{ResumeFromCheckpointStreamingTest}}.

What say you?

> Flake in Spark metrics library?
> ---
>
> Key: BEAM-891
> URL: https://issues.apache.org/jira/browse/BEAM-891
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Daniel Halperin
>Assignee: Stas Levin
>
> [~staslev] I think you implemented this functionality originally? Want to 
> take a look? CC [~amitsela]
> Run: 
> https://builds.apache.org/job/beam_PostCommit_RunnableOnService_SparkLocal/org.apache.beam$beam-runners-spark/43/testReport/junit/org.apache.beam.sdk.transforms/FilterTest/testFilterGreaterThan/
> Error:
> {code}
> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: 5
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:169)
>   at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:77)
>   at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:53)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
>   at 
> org.apache.beam.sdk.transforms.FilterTest.testFilterGreaterThan(FilterTest.java:122)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at 
> org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:393)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: 5
>   at 
> scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
>   at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
>   at 
> scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
>   at 
> scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
>   at 
> scala.collection.IndexedSeqOptimized$class.indexWhere(IndexedSeqOptimized.scala:198)
>   at scala.collection.mutable.ArrayBuffer.indexWhere(ArrayBuffer.scala:47)
>   at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:144)
>   at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>   at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:128)
>   at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>   at 
> scala.collection.mutable.BufferLike$class.$minus$eq(BufferLike.scala:126)
>   at scala.collection.mutable.AbstractBuffer.$minus$eq(Buffer.scala:48)
>   at 
> org.apache.spark.metrics.MetricsSystem.removeSource(MetricsSystem.scala:159)
>   at 
> org.apache.beam.runners.spark.translation.SparkRuntimeContext.registerMetrics(SparkRuntimeContext.java:94)
>   at 
> org.apache.beam.runners.spark.translation.SparkRuntimeContext.(SparkRuntimeContext.java:66)
>   at 
> org.apache.beam.runners.spark.translation.EvaluationContext.(EvaluationContext.java:73)
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:146)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (BEAM-891) Flake in Spark metrics library?

2016-11-14 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15663281#comment-15663281
 ] 

Stas Levin edited comment on BEAM-891 at 11/14/16 9:29 AM:
---

Sounds good. 
Btw, why can't we just set {{options.setEnableSparkSinks(false)}} directly in 
{{TestOptionsForStreaming}}?


was (Author: staslev):
Sounds good. 
Btw, why can't we just set {{options.setEnableSparkSinks(false);}} directly in 
{{TestOptionsForStreaming}}?

> Flake in Spark metrics library?
> ---
>
> Key: BEAM-891
> URL: https://issues.apache.org/jira/browse/BEAM-891
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Daniel Halperin
>Assignee: Stas Levin
>
> [~staslev] I think you implemented this functionality originally? Want to 
> take a look? CC [~amitsela]
> Run: 
> https://builds.apache.org/job/beam_PostCommit_RunnableOnService_SparkLocal/org.apache.beam$beam-runners-spark/43/testReport/junit/org.apache.beam.sdk.transforms/FilterTest/testFilterGreaterThan/
> Error:
> {code}
> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: 5
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:169)
>   at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:77)
>   at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:53)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
>   at 
> org.apache.beam.sdk.transforms.FilterTest.testFilterGreaterThan(FilterTest.java:122)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at 
> org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:393)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: 5
>   at 
> scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
>   at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
>   at 
> scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
>   at 
> scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
>   at 
> scala.collection.IndexedSeqOptimized$class.indexWhere(IndexedSeqOptimized.scala:198)
>   at scala.collection.mutable.ArrayBuffer.indexWhere(ArrayBuffer.scala:47)
>   at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:144)
>   at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>   at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:128)
>   at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>   at 
> scala.collection.mutable.BufferLike$class.$minus$eq(BufferLike.scala:126)
>   at scala.collection.mutable.AbstractBuffer.$minus$eq(Buffer.scala:48)
>   at 
> org.apache.spark.metrics.MetricsSystem.removeSource(MetricsSystem.scala:159)
>   at 
> org.apache.beam.runners.spark.translation.SparkRuntimeContext.registerMetrics(SparkRuntimeContext.java:94)
>   at 
> org.apache.beam.runners.spark.translation.SparkRuntimeContext.(SparkRuntimeContext.java:66)
>   at 
> org.apache.beam.runners.spark.translation.EvaluationContext.(EvaluationContext.java:73)
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:146)
> {code}





[jira] [Commented] (BEAM-891) Flake in Spark metrics library?

2016-11-14 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15663281#comment-15663281
 ] 

Stas Levin commented on BEAM-891:
-

Sounds good. 
Btw, why can't we just set {{options.setEnableSparkSinks(false);}} directly in 
{{TestOptionsForStreaming}}?

> Flake in Spark metrics library?
> ---
>
> Key: BEAM-891
> URL: https://issues.apache.org/jira/browse/BEAM-891
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Daniel Halperin
>Assignee: Stas Levin
>
> [~staslev] I think you implemented this functionality originally? Want to 
> take a look? CC [~amitsela]
> Run: 
> https://builds.apache.org/job/beam_PostCommit_RunnableOnService_SparkLocal/org.apache.beam$beam-runners-spark/43/testReport/junit/org.apache.beam.sdk.transforms/FilterTest/testFilterGreaterThan/
> Error:
> {code}
> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: 5
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:169)
>   at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:77)
>   at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:53)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
>   at 
> org.apache.beam.sdk.transforms.FilterTest.testFilterGreaterThan(FilterTest.java:122)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at 
> org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:393)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: 5
>   at 
> scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
>   at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
>   at 
> scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
>   at 
> scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
>   at 
> scala.collection.IndexedSeqOptimized$class.indexWhere(IndexedSeqOptimized.scala:198)
>   at scala.collection.mutable.ArrayBuffer.indexWhere(ArrayBuffer.scala:47)
>   at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:144)
>   at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>   at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:128)
>   at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>   at 
> scala.collection.mutable.BufferLike$class.$minus$eq(BufferLike.scala:126)
>   at scala.collection.mutable.AbstractBuffer.$minus$eq(Buffer.scala:48)
>   at 
> org.apache.spark.metrics.MetricsSystem.removeSource(MetricsSystem.scala:159)
>   at 
> org.apache.beam.runners.spark.translation.SparkRuntimeContext.registerMetrics(SparkRuntimeContext.java:94)
>   at 
> org.apache.beam.runners.spark.translation.SparkRuntimeContext.(SparkRuntimeContext.java:66)
>   at 
> org.apache.beam.runners.spark.translation.EvaluationContext.(EvaluationContext.java:73)
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:146)
> {code}





[jira] [Created] (BEAM-979) ConcurrentModificationException exception after hours of running

2016-11-15 Thread Stas Levin (JIRA)
Stas Levin created BEAM-979:
---

 Summary: ConcurrentModificationException exception after hours of 
running
 Key: BEAM-979
 URL: https://issues.apache.org/jira/browse/BEAM-979
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Stas Levin
Assignee: Stas Levin


{code}

User class threw exception: org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 1 in stage 4483.0 failed 4 times, most recent failure: Lost 
task 1.3 in stage 4483.0 (TID 44548, .com): 
java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
at java.util.ArrayList$Itr.next(ArrayList.java:851)
at 
com.xxx.relocated.com.google.common.collect.Iterators$3.next(Iterators.java:177)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}





[jira] [Commented] (BEAM-979) ConcurrentModificationException exception after hours of running

2016-11-15 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666975#comment-15666975
 ] 

Stas Levin commented on BEAM-979:
-

I think it may have to do with {{StateSpecFunctions#mapSourceFunction}}, where 
we have this:

{code:java}

// read microbatch.
final List<WindowedValue<T>> readValues = new ArrayList<>();

// ...

while (!finished) {
  readValues.add(WindowedValue.of(reader.getCurrent(),
      reader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
  finished = !reader.advance();
}

// ...

return Iterators.unmodifiableIterator(readValues.iterator());

{code}

I think there may be a scenario where {{readValues}} is modified and read 
concurrently, which causes a {{ConcurrentModificationException}} to be thrown 
upon invoking the {{iterator#next}} we see in the stacktrace. 

One of the workarounds I'm trying is to change {{final List<WindowedValue<T>> 
readValues = new ArrayList<>();}} to {{final Collection<WindowedValue<T>> 
readValues = new ConcurrentLinkedQueue<>();}}. Since making this change the 
exception has not recurred, but given its intermittent nature it's too soon to 
tell for sure.

Moreover, this change may prevent a {{ConcurrentModificationException}} from 
being thrown, but we still need to consider whether the exception is merely a 
symptom of some scenario being improperly handled rather than a problem per se.
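
The difference between the two collections can be reproduced in isolation. The following is a minimal, single-threaded sketch (the class and method names are made up for illustration and it is not the runner's code): it creates an iterator, modifies the collection afterwards, and then calls {{next}}. {{ArrayList}} iterators are fail-fast, while {{ConcurrentLinkedQueue}} iterators are weakly consistent. It demonstrates the mechanism only, not the actual interleaving that happens in the runner.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

final class FailFastDemo {

  /** Returns true if next() throws after the collection was modified post-iterator-creation. */
  static boolean throwsAfterLateAdd(Collection<String> values) {
    values.add("first");
    Iterator<String> it = values.iterator();
    // Structural modification after the iterator has been handed out.
    values.add("second");
    try {
      it.next();
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("ArrayList throws: " + throwsAfterLateAdd(new ArrayList<>()));
    System.out.println("ConcurrentLinkedQueue throws: "
        + throwsAfterLateAdd(new ConcurrentLinkedQueue<>()));
  }
}
```

Note that the queue only removes the exception; whether the reader then sees a consistent set of elements is a separate question.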

> ConcurrentModificationException exception after hours of running
> 
>
> Key: BEAM-979
> URL: https://issues.apache.org/jira/browse/BEAM-979
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
>
> {code}
>   
> User class threw exception: org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 1 in stage 4483.0 failed 4 times, most recent failure: 
> Lost task 1.3 in stage 4483.0 (TID 44548, .com): 
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
> at java.util.ArrayList$Itr.next(ArrayList.java:851)
> at 
> com.xxx.relocated.com.google.common.collect.Iterators$3.next(Iterators.java:177)
> at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
> at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}





[jira] [Comment Edited] (BEAM-979) ConcurrentModificationException exception after hours of running

2016-11-15 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666975#comment-15666975
 ] 

Stas Levin edited comment on BEAM-979 at 11/15/16 12:24 PM:


I think it may have to do with {{StateSpecFunctions#mapSourceFunction}}, where 
we have this:

{code:java}

// read microbatch.
final List<WindowedValue<T>> readValues = new ArrayList<>();

// ...

while (!finished) {
  readValues.add(WindowedValue.of(reader.getCurrent(),
      reader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
  finished = !reader.advance();
}

// ...

return Iterators.unmodifiableIterator(readValues.iterator());

{code}

I think there may be a scenario where {{readValues}} is modified and read 
concurrently, which causes a {{ConcurrentModificationException}} to be thrown 
upon invoking the {{iterator#next}} we see in the stacktrace. 

One of the workarounds I'm trying is to change {{final List<WindowedValue<T>> 
readValues = new ArrayList<>();}} to {{final Collection<WindowedValue<T>> 
readValues = new ConcurrentLinkedQueue<>();}}. Since making this change the 
exception has not recurred, but given its intermittent nature it's too soon to 
tell for sure.

Moreover, this change may prevent a {{ConcurrentModificationException}} from 
being thrown, but we still need to consider whether the exception is merely a 
symptom of some scenario being improperly handled rather than a problem per se.


was (Author: staslev):
I think it may have to do with {StateSpecFunctions#mapSourceFunction}}, where 
we have this:

{code:java}

// read microbatch.
final List> readValues = new ArrayList<>();

// ...

while (!finished) {
readValues.add(WindowedValue.of(reader.getCurrent(), 
reader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
finished = !reader.advance();
  }

// ...

return Iterators.unmodifiableIterator(readValues.iterator());

{code}

I think there may be a scenario where {{readValues}} may be changed and read 
concurrently, which causes a {{ConcurrentModificationException}} to be thrown 
upon invoking the {{iterator#next}} we see in the stacktrace. 

One of the workarounds I'm trying is to change {{final List> 
readValues = new ArrayList<>();}} to {{final Collection> 
readValues = new ConcurrentLinkedQueue<>();}}. Ever since making this change 
the exception has not reoccured, but given its nature it's too soon to tell for 
sure.

Moreover, this change may prevent from a {{ConcurrentModificationException}} 
being thrown, but we still need to consider if it being thrown is merely a 
symptom of some scenario being improperly handled rather than a problem per-se.

> ConcurrentModificationException exception after hours of running
> 
>
> Key: BEAM-979
> URL: https://issues.apache.org/jira/browse/BEAM-979
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
>
> {code}
>   
> User class threw exception: org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 1 in stage 4483.0 failed 4 times, most recent failure: 
> Lost task 1.3 in stage 4483.0 (TID 44548, .com): 
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
> at java.util.ArrayList$Itr.next(ArrayList.java:851)
> at 
> com.xxx.relocated.com.google.common.collect.Iterators$3.next(Iterators.java:177)
> at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
> at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCh

[jira] [Commented] (BEAM-979) ConcurrentModificationException exception after hours of running

2016-11-15 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667018#comment-15667018
 ] 

Stas Levin commented on BEAM-979:
-

I believe 
{{scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)}} 
implies that it's a Beam issue rather than a Spark issue, since we're seeing a 
Java-Scala interop.

I think the only place {{spark-runner}} actually creates an {{RDD}} is when 
reading from a source, in our case {{SparkUnboundedSource}}. After hitting 
"Cmd+B" enough times I found the following block to be of particular interest:

{code:java}
Iterator>> mapWithStateDStream = 
inputDStream.mapWithState(StateSpec.function(StateSpecFunctions.mapSourceFunction(rc)));
{code}

Following the stack trace, and realising that the {{iterator}} being modified 
belongs to a {{java.util.ArrayList}}, also leads me to 
{{StateSpecFunctions#mapSourceFunction}}, which returns an {{iterator}} that is 
in fact backed by an {{ArrayList}}, just like the stack trace says. 
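
To illustrate the failure mode the stack trace points at, here is a minimal single-threaded sketch of the fail-fast iterator of {{java.util.ArrayList}}. It is not taken from {{StateSpecFunctions}}: the class and method names are made up for illustration, and the snapshot variant is only one generic way to decouple an iterator from a list that keeps changing, not necessarily the right fix for this issue.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

// Hypothetical demo class, not part of Beam or Spark.
final class ArrayListIteratorSketch {

    // Returns true if advancing the iterator fails because the list it was
    // created from was structurally modified in the meantime.
    static boolean failsAfterModification(boolean defensiveCopy) {
        List<String> buffer = new ArrayList<String>();
        buffer.add("a");
        buffer.add("b");

        // Iterate either over the live list or over a snapshot of it.
        Iterator<String> it = defensiveCopy
            ? new ArrayList<String>(buffer).iterator()
            : buffer.iterator();

        it.next();        // consume "a"
        buffer.add("c");  // structural modification after the iterator was created

        try {
            it.next();    // ArrayList's iterator checks for comodification here
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("live iterator fails: " + failsAfterModification(false));
        System.out.println("snapshot iterator fails: " + failsAfterModification(true));
    }
}
```

With the live iterator the second {{next()}} throws, while the snapshot iterator is unaffected, at the cost of copying the list. In the real job the modification presumably comes from another thread or a later evaluation, which this sketch does not try to reproduce.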


> ConcurrentModificationException exception after hours of running
> 
>
> Key: BEAM-979
> URL: https://issues.apache.org/jira/browse/BEAM-979
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
>
> {code}
>   
> User class threw exception: org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 1 in stage 4483.0 failed 4 times, most recent failure: 
> Lost task 1.3 in stage 4483.0 (TID 44548, .com): 
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
> at java.util.ArrayList$Itr.next(ArrayList.java:851)
> at 
> com.xxx.relocated.com.google.common.collect.Iterators$3.next(Iterators.java:177)
> at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
> at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}





[jira] [Comment Edited] (BEAM-979) ConcurrentModificationException exception after hours of running

2016-11-15 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667018#comment-15667018
 ] 

Stas Levin edited comment on BEAM-979 at 11/15/16 12:38 PM:


I believe 
{{scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)}} 
implies that it's a Beam issue rather than a Spark issue, since we're seeing a 
Java-Scala interop.

I think the only place {{spark-runner}} actually creates an {{RDD}} is when 
reading from a source, in our case {{SparkUnboundedSource}}. After hitting 
"Cmd+B" enough times I found the following block to be of particular interest:

{code:java}
Iterator>> mapWithStateDStream = 
inputDStream.mapWithState(StateSpec.function(StateSpecFunctions.mapSourceFunction(rc)));
{code}

Following the stack trace, and realising that the {{iterator}} being modified 
belongs to a {{java.util.ArrayList}}, also leads me to 
{{StateSpecFunctions#mapSourceFunction}}, which returns an {{iterator}} that is 
in fact backed by an {{ArrayList}}, just like the stack trace says. 



was (Author: staslev):
I believe 
{{scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)}} 
implies that it's a Beam issue rather than a Spark issue, since we're seeing a 
Java-Scala interop.

I think the only place {{spark-runner}} actually creates and {{RDD}} is when 
reading from a source, in our case {{SparkUnboundedSource}}. After hitting 
"Cmd+B" enough times I found the following block to be of particular interest:

{code:java}
Iterator>> mapWithStateDStream = 
inputDStream.mapWithState(StateSpec.function(StateSpecFunctions.mapSourceFunction(rc)));
{code}

Following the stacktrace plus realising that the {{iterator}} being modified is 
actually a {{java.util.ArrayList}} also leads me to 
{{StateSpecFunctions#mapSourceFunction}} which returns an {{iterator}} which is 
in fact an {{ArrayList}} just like the stacktrace says. 


> ConcurrentModificationException exception after hours of running
> 
>
> Key: BEAM-979
> URL: https://issues.apache.org/jira/browse/BEAM-979
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
>
> {code}
>   
> User class threw exception: org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 1 in stage 4483.0 failed 4 times, most recent failure: 
> Lost task 1.3 in stage 4483.0 (TID 44548, .com): 
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
> at java.util.ArrayList$Itr.next(ArrayList.java:851)
> at 
> com.xxx.relocated.com.google.common.collect.Iterators$3.next(Iterators.java:177)
> at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
> at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}





[jira] [Created] (BEAM-579) Integrate NamedAggregators into Spark's sink system

2016-08-23 Thread Stas Levin (JIRA)
Stas Levin created BEAM-579:
---

 Summary: Integrate NamedAggregators into Spark's sink system
 Key: BEAM-579
 URL: https://issues.apache.org/jira/browse/BEAM-579
 Project: Beam
  Issue Type: Task
  Components: runner-spark
Reporter: Stas Levin
Assignee: Amit Sela
Priority: Critical


At the moment {{NamedAggregators}} is an adapter between Beam's {{Aggregator}} 
and Spark's {{Accumulator}} and is implemented as a single Spark 
{{Accumulator}}, holding a map of metrics that can be augmented with new 
metrics dynamically, after the pipeline has already started. 

Spark's out-of-the-box metrics mechanism does not support adding metrics to 
{{Source}} s that have already registered (it pulls their metrics upon 
registration and never updates them again).

In light of the above, it would seem that there is a gap to bridge between the 
dynamic nature of {{NamedAggregators}} and Spark's current metric system so 
that metrics that are added dynamically are also reported to the defined Spark 
{{Sink}} s.
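
The gap described above can be sketched in plain Java. This is an illustration only: {{GrowingSource}}, {{SnapshotRegistry}} and {{LiveRegistry}} are hypothetical names, not Spark or Beam classes, and the snapshot behaviour simply mirrors what this issue says about metrics being pulled once upon registration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class, not part of Beam or Spark.
final class DynamicMetricsSketch {

    // Stand-in for a holder of metrics that can grow after the pipeline starts.
    static final class GrowingSource {
        final Map<String, Long> metrics = new ConcurrentHashMap<String, Long>();
    }

    // Copies the metric names once, at registration time, and never looks again.
    static final class SnapshotRegistry {
        private final Set<String> names = new TreeSet<String>();

        void register(GrowingSource source) {
            names.addAll(source.metrics.keySet());
        }

        Set<String> reportedNames() {
            return new TreeSet<String>(names);
        }
    }

    // Keeps a reference to the source and reads it again on every report.
    static final class LiveRegistry {
        private GrowingSource source;

        void register(GrowingSource source) {
            this.source = source;
        }

        Set<String> reportedNames() {
            return new TreeSet<String>(source.metrics.keySet());
        }
    }

    public static void main(String[] args) {
        GrowingSource source = new GrowingSource();
        source.metrics.put("recordsRead", 0L);

        SnapshotRegistry snapshot = new SnapshotRegistry();
        LiveRegistry live = new LiveRegistry();
        snapshot.register(source);
        live.register(source);

        // A metric that shows up only after registration.
        source.metrics.put("emptyLines", 0L);

        System.out.println("snapshot reports: " + snapshot.reportedNames());
        System.out.println("live reports: " + live.reportedNames());
    }
}
```

The snapshot registry never sees the metric added after registration, whereas the live one does. Bridging the gap would presumably mean making the reporting side behave like the second variant.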





[jira] [Commented] (BEAM-579) Integrate NamedAggregators into Spark's sink system

2016-08-23 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15432450#comment-15432450
 ] 

Stas Levin commented on BEAM-579:
-

I've started working on it and should have a patch pretty soon.

> Integrate NamedAggregators into Spark's sink system
> ---
>
> Key: BEAM-579
> URL: https://issues.apache.org/jira/browse/BEAM-579
> Project: Beam
>  Issue Type: Task
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Amit Sela
>Priority: Critical
>
> At the moment {{NamedAggregators}} is an adapter between Beam's 
> {{Aggregator}} and Spark's {{Accumulator}} and is implemented as a single 
> Spark {{Accumulator}}, holding a map of metrics that can be augmented with 
> new metrics dynamically, after the pipeline has already started. 
> Spark's out-of-the-box metrics mechanism does not support adding metrics to 
> {{Source}} s that have already registered (it pulls their metrics upon 
> registration and never updates them again).
> In light of the above, it would seem that there is a gap to bridge between 
> the dynamic nature of {{NamedAggregators}} and Spark's current metric system 
> so that metrics that are added dynamically are also reported to the defined 
> Spark {{Sink}} s.





[jira] [Updated] (BEAM-579) Integrate NamedAggregators into Spark's sink system

2016-08-23 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-579:

Priority: Major  (was: Critical)

> Integrate NamedAggregators into Spark's sink system
> ---
>
> Key: BEAM-579
> URL: https://issues.apache.org/jira/browse/BEAM-579
> Project: Beam
>  Issue Type: Task
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Amit Sela
>
> At the moment {{NamedAggregators}} is an adapter between Beam's 
> {{Aggregator}} and Spark's {{Accumulator}} and is implemented as a single 
> Spark {{Accumulator}}, holding a map of metrics that can be augmented with 
> new metrics dynamically, after the pipeline has already started. 
> Spark's out-of-the-box metrics mechanism does not support adding metrics to 
> {{Source}} s that have already registered (it pulls their metrics upon 
> registration and never updates them again).
> In light of the above, it would seem that there is a gap to bridge between 
> the dynamic nature of {{NamedAggregators}} and Spark's current metric system 
> so that metrics that are added dynamically are also reported to the defined 
> Spark {{Sink}} s.





[jira] [Created] (BEAM-613) SimpleStreamingWordCountTest tests only a single batch

2016-09-01 Thread Stas Levin (JIRA)
Stas Levin created BEAM-613:
---

 Summary: SimpleStreamingWordCountTest tests only a single batch
 Key: BEAM-613
 URL: https://issues.apache.org/jira/browse/BEAM-613
 Project: Beam
  Issue Type: Improvement
  Components: runner-spark
Reporter: Stas Levin
Assignee: Amit Sela


{{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
 aims to test a simple Spark streaming job, but only tests a single batch, 
which is uncharacteristic of an actual (even simple) streaming job, usually 
consisting of multiple batches.





[jira] [Updated] (BEAM-613) SimpleStreamingWordCountTest tests only a single batch

2016-09-01 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-613:

Description: 
{{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
 aims to test a simple Spark streaming job, but only tests a single batch, 
which is uncharacteristic of an actual (even simple) streaming job, usually 
consisting of multiple batches.  (was: 
{{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
 aims to test a simple Spark streaming job, but only tests a single batch, 
which is uncharacteristic of an actual (even simple) streaming job usually 
consisting of multiple batches.)

> SimpleStreamingWordCountTest tests only a single batch
> --
>
> Key: BEAM-613
> URL: https://issues.apache.org/jira/browse/BEAM-613
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Amit Sela
>
> {{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
>  aims to test a simple Spark streaming job, but only tests a single batch, 
> which is uncharacteristic of an actual (even simple) streaming job, usually 
> consisting of multiple batches.





[jira] [Comment Edited] (BEAM-613) SimpleStreamingWordCountTest tests only a single batch

2016-09-01 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455424#comment-15455424
 ] 

Stas Levin edited comment on BEAM-613 at 9/1/16 1:42 PM:
-

This improvement will help detect issues such as BEAM-15, where the 
StorageLevel inconsistency between DStream and RDD comes into play only when 
there are multiple batches.


was (Author: staslev):
This improvement will helps detecting issues such as BEAM-15, where the 
StorageLevel inconsistency between DStream and RDD comes into play only when 
there are multiple batches.

> SimpleStreamingWordCountTest tests only a single batch
> --
>
> Key: BEAM-613
> URL: https://issues.apache.org/jira/browse/BEAM-613
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Amit Sela
>
> {{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
>  aims to test a simple Spark streaming job, but only tests a single batch, 
> which is uncharacteristic of an actual (even simple) streaming job, usually 
> consisting of multiple batches.





[jira] [Commented] (BEAM-613) SimpleStreamingWordCountTest tests only a single batch

2016-09-01 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455424#comment-15455424
 ] 

Stas Levin commented on BEAM-613:
-

This improvement will help detect issues such as BEAM-15, where the 
StorageLevel inconsistency between DStream and RDD comes into play only when 
there are multiple batches.

> SimpleStreamingWordCountTest tests only a single batch
> --
>
> Key: BEAM-613
> URL: https://issues.apache.org/jira/browse/BEAM-613
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Amit Sela
>
> {{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
>  aims to test a simple Spark streaming job, but only tests a single batch, 
> which is uncharacteristic of an actual (even simple) streaming job, usually 
> consisting of multiple batches.





[jira] [Created] (BEAM-628) Sink class org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink cannot be instantiated

2016-09-12 Thread Stas Levin (JIRA)
Stas Levin created BEAM-628:
---

 Summary: Sink class 
org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink cannot be 
instantiated
 Key: BEAM-628
 URL: https://issues.apache.org/jira/browse/BEAM-628
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Stas Levin
Assignee: Stas Levin


When configuring a Spark sink of type 
{{org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink}}
using a configuration like so:
{{*.sink.graphite.class=org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink}},
 the application fails to run with the following exception:
{noformat}
metrics.MetricsSystem: Sink class 
org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink cannot be 
instantiated
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1672)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:151)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:253)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.lang.ClassNotFoundException: 
org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
at 
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:190)
at 
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:186)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at 
org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:186)
at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:100)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:384)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:217)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:186)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:69)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:68)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
... 4 more
{noformat}
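
The trace above ends in {{Class.forName}} failing on the executor side, which suggests the sink class was not visible to the class loader in use when the metrics system started. The following self-contained sketch shows only that generic mechanism; {{ReflectiveSinkLoadingSketch}} and {{tryLoad}} are made-up names and this is not Spark's actual loading code.

```java
// Hypothetical demo class, not part of Beam or Spark.
final class ReflectiveSinkLoadingSketch {

    // Tries to load a class by its fully qualified name and reports the outcome.
    static String tryLoad(String className) {
        try {
            Class<?> cls = Class.forName(className);
            return "loaded " + cls.getName();
        } catch (ClassNotFoundException e) {
            // Thrown when no class with that name is visible to the class loader.
            return "not found: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A JDK class is always visible.
        System.out.println(tryLoad("java.util.ArrayList"));
        // The sink from this issue is only found if its jar is on the classpath.
        System.out.println(
            tryLoad("org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink"));
    }
}
```

A class named in a configuration file is resolved by name at runtime, so a typo-free setting can still fail if the jar containing the class is not available to the process that reads it.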





[jira] [Updated] (BEAM-628) Sink class org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink cannot be instantiated

2016-09-12 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-628:

Fix Version/s: 0.3.0-incubating

> Sink class 
> org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink cannot be 
> instantiated
> -
>
> Key: BEAM-628
> URL: https://issues.apache.org/jira/browse/BEAM-628
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
> Fix For: 0.3.0-incubating
>
>
> When configuring a Spark sink of type 
> {{org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink}}
> using a configuration like so:
> {{*.sink.graphite.class=org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink}},
>  the application fails to run with the following exception:
> {noformat}
> metrics.MetricsSystem: Sink class 
> org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink cannot be 
> instantiated
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1672)
> at 
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
> at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:151)
> at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:253)
> at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
> at 
> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:190)
> at 
> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:186)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at 
> org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:186)
> at 
> org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:100)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:384)
> at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:217)
> at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:186)
> at 
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:69)
> at 
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:68)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> ... 4 more
> {noformat}





[jira] [Resolved] (BEAM-628) Sink class org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink cannot be instantiated

2016-09-12 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin resolved BEAM-628.
-
Resolution: Fixed

> Sink class 
> org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink cannot be 
> instantiated
> -
>
> Key: BEAM-628
> URL: https://issues.apache.org/jira/browse/BEAM-628
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
> Fix For: 0.3.0-incubating
>
>
> When configuring a Spark sink of type 
> {{org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink}}
> using a configuration like so:
> {{*.sink.graphite.class=org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink}},
>  the application fails to run with the following exception:
> {noformat}
> metrics.MetricsSystem: Sink class 
> org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink cannot be 
> instantiated
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1672)
> at 
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
> at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:151)
> at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:253)
> at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
> at 
> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:190)
> at 
> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:186)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at 
> org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:186)
> at 
> org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:100)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:384)
> at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:217)
> at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:186)
> at 
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:69)
> at 
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:68)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> ... 4 more
> {noformat}





[jira] [Commented] (BEAM-613) SimpleStreamingWordCountTest tests only a single batch

2016-09-12 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15483768#comment-15483768
 ] 

Stas Levin commented on BEAM-613:
-

Pending [pull #909|https://github.com/apache/incubator-beam/pull/909].

> SimpleStreamingWordCountTest tests only a single batch
> --
>
> Key: BEAM-613
> URL: https://issues.apache.org/jira/browse/BEAM-613
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
>
> {{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
>  aims to test a simple Spark streaming job, but only tests a single batch, 
> which is uncharacteristic of an actual (even simple) streaming job, usually 
> consisting of multiple batches.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-637) WindowedValue$ValueInGlobalWindow is not serializable when using JavaSerializer instead of Kryo

2016-09-19 Thread Stas Levin (JIRA)
Stas Levin created BEAM-637:
---

 Summary: WindowedValue$ValueInGlobalWindow is not serializable 
when using JavaSerializer instead of Kryo 
 Key: BEAM-637
 URL: https://issues.apache.org/jira/browse/BEAM-637
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Stas Levin
Assignee: Amit Sela


When using {{JavaSerializer}} instead of {{KryoSerializer}} by configuring 
{{JavaSparkContext}} with {{"spark.serializer"}} set to 
{{JavaSerializer.class.getCanonicalName()}}, an exception is thrown:

{noformat}
object not serializable (class: 
org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow, value: 
ValueInGlobalWindow{value=hi there, pane=PaneInfo.NO_FIRING})
{noformat}
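
For context, standard Java serialization only accepts objects whose class implements {{java.io.Serializable}}, whereas Kryo generally does not impose that requirement, which would explain why the problem shows up only with {{JavaSerializer}}. The sketch below demonstrates the generic Java rule with two made-up holder classes; it says nothing about how {{WindowedValue}} is or should be implemented.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class, not part of Beam or Spark.
final class JavaSerializationSketch {

    // A value holder that does NOT implement Serializable.
    static final class PlainValue {
        final String value;

        PlainValue(String value) {
            this.value = value;
        }
    }

    // The same holder, marked as Serializable.
    static final class SerializableValue implements Serializable {
        private static final long serialVersionUID = 1L;
        final String value;

        SerializableValue(String value) {
            this.value = value;
        }
    }

    // Returns true if standard Java serialization accepts the object.
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("plain: " + canSerialize(new PlainValue("hi there")));
        System.out.println("serializable: " + canSerialize(new SerializableValue("hi there")));
    }
}
```
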

{noformat}
Serialization stack:
- object not serializable (class: 
org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow, value: 
ValueInGlobalWindow{value=hi there, pane=PaneInfo.NO_FIRING})
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
at 
org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:440)
at 
org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:46)
at 
org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext$1.call(StreamingEvaluationContext.java:175)
at 
org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext$1.call(StreamingEvaluationContext.java:172)
at 
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
at 
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at 
org.apache.spark.streaming.scheduler.JobSchedu

[jira] [Updated] (BEAM-613) Add windowing use cases to SimpleStreamingWordCountTest

2016-09-22 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-613:

Summary: Add windowing use cases to SimpleStreamingWordCountTest  (was: 
SimpleStreamingWordCountTest tests only a single batch)

> Add windowing use cases to SimpleStreamingWordCountTest
> 
>
> Key: BEAM-613
> URL: https://issues.apache.org/jira/browse/BEAM-613
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
>
> {{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
>  aims to test a simple Spark streaming job, but only tests a single batch, 
> which is uncharacteristic of an actual (even simple) streaming job, usually 
> consisting of multiple batches.





[jira] [Updated] (BEAM-613) Add windowing use cases to SimpleStreamingWordCountTest

2016-09-22 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-613:

Description: 
{{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
 should have tests for both {{FixedWindows}} and {{SlidingWindows}}.  (was: 
{{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
 aims to test a simple Spark streaming job, but only tests a single batch, 
which is uncharacteristic of an actual (even simple) streaming job, usually 
consisting of multiple batches.)

> Add windowing use cases to SimpleStreamingWordCountTest
> 
>
> Key: BEAM-613
> URL: https://issues.apache.org/jira/browse/BEAM-613
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
>
> {{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
>  should have tests for both {{FixedWindows}} and {{SlidingWindows}}.





[jira] [Updated] (BEAM-613) SimpleStreamingWordCountTest does not properly test fixed windows

2016-09-22 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-613:

Summary: SimpleStreamingWordCountTest does not properly test fixed windows  
(was: Add windowing use cases to SimpleStreamingWordCountTest)

> SimpleStreamingWordCountTest does not properly test fixed windows
> -
>
> Key: BEAM-613
> URL: https://issues.apache.org/jira/browse/BEAM-613
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
>
> {{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
>  should have tests for both {{FixedWindows}} and {{SlidingWindows}}.





[jira] [Updated] (BEAM-613) SimpleStreamingWordCountTest does not properly test fixed windows

2016-09-22 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-613:

Description: 
{{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
 does not properly test {{FixedWindows}}.  (was: 
{{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
 should have tests for both {{FixedWindows}} and {{SlidingWindows}}.)

> SimpleStreamingWordCountTest does not properly test fixed windows
> -
>
> Key: BEAM-613
> URL: https://issues.apache.org/jira/browse/BEAM-613
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
>
> {{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
>  does not properly test {{FixedWindows}}.





[jira] [Created] (BEAM-669) SimpleStreamingWordCountTest does not have a test for sliding windows

2016-09-22 Thread Stas Levin (JIRA)
Stas Levin created BEAM-669:
---

 Summary: SimpleStreamingWordCountTest does not have a test for 
sliding windows
 Key: BEAM-669
 URL: https://issues.apache.org/jira/browse/BEAM-669
 Project: Beam
  Issue Type: Improvement
  Components: runner-spark
Reporter: Stas Levin
Assignee: Amit Sela


{{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
 does not have a test for {{SlidingWindows}}.
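
For context on what such a test would need to cover, here is a minimal, JDK-only sketch of how a timestamp maps to one fixed window but to several overlapping sliding windows. The class and method names are hypothetical and this is not the Beam implementation; it only assumes windows are half-open intervals {{[start, start + size)}} aligned to zero.

```java
import java.util.ArrayList;
import java.util.List;

class WindowAssignmentSketch {
    // Start of the single fixed window of length `size` that contains timestamp `t`.
    static long fixedWindowStart(long t, long size) {
        return t - Math.floorMod(t, size);
    }

    // Starts of every sliding window [start, start + size) that contains `t`,
    // where a new window begins every `period` units. Newest window first.
    static List<Long> slidingWindowStarts(long t, long size, long period) {
        List<Long> starts = new ArrayList<>();
        long lastStart = t - Math.floorMod(t, period);
        for (long start = lastStart; start > t - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(fixedWindowStart(7, 10));       // 0
        System.out.println(slidingWindowStarts(7, 10, 5)); // [5, 0]
    }
}
```

With a size of 10 and a period of 5, every element lands in two windows, so a word count over sliding windows should see each word counted once per containing window. That is the behaviour a fixed-window test does not exercise.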





[jira] [Commented] (BEAM-613) SimpleStreamingWordCountTest does not properly test fixed windows

2016-09-22 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15513318#comment-15513318
 ] 

Stas Levin commented on BEAM-613:
-

I'll open a separate issue for adding a test for sliding windows.

> SimpleStreamingWordCountTest does not properly test fixed windows
> -
>
> Key: BEAM-613
> URL: https://issues.apache.org/jira/browse/BEAM-613
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
>
> {{org.apache.beam.runners.spark.translation.streaming.SimpleStreamingWordCountTest}}
>  does not properly test {{FixedWindows}}.





[jira] [Commented] (BEAM-637) WindowedValue$ValueInGlobalWindow is not serializable when using JavaSerializer instead of Kryo

2016-09-23 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15516207#comment-15516207
 ] 

Stas Levin commented on BEAM-637:
-

[~amitsela] I agree, we can make it about supporting Java serialisation in 
general. I'll also change the priority to "minor" as I don't believe this to be 
a pressing matter.
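
To illustrate the underlying constraint outside of Spark: standard Java serialization (which, as far as I know, is what Spark's {{JavaSerializer}} relies on) rejects any object whose class does not implement {{java.io.Serializable}}, whereas Kryo has no such requirement. The sketch below is JDK-only; {{PlainValue}} and {{SerializableValue}} are hypothetical stand-ins and not Beam classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class JavaSerializationSketch {
    // Hypothetical stand-in for a value wrapper that does NOT implement Serializable.
    static final class PlainValue {
        final String value;
        PlainValue(String value) { this.value = value; }
    }

    // Same shape, but opted in to Java serialization.
    static final class SerializableValue implements Serializable {
        private static final long serialVersionUID = 1L;
        final String value;
        SerializableValue(String value) { this.value = value; }
    }

    // Returns true if plain Java serialization accepts the object.
    static boolean canJavaSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // the class (or one of its fields) is not Serializable
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canJavaSerialize(new PlainValue("hi there")));        // false
        System.out.println(canJavaSerialize(new SerializableValue("hi there"))); // true
    }
}
```

Supporting Java serialisation in general would therefore mean making every type that ends up inside an RDD or DStream serializable in this sense, not just {{ValueInGlobalWindow}}.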

> WindowedValue$ValueInGlobalWindow is not serializable when using 
> JavaSerializer instead of Kryo 
> 
>
> Key: BEAM-637
> URL: https://issues.apache.org/jira/browse/BEAM-637
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Amit Sela
>
> When using {{JavaSerializer}} instead of {{KryoSerializer}} by configuring 
> {{JavaSparkContext}} with  {{"spark.serializer"}} set to 
> {{JavaSerializer.class.getCanonicalName()}}, an exception is thrown:
> {noformat}
> object not serializable (class: 
> org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow, value: 
> ValueInGlobalWindow{value=hi there, pane=PaneInfo.NO_FIRING})
> {noformat}

[jira] [Updated] (BEAM-637) WindowedValue$ValueInGlobalWindow is not serializable when using JavaSerializer instead of Kryo

2016-09-23 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-637:

Priority: Minor  (was: Major)

> WindowedValue$ValueInGlobalWindow is not serializable when using 
> JavaSerializer instead of Kryo 
> 
>
> Key: BEAM-637
> URL: https://issues.apache.org/jira/browse/BEAM-637
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Amit Sela
>Priority: Minor
>
> When using {{JavaSerializer}} instead of {{KryoSerializer}} by configuring 
> {{JavaSparkContext}} with  {{"spark.serializer"}} set to 
> {{JavaSerializer.class.getCanonicalName()}}, an exception is thrown:
> {noformat}
> object not serializable (class: 
> org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow, value: 
> ValueInGlobalWindow{value=hi there, pane=PaneInfo.NO_FIRING})
> {noformat}

[jira] [Updated] (BEAM-637) Support Java serialisation (via JavaSerialiser)

2016-09-23 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-637:

Summary: Support Java serialisation (via JavaSerialiser)  (was: 
WindowedValue$ValueInGlobalWindow is not serializable when using JavaSerializer 
instead of Kryo )

> Support Java serialisation (via JavaSerialiser)
> ---
>
> Key: BEAM-637
> URL: https://issues.apache.org/jira/browse/BEAM-637
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Amit Sela
>Priority: Minor
>
> When using {{JavaSerializer}} instead of {{KryoSerializer}} by configuring 
> {{JavaSparkContext}} with  {{"spark.serializer"}} set to 
> {{JavaSerializer.class.getCanonicalName()}}, an exception is thrown:
> {noformat}
> object not serializable (class: 
> org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow, value: 
> ValueInGlobalWindow{value=hi there, pane=PaneInfo.NO_FIRING})
> {noformat}

[jira] [Updated] (BEAM-637) Support Java serialisation (via JavaSerialiser)

2016-09-23 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-637:

Issue Type: Improvement  (was: Bug)

> Support Java serialisation (via JavaSerialiser)
> ---
>
> Key: BEAM-637
> URL: https://issues.apache.org/jira/browse/BEAM-637
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Amit Sela
>Priority: Minor
>
> When using {{JavaSerializer}} instead of {{KryoSerializer}} by configuring 
> {{JavaSparkContext}} with  {{"spark.serializer"}} set to 
> {{JavaSerializer.class.getCanonicalName()}}, an exception is thrown:
> {noformat}
> object not serializable (class: 
> org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow, value: 
> ValueInGlobalWindow{value=hi there, pane=PaneInfo.NO_FIRING})
> {noformat}

[jira] [Commented] (BEAM-983) runners/spark/translation/streaming/utils/TestPipelineOptions.java missing Apache license header.

2016-11-15 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15668207#comment-15668207
 ] 

Stas Levin commented on BEAM-983:
-

[~jasonkuster] [~amitsela] Sorry about that. Can these tests be added to 
the set of tests that run against the GitHub PR?

> runners/spark/translation/streaming/utils/TestPipelineOptions.java missing 
> Apache license header.
> -
>
> Key: BEAM-983
> URL: https://issues.apache.org/jira/browse/BEAM-983
> Project: Beam
>  Issue Type: Bug
>Reporter: Jason Kuster
>Assignee: Amit Sela
>
> Pull request #1332 (https://github.com/apache/incubator-beam/pull/1332) is 
> failing in beam_PostCommit_MavenVerify with an Apache Rat failure -- 
> https://builds.apache.org/job/beam_PostCommit_MavenVerify/1827/ -- Rat output 
> points to 
> runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java
>  as the culprit.





[jira] [Created] (BEAM-1039) Spark context is never actually re-used in tests

2016-11-22 Thread Stas Levin (JIRA)
Stas Levin created BEAM-1039:


 Summary: Spark context is never actually re-used in tests
 Key: BEAM-1039
 URL: https://issues.apache.org/jira/browse/BEAM-1039
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Stas Levin
Assignee: Amit Sela


I think that due to a slight typo 
{{Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)}} got mixed up with
{{Boolean.getBoolean(System.getProperty(TEST_REUSE_SPARK_CONTEXT))}} in the 
following 2 places, which may have caused {{SparkContext}} to behave not quite 
as expected.

*  
https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java#L52
*  
https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java#L70

I'll take care of it as part of something I'm working on, so a PR should follow 
shortly.
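
For reference, the difference between the two calls can be reproduced with the JDK alone. {{Boolean.getBoolean(name)}} already looks up the system property called {{name}}, so wrapping the argument in {{System.getProperty}} makes it look up a property named after the value instead. In this sketch {{KEY}} is a hypothetical property name standing in for the value of {{TEST_REUSE_SPARK_CONTEXT}}.

```java
class GetBooleanSketch {
    // Hypothetical property key, standing in for TEST_REUSE_SPARK_CONTEXT.
    static final String KEY = "example.test.reuseContext";

    // Reads the system property named KEY and parses it as a boolean.
    static boolean direct() {
        return Boolean.getBoolean(KEY);
    }

    // Reads the system property whose NAME is the value of KEY (e.g. "true"),
    // which is normally unset, so this is false even when KEY is "true".
    static boolean nested() {
        return Boolean.getBoolean(System.getProperty(KEY));
    }

    // Equivalent to direct(): fetch the value first, then parse it.
    static boolean parsed() {
        return Boolean.parseBoolean(System.getProperty(KEY));
    }

    public static void main(String[] args) {
        System.setProperty(KEY, "true");
        System.out.println(direct());
        System.out.println(nested());
        System.out.println(parsed());
    }
}
```

Assuming no system property literally named {{true}} is defined, {{direct()}} and {{parsed()}} return true here while {{nested()}} returns false.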





[jira] [Assigned] (BEAM-1039) Spark context is never actually re-used in tests

2016-11-22 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin reassigned BEAM-1039:


Assignee: Stas Levin  (was: Amit Sela)

> Spark context is never actually re-used in tests
> 
>
> Key: BEAM-1039
> URL: https://issues.apache.org/jira/browse/BEAM-1039
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
>
> I think that due to a slight typo 
> {{Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)}} got mixed up with
> {{Boolean.getBoolean(System.getProperty(TEST_REUSE_SPARK_CONTEXT))}} in the 
> following 2 places, which may have caused {{SparkContext}} to behave not 
> quite as expected.
> *  
> https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java#L52
> *  
> https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java#L70
> I'll take care of it as part of something I'm working on, so a PR should 
> follow shortly.





[jira] [Commented] (BEAM-1039) Spark context is never actually re-used in tests

2016-11-22 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687906#comment-15687906
 ] 

Stas Levin commented on BEAM-1039:
--

For some reason it looked like...

Must have been a brain f... My bad, closing.

> Spark context is never actually re-used in tests
> 
>
> Key: BEAM-1039
> URL: https://issues.apache.org/jira/browse/BEAM-1039
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
>
> I think that due to a slight typo 
> {{Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)}} got mixed up with
> {{Boolean.getBoolean(System.getProperty(TEST_REUSE_SPARK_CONTEXT))}} in the 
> following 2 places, which may have caused {{SparkContext}} to behave not 
> quite as expected.
> *  
> https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java#L52
> *  
> https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java#L70
> I'll take care of it as part of something I'm working on, so a PR should 
> follow shortly.





[jira] [Closed] (BEAM-1039) Spark context is never actually re-used in tests

2016-11-22 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin closed BEAM-1039.

   Resolution: Not A Problem
Fix Version/s: Not applicable

> Spark context is never actually re-used in tests
> 
>
> Key: BEAM-1039
> URL: https://issues.apache.org/jira/browse/BEAM-1039
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
> Fix For: Not applicable
>
>
> I think that due to a slight typo 
> {{Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)}} got mixed up with
> {{Boolean.getBoolean(System.getProperty(TEST_REUSE_SPARK_CONTEXT))}} in the 
> following 2 places, which may have caused {{SparkContext}} to behave not 
> quite as expected.
> *  
> https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java#L52
> *  
> https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java#L70
> I'll take care of it as part of something I'm working on, so a PR should 
> follow shortly.





[jira] [Created] (BEAM-1050) PipelineResult.State is not set to FAILED when a streaming job fails

2016-11-27 Thread Stas Levin (JIRA)
Stas Levin created BEAM-1050:


 Summary: PipelineResult.State is not set to FAILED when a 
streaming job fails
 Key: BEAM-1050
 URL: https://issues.apache.org/jira/browse/BEAM-1050
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Stas Levin
Assignee: Stas Levin
Priority: Minor


In case of failure, {{StreamingContext#awaitTerminationOrTimeout}} and 
{{StreamingContext#awaitTermination}} will both throw an exception, and so 
{{state = State.DONE}} will not be executed in the code block below. 

In addition, it would probably make sense to set {{state = State.FAILED}} in 
cases where an exception takes place.

{code:java}
if (isStreamingPipeline()) {
  // stop streaming context
  if (timeout > 0) {
    jssc.awaitTerminationOrTimeout(timeout);
  } else {
    jssc.awaitTermination();
  }
  // stop streaming context gracefully, so checkpointing (and other computations) get to
  // finish before shutdown.
  jssc.stop(false, gracefully);
}
state = State.DONE;
{code}
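
One possible shape for the suggestion above, as a rough sketch rather than the actual runner code: wrap the await call so that an exception results in {{State.FAILED}} being recorded. {{StreamingHandle}} and the reduced {{State}} enum are hypothetical stand-ins introduced here for illustration only.

```java
// Sketch only: StreamingHandle is a hypothetical stand-in for the streaming
// context, and State lists just the values discussed in this issue.
class FailedStateSketch {
  enum State { RUNNING, DONE, FAILED }

  interface StreamingHandle {
    void awaitTermination() throws Exception; // assumed to throw if the job failed
    void stop();
  }

  static State awaitAndReport(StreamingHandle handle) {
    State state = State.RUNNING;
    try {
      handle.awaitTermination();
      state = State.DONE;   // reached only if no exception was thrown
    } catch (Exception e) {
      state = State.FAILED; // record the failure instead of skipping the update
    } finally {
      handle.stop();        // always release the context
    }
    return state;
  }

  public static void main(String[] args) {
    StreamingHandle failing = new StreamingHandle() {
      @Override public void awaitTermination() throws Exception {
        throw new IllegalStateException("simulated streaming failure");
      }
      @Override public void stop() {}
    };
    System.out.println(awaitAndReport(failing));
  }
}
```

A real fix would probably also rethrow or otherwise surface the exception; the sketch swallows it only to keep the state transition visible.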





[jira] [Commented] (BEAM-1050) PipelineResult.State is not set to FAILED when a streaming job fails

2016-11-27 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15699605#comment-15699605
 ] 

Stas Levin commented on BEAM-1050:
--

I see that the code has since changed (the relevant snippet is below), but I 
believe the argument above still holds.

{code:java}
if (isStreamingPipeline()) {
  // According to PipelineResult: Provide a value less than 1 ms for an infinite wait
  if (duration.getMillis() < 1L) {
    jssc.awaitTermination();
    state = State.DONE;
  } else {
    jssc.awaitTermination(duration.getMillis());
    // According to PipelineResult: The final state of the pipeline or null on timeout
    if (jssc.getState().equals(StreamingContextState.STOPPED)) {
      state = State.DONE;
    } else {
      return null;
    }
  }
  return state;
} else {
  // This is no-op, since Spark runner in batch is blocking.
  // It needs to be updated once SparkRunner supports non-blocking execution:
  // https://issues.apache.org/jira/browse/BEAM-595
  return State.DONE;
}
{code}

> PipelineResult.State is not set to FAILED when a streaming job fails
> ---
>
> Key: BEAM-1050
> URL: https://issues.apache.org/jira/browse/BEAM-1050
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Stas Levin
>Priority: Minor
>
> In case of failure, {{StreamingContext#awaitTerminationOrTimeout}} and 
> {{StreamingContext#awaitTermination}} will both throw an exception, and so 
> {{state = State.DONE}} will not be executed in the code block below. 
> In addition, it would probably make sense to set {{state = State.FAILED}} in 
> cases where an exception takes place.
> {code:java}
> if (isStreamingPipeline()) {
>   // stop streaming context
>   if (timeout > 0) {
>     jssc.awaitTerminationOrTimeout(timeout);
>   } else {
>     jssc.awaitTermination();
>   }
>   // stop streaming context gracefully, so checkpointing (and other computations) get to
>   // finish before shutdown.
>   jssc.stop(false, gracefully);
> }
> state = State.DONE;
> {code}





[jira] [Updated] (BEAM-1067) apex.examples.WordCountTest.testWordCountExample may be flaky

2016-12-01 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-1067:
-
Description: 
Seems that 
{{org.apache.beam.runners.apex.examples.WordCountTest.testWordCountExample}} is 
flaky.

For example, 
[this|https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/5408/org.apache.beam$beam-runners-apex/testReport/org.apache.beam.runners.apex.examples/WordCountTest/testWordCountExample/]
run failed although no changes were made in {{runner-apex}}.

  was:
Seems that 
{{org.apache.beam.runners.apex.examples.WordCountTest.testWordCountExample}} is 
flaky.

For example, 
[this|https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/5408/org.apache.beam$beam-runners-apex/testReport/org.apache.beam.runners.apex.examples/WordCountTest/testWordCountExample/]
run failed although no changes were made in {{runners-spark}}.


> apex.examples.WordCountTest.testWordCountExample may be flaky
> -
>
> Key: BEAM-1067
> URL: https://issues.apache.org/jira/browse/BEAM-1067
> Project: Beam
>  Issue Type: Bug
>  Components: runner-apex
>Reporter: Stas Levin
>
> Seems that 
> {{org.apache.beam.runners.apex.examples.WordCountTest.testWordCountExample}} 
> is flaky.
> For example, 
> [this|https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/5408/org.apache.beam$beam-runners-apex/testReport/org.apache.beam.runners.apex.examples/WordCountTest/testWordCountExample/]
> run failed although no changes were made in {{runner-apex}}.





[jira] [Created] (BEAM-1067) apex.examples.WordCountTest.testWordCountExample may be flaky

2016-12-01 Thread Stas Levin (JIRA)
Stas Levin created BEAM-1067:


 Summary: apex.examples.WordCountTest.testWordCountExample may be 
flaky
 Key: BEAM-1067
 URL: https://issues.apache.org/jira/browse/BEAM-1067
 Project: Beam
  Issue Type: Bug
  Components: runner-apex
Reporter: Stas Levin


Seems that 
{{org.apache.beam.runners.apex.examples.WordCountTest.testWordCountExample}} is 
flaky.

For example, 
[this|https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/5408/org.apache.beam$beam-runners-apex/testReport/org.apache.beam.runners.apex.examples/WordCountTest/testWordCountExample/]
run failed although no changes were made in {{runners-spark}}.





[jira] [Commented] (BEAM-1067) apex.examples.WordCountTest.testWordCountExample may be flaky

2016-12-02 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15716750#comment-15716750
 ] 

Stas Levin commented on BEAM-1067:
--

Not sure, but 
{{org.apache.beam.runners.apex.translation.ParDoBoundTranslatorTest.testMultiOutputParDoWithSideInputs}}
 seems to fail [here and there as 
well|https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/5451/org.apache.beam$beam-runners-apex/testReport/org.apache.beam.runners.apex.translation/ParDoBoundTranslatorTest/testMultiOutputParDoWithSideInputs/].

> apex.examples.WordCountTest.testWordCountExample may be flaky
> -
>
> Key: BEAM-1067
> URL: https://issues.apache.org/jira/browse/BEAM-1067
> Project: Beam
>  Issue Type: Bug
>  Components: runner-apex
>Reporter: Stas Levin
>Assignee: Thomas Weise
>
> Seems that 
> {{org.apache.beam.runners.apex.examples.WordCountTest.testWordCountExample}} 
> is flaky.
> For example, 
> [this|https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/5408/org.apache.beam$beam-runners-apex/testReport/org.apache.beam.runners.apex.examples/WordCountTest/testWordCountExample/]
> run failed although no changes were made in {{runner-apex}}.





[jira] [Assigned] (BEAM-298) Make TestPipeline implement the TestRule interface

2016-12-14 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin reassigned BEAM-298:
---

Assignee: Stas Levin

> Make TestPipeline implement the TestRule interface
> --
>
> Key: BEAM-298
> URL: https://issues.apache.org/jira/browse/BEAM-298
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Thomas Groh
>Assignee: Stas Levin
>Priority: Minor
>
> https://github.com/junit-team/junit4/wiki/Rules
> A JUnit Rule allows a test to use a field annotated with @Rule to wrap 
> executing tests. Doing so allows the TestPipeline to, at the time the test 
> completes, assert that all applied transforms have been executed. This 
> ensures that every unit test that uses a TestPipeline rule has either 
> executed the pipeline or explicitly declared that it does not expect to.





[jira] [Assigned] (BEAM-85) PAssert needs sanity check that it's used correctly

2016-12-14 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-85?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin reassigned BEAM-85:
--

Assignee: Stas Levin

> PAssert needs sanity check that it's used correctly
> ---
>
> Key: BEAM-85
> URL: https://issues.apache.org/jira/browse/BEAM-85
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>Reporter: Daniel Halperin
>Assignee: Stas Levin
>
> We should validate two things:
> # DataflowAssert is not added to a pipeline that has already been run.
> # The pipeline is run after the DataflowAssert is added.
> If either of these is not validated, then it is possible that the test 
> doesn't actually verify anything.
> This code should throw an assertion error or fail in some other way,
> {code}
> Pipeline p = TestPipeline.create();
> PCollection<Boolean> value = p.apply(Create.of(Boolean.FALSE));
> p.run();
> DataflowAssert.thatSingleton(value).isEqualTo(true);
> {code}
> but it would pass silently.
> Similarly, this code will pass silently:
> {code}
> Pipeline p = TestPipeline.create();
> PCollection<Boolean> value = p.apply(Create.of(Boolean.FALSE));
> DataflowAssert.thatSingleton(value).isEqualTo(true);
> {code}
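
The two checks listed above can be sketched with plain bookkeeping. This is only an illustration under stated assumptions: {{CheckedPipeline}} and its methods are hypothetical names and are not part of the Beam SDK.

```java
// Sketch only: CheckedPipeline is a hypothetical stand-in, not a Beam class.
class AssertUsageSketch {
  static class CheckedPipeline {
    private boolean hasRun = false;
    private int pendingAsserts = 0;

    // Check 1: refuse assertions on a pipeline that has already been run.
    void addAssert() {
      if (hasRun) {
        throw new IllegalStateException("assertion added after the pipeline was run");
      }
      pendingAsserts++;
    }

    // Running the pipeline executes every assertion added so far.
    void run() {
      hasRun = true;
      pendingAsserts = 0;
    }

    // Check 2: call when the test finishes; fails if assertions never ran.
    void verifyAllAssertsRan() {
      if (pendingAsserts > 0) {
        throw new IllegalStateException(
            pendingAsserts + " assertion(s) were added but the pipeline was never run");
      }
    }
  }

  public static void main(String[] args) {
    CheckedPipeline p = new CheckedPipeline();
    p.addAssert();
    p.run();
    p.verifyAllAssertsRan(); // passes: the assertion was added before run()
    System.out.println("assertion executed as part of the run");
  }
}
```

In a test framework, the final check would naturally live in whatever hook runs after each test method.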





[jira] [Commented] (BEAM-690) Backoff in the DirectRunner Monitor if no work is Available

2016-12-15 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15751670#comment-15751670
 ] 

Stas Levin commented on BEAM-690:
-

Hi,

I'm looking into this. Correct me if I'm wrong, but won't such a back-off 
make tests take longer to complete? 

Backing off the scheduling of {{MonitorRunnable#run}} delays shutdown 
detection, which is performed in {{MonitorRunnable#shouldShutdown}}, itself 
called by {{MonitorRunnable#run}}.

> Backoff in the DirectRunner Monitor if no work is Available
> ---
>
> Key: BEAM-690
> URL: https://issues.apache.org/jira/browse/BEAM-690
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct
>Reporter: Thomas Groh
>
> When a Pipeline has no elements available to process, the Monitor Runnable 
> will be repeatedly scheduled. Given that there is no work to be done, this 
> will loop over the steps in the transform looking for timers, and prompt the 
> sources to perform additional work, even though there is no work to be done. 
> This consumes the entirety of a single core.
> Add a bounded backoff to rescheduling the monitor runnable if no work has 
> been done since it last ran. This will reduce resource consumption on 
> low-throughput Pipelines.
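
A minimal sketch of what a bounded backoff could look like, assuming a doubling delay with a fixed cap. The constants and the method name are illustrative assumptions, not values taken from the DirectRunner.

```java
// Sketch only: the constants and names here are assumptions for illustration.
class BoundedBackoffSketch {
  static final long INITIAL_DELAY_MS = 1;
  static final long MAX_DELAY_MS = 128;

  // Returns how long to wait before the next monitor run, given the previous
  // delay and whether any work was done since the monitor last ran.
  static long nextDelayMillis(long previousDelayMs, boolean workDone) {
    if (workDone) {
      return 0; // work is flowing: reschedule immediately
    }
    if (previousDelayMs <= 0) {
      return INITIAL_DELAY_MS; // first idle round
    }
    return Math.min(previousDelayMs * 2, MAX_DELAY_MS); // double, but never exceed the cap
  }

  public static void main(String[] args) {
    long delay = 0;
    for (int i = 0; i < 10; i++) {
      delay = nextDelayMillis(delay, false);
      System.out.println("idle round " + i + ": wait " + delay + " ms");
    }
  }
}
```

With a cap like this, the extra latency before a shutdown is noticed is bounded by the maximum delay, which is the trade-off raised in the comment above.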





[jira] [Comment Edited] (BEAM-1176) Make our test suites use @Rule TestPipeline

2016-12-20 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15763964#comment-15763964
 ] 

Stas Levin edited comment on BEAM-1176 at 12/20/16 11:18 AM:
-

I've started migrating the tests to use {{TestPipeline}} as a JUnit rule. 
There are quite a few places to change, so this might take a while longer.

From the migration I've done so far, there's a particularly notable pattern we 
don't currently support in the {{TestPipeline}} rule: having multiple 
pipeline instances within a single test method. 
This goes beyond supporting multiple {{run}} invocations on the same 
{{TestPipeline}} rule instance, since such tests assume that their 
internal pipeline instances are isolated. This in turn translates to their 
{{PTransform}} names often being unstable, and/or dangling pipeline nodes, which 
are acceptable since error handling is performed inside dedicated try/catch 
blocks inside the test itself.

Examples of such cases can be found in the following tests:
*  {{AvroIOGeneratedClassTest#runTestRead}}
*  {{ApproximateUniqueTest#runApproximateUniqueWithDuplicates}}, {{ApproximateUniqueTest#runApproximateUniqueWithSkewedDistributions}}
*  {{SampleTest#runPickAnyTest}}
*  {{BigtableIOTest#runReadTest}}

For now I'm refraining from migrating these particular test cases, preserving 
direct calls to {{TestPipeline.create()}} in places other than the {{@Rule}} 
declaration itself, which ideally I guess we should avoid.

Any thoughts?



> Make our test suites use @Rule TestPipeline
> ---
>
> Key: BEAM-1176
> URL: https://issues.apache.org/jira/browse/BEAM-1176
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Stas Levin
>Priority: Minor
>
> Now that [~staslev] has made {{TestPipeline}} a JUnit rule that performs 
> useful sanity checks, we should port all of our tests to it so that they set 
> a good example for users. Maybe we'll even catch some straggling tests with 
> errors :-)





[jira] [Commented] (BEAM-1176) Make our test suites use @Rule TestPipeline

2016-12-20 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15763964#comment-15763964
 ] 

Stas Levin commented on BEAM-1176:
--

I've started migrating the tests to use {{TestPipeline}} as a JUnit rule. 
There are quite a few places to change, so this might take a while longer.

From the migration I've done so far, there's a particularly notable pattern we 
don't currently support in the {{TestPipeline}} rule: having multiple 
pipeline instances within a single test method. 
This goes beyond supporting multiple {{run}} invocations on the same 
{{TestPipeline}} rule instance, since such tests assume that their 
internal pipeline instances are isolated. This in turn translates to their 
{{PTransform}} names often being unstable, and/or dangling pipeline nodes, which 
are acceptable since error handling is performed inside dedicated try/catch 
blocks inside the test itself.

Examples of such cases can be found in the following tests:
*  {{AvroIOGeneratedClassTest#runTestRead}}
*  {{ApproximateUniqueTest#runApproximateUniqueWithDuplicates}}, {{ApproximateUniqueTest#runApproximateUniqueWithSkewedDistributions}}
*  {{SampleTest#runPickAnyTest}}
*  {{BigtableIOTest#runReadTest}}

For now I'm refraining from migrating these particular test cases, preserving 
direct calls to {{TestPipeline.create()}} in places other than the {{@Rule}} 
declaration itself, which ideally I guess we should avoid.

Any thoughts?

> Make our test suites use @Rule TestPipeline
> ---
>
> Key: BEAM-1176
> URL: https://issues.apache.org/jira/browse/BEAM-1176
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Stas Levin
>Priority: Minor
>
> Now that [~staslev] has made {{TestPipeline}} a JUnit rule that performs 
> useful sanity checks, we should port all of our tests to it so that they set 
> a good example for users. Maybe we'll even catch some straggling tests with 
> errors :-)





[jira] [Comment Edited] (BEAM-1176) Make our test suites use @Rule TestPipeline

2016-12-20 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15763964#comment-15763964
 ] 

Stas Levin edited comment on BEAM-1176 at 12/20/16 11:20 AM:
-

I've started migrating the tests to use {{TestPipeline}} as a JUnit rule. 
There are quite a few places to change, so this might take a while longer.

From the migration I've done so far, there's a particularly notable pattern we 
don't currently support in the {{TestPipeline}} rule: having multiple 
pipeline instances within a single test method. 
This goes beyond supporting multiple {{run}} invocations on the same 
{{TestPipeline}} rule instance, since such tests assume that their 
internal pipeline instances are isolated. This in turn translates to their 
{{PTransform}} names often being unstable, and/or dangling pipeline nodes, which 
are acceptable since error handling is performed inside dedicated try/catch 
blocks inside the test itself.

Examples of such cases can be found in the following tests:
*  {{AvroIOGeneratedClassTest#runTestRead}}
*  {{ApproximateUniqueTest#runApproximateUniqueWithDuplicates}}, {{ApproximateUniqueTest#runApproximateUniqueWithSkewedDistributions}}
*  {{SampleTest#runPickAnyTest}}
*  {{BigtableIOTest#runReadTest}}

For now I'm refraining from migrating these particular test cases, preserving 
direct calls to {{TestPipeline.create()}} in places other than the {{@Rule}} 
declaration itself, which ideally I guess we should avoid.

Any thoughts?

P.S.
Status report: {{1}} straggling test was missing a {{pipeline.run}} :)



> Make our test suites use @Rule TestPipeline
> ---
>
> Key: BEAM-1176
> URL: https://issues.apache.org/jira/browse/BEAM-1176
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Stas Levin
>Priority: Minor
>
> Now that [~staslev] has made {{TestPipeline}} a JUnit rule that performs 
> useful sanity checks, we should port all of our tests to it so that they set 
> a good example for users. Maybe we'll even catch some straggling tests with 
> errors :-)





[jira] [Created] (BEAM-1186) Migrate the remaining tests to use TestPipeline as a JUnit rule.

2016-12-20 Thread Stas Levin (JIRA)
Stas Levin created BEAM-1186:


 Summary: Migrate the remaining tests to use TestPipeline as a 
JUnit rule.
 Key: BEAM-1186
 URL: https://issues.apache.org/jira/browse/BEAM-1186
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-core
Reporter: Stas Levin
Assignee: Stas Levin
Priority: Minor


Following up on [BEAM-1176|https://issues.apache.org/jira/browse/BEAM-1176], 
the following tests still have direct calls to {{TestPipeline.create()}}:
* {{AvroIOGeneratedClassTest#runTestRead}}
* {{ApproximateUniqueTest#runApproximateUniqueWithDuplicates}}
* {{ApproximateUniqueTest#runApproximateUniqueWithSkewedDistributions}}
* {{SampleTest#runPickAnyTest}}
* {{BigtableIOTest#runReadTest}}

Consider using [parametrised tests|https://github.com/Pragmatists/junitparams] 
as suggested by [~lcwik].





[jira] [Commented] (BEAM-1176) Make our test suites use @Rule TestPipeline

2016-12-20 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15764919#comment-15764919
 ] 

Stas Levin commented on BEAM-1176:
--

Sounds good, enter [BEAM-1186|https://issues.apache.org/jira/browse/BEAM-1186].

> Make our test suites use @Rule TestPipeline
> ---
>
> Key: BEAM-1176
> URL: https://issues.apache.org/jira/browse/BEAM-1176
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Stas Levin
>Priority: Minor
>
> Now that [~staslev] has made {{TestPipeline}} a JUnit rule that performs 
> useful sanity checks, we should port all of our tests to it so that they set 
> a good example for users. Maybe we'll even catch some straggling tests with 
> errors :-)


