Eugene Kirpichov created BEAM-4745:
--------------------------------------

             Summary: SDF tests broken by innocent change due to Dataflow 
worker dependencies
                 Key: BEAM-4745
                 URL: https://issues.apache.org/jira/browse/BEAM-4745
             Project: Beam
          Issue Type: Bug
          Components: runner-dataflow
            Reporter: Eugene Kirpichov
            Assignee: Eugene Kirpichov


https://github.com/apache/beam/pull/5894 broke SDF in Dataflow streaming 
runner, using SDFs fails with the error below.

The reason is that Dataflow worker has a staged copy of some stuff including 
runners-core-construction, and it comes before user code in the classpath. So 
the pipeline includes a serialized SplittableParDo from master, but the worker 
deserializes it using a stale class file.

This needs to be fixed on Dataflow side. Filing this JIRA just to track the 
externally facing issue.

Meanwhile to stop the bleeding I'm going to revert the change, even though by 
itself it's a correct change, but it's better to have SDFs not invoke 
setup/teardown than to have them not work at all.

CC: [~iemejia]

java.lang.RuntimeException: 
com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:192)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163)
        
com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
        
com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
        
com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123)
        
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143)
        
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
        
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: 
com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
        
com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90)
        
com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:181)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163)
        
com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
        
com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
        
com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123)
        
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143)
        
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
        
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: unable to deserialize Serialized 
DoFnInfo
        
org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
        
com.google.cloud.dataflow.worker.UserParDoFnFactory$UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:61)
        
com.google.cloud.dataflow.worker.UserParDoFnFactory.lambda$create$0(UserParDoFnFactory.java:92)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
        
com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90)
        
com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:181)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163)
        
com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
        
com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
        
com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123)
        
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143)
        
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
        
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: 
org.apache.beam.runners.core.construction.SplittableParDo$PairWithRestrictionFn;
 local class incompatible: stream classdesc serialVersionUID = 
-2216501394657530686, local class serialVersionUID = -6277163835950193211
        java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
        java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630)
        java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
        
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        
org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
        
com.google.cloud.dataflow.worker.UserParDoFnFactory$UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:61)
        
com.google.cloud.dataflow.worker.UserParDoFnFactory.lambda$create$0(UserParDoFnFactory.java:92)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
        
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
        
com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90)
        
com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:181)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163)
        
com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
        
com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
        
com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
        
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123)
        
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143)
        
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
        
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to