Hello,
I am running into a Spark assertion error when running an Apache Beam pipeline;
the details are below:
Apache Beam version: 2.1.0
Spark version: 2.1.0
Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
    at scala.Predef$.assert(Predef.scala:179)
    at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
Can you please let me know whether the Apache Beam v2.1.0 Spark runner is
compatible with Spark v2.1.0?
Below is the code snippet for the pipeline:
PipelineOptionsFactory.register(CSVOptions.class);
CSVOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[4]");
options.setEnableSparkMetricSinks(false);

Pipeline p = Pipeline.create(options);
p.apply("ReadMyCSVFile",
        TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
 .apply(new DataLoader())
 .apply(JdbcIO.<String>write()
     .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
         .create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
         .withUsername("postgres")
         .withPassword("postgres"))
     .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
     .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
       @Override
       public void setParameters(String element, PreparedStatement query)
           throws SQLException {
         // Split the tab-delimited line and bind each field positionally.
         String[] datas = element.split("\t");
         for (int j = 0; j < datas.length; j++) {
           query.setString(j + 1, datas[j]);
         }
       }
     }));

SparkRunner runner = SparkRunner.create(options);
runner.run(p).waitUntilFinish();
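(Unrelated to the assertion error, one thing I noticed while reviewing the setter: `String.split("\t")` with no limit argument drops trailing empty fields, so rows whose last columns are empty would bind fewer than the 16 placeholders in the insert statement; passing a limit of -1 keeps them. A minimal standalone illustration, with a hypothetical sample row:)

```java
public class SplitDemo {
    public static void main(String[] args) {
        String row = "a\tb\t\t"; // 4 tab-delimited columns, the last two empty

        // Default split discards trailing empty strings.
        System.out.println(row.split("\t").length);      // prints 2

        // A negative limit preserves trailing empty columns.
        System.out.println(row.split("\t", -1).length);  // prints 4
    }
}
```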
Any help would be greatly appreciated.
Thanks,
Mahender