OK, I just checked and it's part of the PR.

I'm checking out the PR locally to test it. I will let you know.

Regards
JB

On 02/19/2017 08:33 PM, Jean-Baptiste Onofré wrote:
Hi Dipti,

Regarding Elasticsearch, it seems that the classloader doesn't contain
EsInputSplit. When using spark-submit, you have to provide the
corresponding jar as a package.

For Cassandra, it looks more like a coder/serialization issue.

Are the tests part of the PR (so that I can take a look)?

Regards
JB

On 02/19/2017 06:14 PM, Dipti Kulkarni wrote:
Hi folks,

I am currently working on running HadoopInputFormatIO's CassandraIT
and Elasticsearch IT on the Spark and Dataflow runners. However, I am
running into issues in both ITs: specific classes are not found at run
time, but only when I use the Dataflow runner profile or the Spark
runner profile. The failures occur during deserialization. Without the
profiles, the ITs work fine on the direct runner. Here are the
specific details:

Cassandra and Elasticsearch IT execution using Spark pipeline options:

Cassandra IT execution using Spark runner:
Command used:
mvn -e -Pspark-runner install -pl sdks/java/io/hadoop-input-format
-DskipITs=false -DintegrationTestPipelineOptions='[ "--project=
sincere-nirvana-150207", "--tempRoot= gs://sisk-test/staging ",
"--runner=org.apache.beam.runners.spark.TestSparkRunner","--serverIp=104.154.16.243",
"--serverPort=9160","--userName=beamuser" ,"--password=test123"]'
-Denforcer.skip=true -Dcheckstyle.skip=true -Dtest=HIFIOCassandraIT

Exception:
testHIFReadForCassandraQuery(org.apache.beam.sdk.io.hadoop.inputformat.integration.tests.HIFIOCassandraIT)

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: unread block data
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:73)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
        at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:115)
        at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:64)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
        at org.apache.beam.sdk.io.hadoop.inputformat.integration.tests.HIFIOCassandraIT.testHIFReadForCassandraQuery(HIFIOCassandraIT.java:157)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runners.Suite.runChild(Suite.java:128)
        at org.junit.runners.Suite.runChild(Suite.java:27)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
        at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
        at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
        at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
        at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
        at org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:161)
        at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:290)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:242)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:121)
Caused by: java.lang.IllegalStateException: unread block data
        at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2726)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:207)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
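
One way to narrow down an "unread block data" failure like the one above is to check whether the object being shipped survives plain Java serialization on its own, outside Spark. The following is a minimal sketch under stated assumptions: `SerializationRoundTrip` and `roundTrip` are hypothetical names for illustration, not part of Beam or Spark, and a successful local round trip does not rule out a classpath or version mismatch on the executor side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical helper (not a Beam or Spark class): serializes a value with
// java.io serialization and reads it back in the same JVM.
public class SerializationRoundTrip {

    /** Writes the value to a byte array and deserializes a copy from it. */
    public static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in payload; in practice this would be the source or split
        // object that the runner has to ship to the workers.
        ArrayList<String> payload = new ArrayList<>(Arrays.asList("a", "b"));
        ArrayList<String> copy = roundTrip(payload);
        System.out.println("round trip preserved value: " + payload.equals(copy));
    }
}
```

If the round trip fails locally, the problem is in the object's own serialization logic; if it succeeds, the difference between driver and executor classpaths becomes the more likely suspect.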

Elasticsearch IT execution using Spark runner:
Command used:
mvn -e -Pspark-runner install -pl sdks/java/io/hadoop-input-format
-DskipITs=false -DintegrationTestPipelineOptions='[ "--project=
sincere-nirvana-150207", "--tempRoot= gs://sisk-test/staging ",
"--runner=org.apache.beam.runners.spark.TestSparkRunner","--serverIp=104.198.29.210",
"--serverPort=9200","--userName=beamuser" ,"--password=test123"]'
-Denforcer.skip=true -Dcheckstyle.skip=true -Dtest=HIFIOElasticIT

Exception:
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.EsInputFormat.EsInputSplit
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO$HadoopInputFormatBoundedSource$SerializableSplit.readExternal(HadoopInputFormatIO.java:973)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2062)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2011)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:207)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
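
A side note on the class name in the trace above: it is written with a dot before EsInputSplit. `Class.forName` expects the binary name of a nested class, which uses `$` between the outer and the nested type. Whether this matters here cannot be told from the trace alone (the same read path works on the direct runner), so the sketch below only illustrates the general JDK behaviour using `java.util.Map.Entry`. `NestedClassNames` and `resolves` are hypothetical names for illustration.

```java
import java.util.Map;

// Hypothetical demo class: shows how Class.forName treats nested class names.
public class NestedClassNames {

    /** Returns true if Class.forName can resolve the given name. */
    public static boolean resolves(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // getName() yields the binary name, getCanonicalName() the dotted form.
        System.out.println(Map.Entry.class.getName());
        System.out.println(Map.Entry.class.getCanonicalName());

        // Only the binary name can be passed back to Class.forName.
        System.out.println(resolves("java.util.Map$Entry")); // prints true
        System.out.println(resolves("java.util.Map.Entry")); // prints false
    }
}
```

If a class name is stored during serialization and later passed to `Class.forName`, storing `getName()` rather than `getCanonicalName()` avoids this particular pitfall.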

I get the same set of exceptions on the Dataflow runner too.

Here are some things that I have tried to address this issue:
1) I thought the issue was likely due to dependency conflicts
introduced by adding the beam-runners-spark and spark-streaming_2.10
dependencies to the pom.xml.
2) The beam-runners-spark dependency pulls in dependencies such as
avro-mapred and hadoop-mapreduce-client-core with Hadoop version 2.2.0.
Hence, I tried excluding these from the beam-runners-spark dependency
and then ran the IT, but that didn't resolve the issue.
3) The spark-streaming_2.10 dependency pulls in dependencies such as
hadoop-client, which I tried to exclude in the pom. This ensured that
no Hadoop 2.2.0 dependency is added through the pom.xml. We are using
Hadoop version 2.7.0 in HadoopInputFormatIO.
4) I compared the dependency trees generated before and after adding
the Spark runner. The only difference was related to the Hadoop client
dependencies, which I later tried to exclude, but that didn't resolve
the issue.
5) I also tried changing the Spark dependency scopes from runtime to
provided/compile.

Observations:

*         This issue doesn't seem to be due to a dependency
conflict in the pom.xml, because if I execute the ITs using
DirectRunner with the same set of dependencies in the pom.xml, the test
proceeds without any exceptions.

*         With the Spark runner, the exceptions in both ITs seem to
originate at
'org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)'.
They occur only when we specify TestSparkRunner as the runner in
integrationTestPipelineOptions, and the same kind of exception is
observed in both the Cassandra and Elasticsearch ITs.

*         With DirectRunner, the EsInputSplit class is found in the
Elasticsearch case and the test runs successfully.
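
To compare what the direct runner and the Spark runner can actually see, it may help to print whether a class is visible to a given classloader and which jar it came from. The sketch below is an assumption-laden illustration: `ClassOriginCheck` and `describe` are hypothetical names, and the Elasticsearch class name is taken from the stack trace above. It would have to be run inside the same JVM (for example from the test or from a DoFn) to say anything about that environment.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper (not part of Beam): reports whether a class
// is visible to a classloader and where it was loaded from.
public class ClassOriginCheck {

    /** Describes the visibility and origin of className for the given loader. */
    public static String describe(String className, ClassLoader loader) {
        try {
            // initialize=false: load the class without running static initializers.
            Class<?> clazz = Class.forName(className, false, loader);
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            String origin = (source == null || source.getLocation() == null)
                    ? "the JDK runtime"
                    : source.getLocation().toString();
            return className + " loaded from " + origin;
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " NOT FOUND";
        }
    }

    public static void main(String[] args) {
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        System.out.println(describe("java.lang.String", context));
        // Outer class name as it appears in the stack trace.
        System.out.println(describe("org.elasticsearch.hadoop.mr.EsInputFormat", context));
    }
}
```

Running the same check under DirectRunner and under TestSparkRunner would show whether the Elasticsearch jar is missing from the executor-side classloader or is simply loaded from a different location.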

Below are some questions I am trying to find answers to:
1) For TestSparkRunner, do we need to specify a classpath or something
similar so that the jars get loaded?
2) Do we need to specify sparkMaster in the pipeline options? We tried
using URLs, but the URL could not be parsed.
3) Is any additional input needed in the options?

Since the same issues appear on the Dataflow runner, the issue may not
be runner specific. Any input is greatly appreciated.

-Dipti






--
Jean-Baptiste Onofré
http://blog.nanthrax.net
Talend - http://www.talend.com
