Thanks very much JB, will look forward to your note.

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:[email protected]]
Sent: Friday, September 1, 2017 1:54 AM
To: [email protected]
Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
Sure, I will send the PR during the weekend. I will let you know.

Regards
JB

On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
> Thanks JB. Could you please point me to the location of the Spark Runner
> specific to Spark 2.x, or is this something that is part of the configuration?
>
> -----Original Message-----
> From: Jean-Baptiste Onofré [mailto:[email protected]]
> Sent: Thursday, August 31, 2017 12:11 AM
> To: [email protected]
> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
>
> Hi,
>
> I'm working on a Spark runner specific to Spark 2.x, as the API changed.
>
> So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
>
> Regards
> JB
>
> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
>> Hello,
>>
>> I am running into a Spark assertion error when running an Apache Beam
>> pipeline. Below are the details:
>>
>> Apache Beam version: 2.1.0
>> Spark version: 2.1.0
>>
>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>>     at scala.Predef$.assert(Predef.scala:179)
>>     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>
>> Can you please let me know if the Apache Beam v2.1.0 Spark runner is
>> compatible with Spark v2.1.0?
>>
>> Below is the code snippet for the pipeline:
>>
>> PipelineOptionsFactory.register(CSVOptions.class);
>>
>> CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
>> options.setRunner(SparkRunner.class);
>> options.setSparkMaster("local[4]");
>> options.setEnableSparkMetricSinks(false);
>>
>> Pipeline p = Pipeline.create(options);
>>
>> p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>>     .apply(new DataLoader())
>>     .apply(JdbcIO.<String>write()
>>         .withDataSourceConfiguration(
>>             JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>>                 .withUsername("postgres").withPassword("postgres"))
>>         .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>>         .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>>             public void setParameters(String element, PreparedStatement query) throws SQLException {
>>                 String[] datas = element.split("\t");
>>                 if (datas.length > 0) {
>>                     for (int j = 0; j < datas.length; j++) {
>>                         query.setString(j + 1, datas[j]);
>>                     }
>>                 }
>>             }
>>         }));
>>
>> SparkRunner runner = SparkRunner.create(options);
>> runner.run(p).waitUntilFinish();
>>
>> Any help would be greatly appreciated.
>>
>> Thanks,
>> Mahender
>>
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
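For what it's worth, the parameter-binding loop inside the quoted PreparedStatementSetter can be exercised on its own, independent of Beam, Spark, and JDBC. A minimal sketch (the BindSketch class name and the string-based bind log are illustrative assumptions, not Beam API):

```java
import java.util.ArrayList;
import java.util.List;

public class BindSketch {
    // Mirrors the quoted setParameters: split a tab-delimited record and
    // record each field against its 1-based statement parameter position.
    static List<String> bindPositions(String element) {
        List<String> bound = new ArrayList<>();
        String[] datas = element.split("\t");
        for (int j = 0; j < datas.length; j++) {
            // In the pipeline this is query.setString(j + 1, datas[j])
            bound.add((j + 1) + "=" + datas[j]);
        }
        return bound;
    }

    public static void main(String[] args) {
        System.out.println(bindPositions("a\tb\tc")); // prints [1=a, 2=b, 3=c]
    }
}
```

Each field of the tab-delimited record is bound to the 1-based JDBC parameter position, matching the quoted query.setString(j + 1, datas[j]) call, so the logic in the pipeline itself looks correct; the failure comes from the runner/Spark version mismatch rather than this code.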

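Until the Spark 2.x runner JB mentions is available, one workaround consistent with the thread is to run the same pipeline against a Spark 1.x cluster, which the Beam 2.1.0 Spark runner is built against. A sketch of the Maven dependencies (the exact Spark version and the Scala artifact suffix are assumptions to verify against the Beam 2.1.0 POM):

```xml
<!-- Beam 2.1.0 Spark runner, built against the Spark 1.x API -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>2.1.0</version>
</dependency>
<!-- Pin a Spark 1.x core; 1.6.3 and the _2.10 suffix are illustrative -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.3</version>
  <scope>provided</scope>
</dependency>
```

The AccumulatorV2 class in the stack trace only exists in Spark 2.x, which is consistent with the runner having been compiled against the older Spark 1.x accumulator API.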