Hello,

I am running into a Spark assertion error when running an Apache Beam pipeline; the details are below:

Apache Beam version: 2.1.0
Spark version: 2.1.0

Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
        at scala.Predef$.assert(Predef.scala:179)
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
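
From the stack trace, the assertion fires in AccumulatorV2.writeReplace when Spark serializes an accumulator. If I am reading AccumulatorV2.scala correctly, Spark 2.x requires that copyAndReset() (copy() followed by reset()) return an accumulator whose isZero() is true. Here is a hypothetical accumulator that satisfies that contract, written only to illustrate what the assertion checks (this is not code from my pipeline):

    import org.apache.spark.util.AccumulatorV2;

    // Hypothetical long-sum accumulator illustrating the Spark 2.x contract:
    // after copyAndReset(), isZero() must return true, otherwise writeReplace
    // throws the AssertionError above.
    public class LongSum extends AccumulatorV2<Long, Long> {
        private long sum = 0L;

        @Override public boolean isZero() { return sum == 0L; }
        @Override public AccumulatorV2<Long, Long> copy() {
            LongSum c = new LongSum();
            c.sum = this.sum;
            return c;
        }
        @Override public void reset() { sum = 0L; }  // must restore the zero state
        @Override public void add(Long v) { sum += v; }
        @Override public void merge(AccumulatorV2<Long, Long> other) { sum += other.value(); }
        @Override public Long value() { return sum; }
    }

My guess is that an accumulator registered internally by the runner fails this check (I already set enableSparkMetricSinks to false), but I have not confirmed that.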

Can you please let me know whether the Apache Beam 2.1.0 Spark runner is compatible with Spark 2.1.0?

Below is the code snippet for the pipeline:

PipelineOptionsFactory.register(CSVOptions.class);
CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[4]");
options.setEnableSparkMetricSinks(false);

Pipeline p = Pipeline.create(options);
p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
 .apply(new DataLoader())
 .apply(JdbcIO.<String>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                .create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
                .withUsername("postgres")
                .withPassword("postgres"))
        .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
            @Override
            public void setParameters(String element, PreparedStatement query) throws SQLException {
                // Split the tab-delimited line and bind each field to the statement.
                String[] datas = element.split("\t");
                if (datas.length > 0) {
                    for (int j = 0; j < datas.length; j++) {
                        query.setString(j + 1, datas[j]);
                    }
                }
            }
        }));

SparkRunner runner = SparkRunner.create(options);
runner.run(p).waitUntilFinish();
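
In case it helps narrow things down, below is the smallest variant I would try next. It is only a sketch: Create.of stands in for my CSV input, the JDBC write is dropped, and I have not yet confirmed whether it reproduces the assertion.

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class MinimalSparkCheck {
        public static void main(String[] args) {
            SparkPipelineOptions opts = PipelineOptionsFactory.as(SparkPipelineOptions.class);
            opts.setRunner(SparkRunner.class);
            opts.setSparkMaster("local[4]");

            Pipeline p = Pipeline.create(opts);
            // Trivial in-memory source instead of TextIO + JdbcIO, to isolate the runner.
            p.apply(Create.of("a", "b", "c"));
            p.run().waitUntilFinish();
        }
    }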


Any help would be greatly appreciated.

Thanks,
Mahender
