[ https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15659356#comment-15659356 ]

Amit Sela commented on BEAM-891:
--------------------------------

[~staslev] I don't think the {{enableSparkSinks}} property in 
{{SparkPipelineOptions}} needs to be {{true}} by default (and we should 
probably rename it, since the current name implies Sinks as pipeline output).
We have the {{NamedAggregatorsTest}} test, which could explicitly set 
{{enableSparkSinks}} and thereby serve as a test for the metrics Sink.
This way you don't have to register/remove sources constantly, and the code 
should take into account that a context is (mostly) not reused.
As for a provided context that may be reused, we still need to address that, 
but we can assume that's for advanced users.
This should also make the proposed fix more reliable, no?
WDYT?
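For illustration, here is a minimal self-contained Java sketch of the flag-gated registration described above. The names ({{MetricsOptions}}, {{enableMetricsSinks}}, {{registerSource}}) are hypothetical, not Beam's actual API; in Beam this would presumably be a property on {{SparkPipelineOptions}} defaulting to {{false}}:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a metrics-sink flag that is false by default, so
// metric sources are only registered when a test (or user) opts in.
public class MetricsOptions {
    private boolean enableMetricsSinks = false; // default off, per the suggestion
    private final List<String> registeredSources = new ArrayList<>();

    public void setEnableMetricsSinks(boolean enabled) {
        this.enableMetricsSinks = enabled;
    }

    public boolean isEnableMetricsSinks() {
        return enableMetricsSinks;
    }

    // Register a named metrics source only when sinks are enabled; otherwise
    // the context never touches the metrics system at all.
    public boolean registerSource(String name) {
        if (!enableMetricsSinks) {
            return false;
        }
        registeredSources.add(name);
        return true;
    }

    public List<String> registeredSources() {
        return registeredSources;
    }
}
```

A test like {{NamedAggregatorsTest}} would then set the flag to {{true}} explicitly, exercising the metrics path without every other pipeline run mutating the shared metrics registry.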

> Flake in Spark metrics library?
> -------------------------------
>
>                 Key: BEAM-891
>                 URL: https://issues.apache.org/jira/browse/BEAM-891
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Daniel Halperin
>            Assignee: Stas Levin
>
> [~staslev] I think you implemented this functionality originally? Want to 
> take a look? CC [~amitsela]
> Run: 
> https://builds.apache.org/job/beam_PostCommit_RunnableOnService_SparkLocal/org.apache.beam$beam-runners-spark/43/testReport/junit/org.apache.beam.sdk.transforms/FilterTest/testFilterGreaterThan/
> Error:
> {code}
> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: 5
>       at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:169)
>       at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:77)
>       at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:53)
>       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>       at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
>       at org.apache.beam.sdk.transforms.FilterTest.testFilterGreaterThan(FilterTest.java:122)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>       at org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:393)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: 5
>       at scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
>       at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
>       at scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
>       at scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
>       at scala.collection.IndexedSeqOptimized$class.indexWhere(IndexedSeqOptimized.scala:198)
>       at scala.collection.mutable.ArrayBuffer.indexWhere(ArrayBuffer.scala:47)
>       at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:144)
>       at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>       at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:128)
>       at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>       at scala.collection.mutable.BufferLike$class.$minus$eq(BufferLike.scala:126)
>       at scala.collection.mutable.AbstractBuffer.$minus$eq(Buffer.scala:48)
>       at org.apache.spark.metrics.MetricsSystem.removeSource(MetricsSystem.scala:159)
>       at org.apache.beam.runners.spark.translation.SparkRuntimeContext.registerMetrics(SparkRuntimeContext.java:94)
>       at org.apache.beam.runners.spark.translation.SparkRuntimeContext.<init>(SparkRuntimeContext.java:66)
>       at org.apache.beam.runners.spark.translation.EvaluationContext.<init>(EvaluationContext.java:73)
>       at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:146)
> {code}
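For context on the {{IndexOutOfBoundsException}} above: the Scala frames show {{MetricsSystem.removeSource}} scanning a mutable {{ArrayBuffer}} via {{indexOf}}, which can go out of bounds if another pipeline run removes a source concurrently. As a general illustration only (a hypothetical {{SourceRegistry}}, not Beam or Spark code, and not the fix proposed in this thread), a copy-on-write list makes such concurrent register/remove safe, because each scan sees a stable snapshot of the array:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch: concurrent register/remove of named metric sources. A plain
// array-backed list could throw IndexOutOfBoundsException when one thread's
// indexOf scan races another thread's removal (as in the trace above);
// CopyOnWriteArrayList snapshots the array, so scans never see a shrinking buffer.
public class SourceRegistry {
    private final List<String> sources = new CopyOnWriteArrayList<>();

    public void register(String name) { sources.add(name); }
    public void remove(String name) { sources.remove(name); }
    public int size() { return sources.size(); }

    // Exercise: 8 threads each register 100 distinct sources, then 8 threads
    // remove them again; returns the final source count (deterministically 0).
    public static int exercise() throws InterruptedException {
        SourceRegistry registry = new SourceRegistry();
        Thread[] threads = new Thread[8];
        for (int t = 0; t < 8; t++) {
            final int id = t;
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 100; i++) registry.register("source-" + id + "-" + i);
            });
            threads[t].start();
        }
        for (Thread th : threads) th.join();
        for (int t = 0; t < 8; t++) {
            final int id = t;
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 100; i++) registry.remove("source-" + id + "-" + i);
            });
            threads[t].start();
        }
        for (Thread th : threads) th.join();
        return registry.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(exercise());
    }
}
```

The suggestion in the comment above avoids the race differently, by not registering/removing sources on every run in the first place; this sketch only shows why the unsynchronized buffer in the trace is fragile.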



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
