[ https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650533#comment-15650533 ]

Stas Levin commented on BEAM-891:
---------------------------------

We can indeed try syncing the following block in {{SparkRuntimeContext}}:

{code:java}
metricsSystem.removeSource(aggregatorMetricSource);
metricsSystem.registerSource(aggregatorMetricSource);
{code}
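A minimal sketch of what syncing that block could look like. Spark's {{MetricsSystem}} is replaced by a hypothetical {{FakeMetricsSystem}} backed by a plain, unsynchronized {{ArrayList}}, loosely mirroring the {{ArrayBuffer}} in the trace below. {{SyncedRegistration}}, {{REGISTRATION_LOCK}} and the string source name are illustrative assumptions, not Beam or Spark API. Every Beam-side caller takes the same lock, so the remove-then-register pair runs as a single step for those callers only.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SyncedRegistration {

  /** Hypothetical stand-in for Spark's MetricsSystem: an unsynchronized list of sources. */
  static class FakeMetricsSystem {
    private final List<String> sources = new ArrayList<>();

    void registerSource(String source) {
      sources.add(source);
    }

    void removeSource(String source) {
      sources.remove(source);
    }

    int sourceCount() {
      return sources.size();
    }
  }

  // Hypothetical lock shared by Beam-side callers only; Spark's own calls would not take it.
  private static final Object REGISTRATION_LOCK = new Object();

  /** Replaces any previous registration, atomically with respect to other Beam callers. */
  static void registerMetrics(FakeMetricsSystem metricsSystem, String aggregatorMetricSource) {
    synchronized (REGISTRATION_LOCK) {
      metricsSystem.removeSource(aggregatorMetricSource);
      metricsSystem.registerSource(aggregatorMetricSource);
    }
  }

  public static void main(String[] args) throws Exception {
    FakeMetricsSystem metricsSystem = new FakeMetricsSystem();
    ExecutorService pool = Executors.newFixedThreadPool(8);
    List<Future<?>> futures = new ArrayList<>();
    // Many concurrent "contexts" re-register the same source, as parallel tests might.
    for (int i = 0; i < 1000; i++) {
      futures.add(pool.submit(() -> registerMetrics(metricsSystem, "aggregators")));
    }
    for (Future<?> future : futures) {
      future.get(); // rethrows any exception raised inside a task
    }
    pool.shutdown();
    // All calls went through the lock, so exactly one registration survives.
    System.out.println(metricsSystem.sourceCount()); // prints 1
  }
}
{code}

If some other code path called {{removeSource}} or {{registerSource}} on the same list without taking {{REGISTRATION_LOCK}}, this lock would not protect against it.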

But since other calls to {{MetricsSystem#removeSource}} and
{{MetricsSystem#registerSource}} (e.g., from Spark itself) are not synced, this
kind of synchronisation will only help if the so-called "race" lies in our Beam
code. If the race is with Spark's internal calls, syncing the above block
will not help much.

Another option would be to replace the above block with this one (which 
requires some minor changes to {{AggregatorMetricSource}}):
{code:java}
// Register only if no source with this name exists yet, so no removeSource call is needed.
if (metricsSystem.getSourcesByName(AggregatorMetricSource.NAME).isEmpty()) {
  metricsSystem.registerSource(aggregatorMetricSource);
}
{code}

This eliminates {{SparkRuntimeContext}}'s call to
{{MetricsSystem#removeSource}} altogether.
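A minimal sketch of the register-if-absent idea, runnable without Spark. {{FakeMetricsSystem}}, {{Source}} and the {{NAME}} value are hypothetical stand-ins for Spark's {{MetricsSystem}}, a Spark metrics source, and the {{AggregatorMetricSource.NAME}} constant the comment above assumes; only the shape of the {{getSourcesByName}} check is taken from the proposal.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class RegisterIfAbsent {

  /** Hypothetical stand-in for a named metrics source. */
  static final class Source {
    final String name;

    Source(String name) {
      this.name = name;
    }
  }

  /** Hypothetical stand-in for Spark's MetricsSystem. */
  static final class FakeMetricsSystem {
    private final List<Source> sources = new ArrayList<>();

    void registerSource(Source source) {
      sources.add(source);
    }

    List<Source> getSourcesByName(String name) {
      return sources.stream().filter(s -> s.name.equals(name)).collect(Collectors.toList());
    }
  }

  // Hypothetical name constant, playing the role of AggregatorMetricSource.NAME.
  static final String NAME = "beam.aggregators";

  /** Registers the source only when none with that name exists; never calls removeSource. */
  static void registerMetrics(FakeMetricsSystem metricsSystem, Source aggregatorMetricSource) {
    if (metricsSystem.getSourcesByName(NAME).isEmpty()) {
      metricsSystem.registerSource(aggregatorMetricSource);
    }
  }

  public static void main(String[] args) {
    FakeMetricsSystem metricsSystem = new FakeMetricsSystem();
    registerMetrics(metricsSystem, new Source(NAME));
    registerMetrics(metricsSystem, new Source(NAME)); // second context: no-op
    System.out.println(metricsSystem.getSourcesByName(NAME).size()); // prints 1
  }
}
{code}

The check and the register are still two separate calls, so two threads could both see no source and both register; if that matters, the pair needs its own guard, with the same caveat as above about calls made by Spark itself.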

> Flake in Spark metrics library?
> -------------------------------
>
>                 Key: BEAM-891
>                 URL: https://issues.apache.org/jira/browse/BEAM-891
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Daniel Halperin
>            Assignee: Stas Levin
>
> [~staslev] I think you implemented this functionality originally? Want to 
> take a look? CC [~amitsela]
> Run: 
> https://builds.apache.org/job/beam_PostCommit_RunnableOnService_SparkLocal/org.apache.beam$beam-runners-spark/43/testReport/junit/org.apache.beam.sdk.transforms/FilterTest/testFilterGreaterThan/
> Error:
> {code}
> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: 5
>       at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:169)
>       at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:77)
>       at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:53)
>       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>       at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
>       at org.apache.beam.sdk.transforms.FilterTest.testFilterGreaterThan(FilterTest.java:122)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>       at org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:393)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: 5
>       at scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
>       at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
>       at scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
>       at scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
>       at scala.collection.IndexedSeqOptimized$class.indexWhere(IndexedSeqOptimized.scala:198)
>       at scala.collection.mutable.ArrayBuffer.indexWhere(ArrayBuffer.scala:47)
>       at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:144)
>       at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>       at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:128)
>       at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>       at scala.collection.mutable.BufferLike$class.$minus$eq(BufferLike.scala:126)
>       at scala.collection.mutable.AbstractBuffer.$minus$eq(Buffer.scala:48)
>       at org.apache.spark.metrics.MetricsSystem.removeSource(MetricsSystem.scala:159)
>       at org.apache.beam.runners.spark.translation.SparkRuntimeContext.registerMetrics(SparkRuntimeContext.java:94)
>       at org.apache.beam.runners.spark.translation.SparkRuntimeContext.<init>(SparkRuntimeContext.java:66)
>       at org.apache.beam.runners.spark.translation.EvaluationContext.<init>(EvaluationContext.java:73)
>       at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:146)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
