Thanks very much JB, will look forward to your note. 

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:[email protected]] 
Sent: Friday, September 1, 2017 1:54 AM
To: [email protected]
Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue

Sure, I will send the PR during the weekend. I will let you know.

Regards
JB

On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
> Thanks JB.  Could you please point me to the location of the Spark Runner
> specific to Spark 2.x, or is this part of some configuration?
> 
> -----Original Message-----
> From: Jean-Baptiste Onofré [mailto:[email protected]]
> Sent: Thursday, August 31, 2017 12:11 AM
> To: [email protected]
> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> 
> Hi,
> 
> I'm working on a Spark runner specific to Spark 2.x as the API changed.
> 
> So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
> 
> Regards
> JB
> 
> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
>> Hello,
>>
>> I am running into a Spark assertion error when running an Apache Beam
>> pipeline. The details are below:
>>
>> Apache Beam version: 2.1.0
>>
>> Spark version: 2.1.0
>>
>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>>                  at scala.Predef$.assert(Predef.scala:179)
>>                  at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>>                  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>                  at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>                  at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>
>> Can you please let me know if the Apache Beam v2.1.0 Spark runner is
>> compatible with Spark v2.1.0?
>>
>> Below is the code snippet for the pipeline:
>>
>>     PipelineOptionsFactory.register(CSVOptions.class);
>>     CSVOptions options =
>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
>>     options.setRunner(SparkRunner.class);
>>     options.setSparkMaster("local[4]");
>>     options.setEnableSparkMetricSinks(false);
>>
>>     Pipeline p = Pipeline.create(options);
>>     p.apply("ReadMyCSVFile",
>>             TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>>         .apply(new DataLoader())
>>         .apply(JdbcIO.<String>write()
>>             .withDataSourceConfiguration(
>>                 JdbcIO.DataSourceConfiguration
>>                     .create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>>                     .withUsername("postgres")
>>                     .withPassword("postgres"))
>>             .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>>             .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>>               public void setParameters(String element, PreparedStatement query)
>>                   throws SQLException {
>>                 String[] datas = element.split("\t");
>>                 if (datas.length > 0) {
>>                   for (int j = 0; j < datas.length; j++) {
>>                     query.setString(j + 1, datas[j]);
>>                   }
>>                 }
>>               }
>>             }));
>>
>>     SparkRunner runner = SparkRunner.create(options);
>>     runner.run(p).waitUntilFinish();
>>
>> Any help would be greatly appreciated.
>>
>> Thanks,
>>
>> Mahender
>>
> 
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
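
For readers following the stack trace in the original report: the message
"copyAndReset must return a zero value copy" comes from a check in Spark 2.x's
AccumulatorV2.writeReplace, which, as far as I understand it, calls
copyAndReset() and asserts that the result reports isZero. The sketch below
only illustrates that contract in plain Java. It does not use Spark or Beam,
every class and method name in it is hypothetical, and it makes no claim about
which accumulator in the Beam 2.1.0 Spark runner trips the check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; none of these names come from Spark or Beam.
class CopyAndResetSketch {
    public static void main(String[] args) {
        SketchAccumulator good = new ListAccumulator();
        good.add("row-1");
        System.out.println("good copy is zero: " + good.checkedCopy().isZero());

        SketchAccumulator broken = new BrokenAccumulator();
        broken.add("row-1");
        try {
            broken.checkedCopy();
        } catch (AssertionError e) {
            System.out.println("broken accumulator: " + e.getMessage());
        }
    }
}

// Simplified stand-in for the accumulator contract (assumption: it mirrors
// the copy/reset/isZero idea only, not Spark's real class).
abstract class SketchAccumulator {
    abstract boolean isZero();
    abstract SketchAccumulator copy();
    abstract void reset();
    abstract void add(String value);

    // Default behaviour: copy the accumulator, then reset the copy.
    SketchAccumulator copyAndReset() {
        SketchAccumulator c = copy();
        c.reset();
        return c;
    }

    // Imitates the pre-serialization check: the copy must be a zero value.
    final SketchAccumulator checkedCopy() {
        SketchAccumulator c = copyAndReset();
        if (!c.isZero()) {
            throw new AssertionError("copyAndReset must return a zero value copy");
        }
        return c;
    }
}

// Well-behaved accumulator: reset() empties it, so isZero() holds afterwards.
class ListAccumulator extends SketchAccumulator {
    protected final List<String> values = new ArrayList<>();

    boolean isZero() { return values.isEmpty(); }

    SketchAccumulator copy() {
        ListAccumulator c = new ListAccumulator();
        c.values.addAll(values);
        return c;
    }

    void reset() { values.clear(); }

    void add(String value) { values.add(value); }
}

// Misbehaving accumulator: its copyAndReset() skips the reset step, so the
// returned copy still holds data and the check fails.
class BrokenAccumulator extends ListAccumulator {
    @Override
    SketchAccumulator copyAndReset() { return copy(); }
}
```

Running CopyAndResetSketch shows the well-behaved accumulator passing the check
and the other one failing with the same message as in the trace. An accumulator
written against the Spark 1.x API may not satisfy this contract under Spark 2.x,
which would be in line with JB's remark that the API changed.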
