// ForEachPartFunction.java:
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Row;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class ForEachPartFunction implements ForeachPartitionFunction<Row> {
    public void call(Iterator<Row> t) throws Exception {
        List<String> rows = new ArrayList<String>();
        while (t.hasNext()) {
            Row irow = t.next();
            rows.add(irow.toString());
        }
        System.out.println(rows.toString());
    }
}

On Tue, May 30, 2017 at 2:01 PM, Anton Kravchenko <
kravchenko.anto...@gmail.com> wrote:

> Ok, there are at least two ways to do it:
>
> Dataset<Row> df = spark.read().csv("file:///C:/input_data/*.csv");
>
> df.foreachPartition(new ForEachPartFunction());
> df.toJavaRDD().foreachPartition(new Void_java_func());
>
> where ForEachPartFunction and Void_java_func are defined below:
>
> // ForEachPartFunction.java:
> import org.apache.spark.api.java.function.ForeachPartitionFunction;
> import org.apache.spark.sql.Row;
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
>
> public class ForEachPartFunction implements ForeachPartitionFunction<Row> {
>     public void call(Iterator<Row> it) throws Exception {
>         List<String> rows = new ArrayList<String>();
>
>         while (it.hasNext()) {
>             Row irow = it.next();
>             rows.add(irow.toString());
>         }
>     }
> }
>
> // Void_java_func.java:
> import org.apache.spark.api.java.function.VoidFunction;
> import org.apache.spark.sql.Row;
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
>
> public class Void_java_func implements VoidFunction<Iterator<Row>> {
>     public void call(Iterator<Row> it) {
>         List<String> rows = new ArrayList<String>();
>
>         while (it.hasNext()) {
>             Row irow = it.next();
>             rows.add(irow.toString());
>         }
>     }
> }
>
> Anton
>
>
> On Tue, May 30, 2017 at 10:58 AM, Anton Kravchenko <
> kravchenko.anto...@gmail.com> wrote:
>
>> What would be a Java equivalent of the Scala code below?
>>
>> def void_function_in_scala(ipartition: Iterator[Row]): Unit = {
>>   var df_rows = ArrayBuffer[String]()
>>   for (irow <- ipartition) {
>>     df_rows += irow.toString
>>   }
>> }
>>
>> val df = spark.read.csv("file:///C:/input_data/*.csv")
>> df.foreachPartition(void_function_in_scala);
>>
>> Thank you,
>> Anton
>>
>
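For readers following along, here is a minimal driver sketch showing how the two variants from the thread above are wired together. The SparkSession setup, the "local[*]" master, and the application name are assumptions added for illustration; only the CSV path and the two function classes come from the thread.

// Driver.java (hypothetical wiring, not from the original thread):
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Driver {
    public static void main(String[] args) {
        // Assumed local session for illustration only.
        SparkSession spark = SparkSession.builder()
                .appName("foreachPartition-java-example")
                .master("local[*]")
                .getOrCreate();

        // Same input path as in the thread; spark.read() is the Java form of spark.read.
        Dataset<Row> df = spark.read().csv("file:///C:/input_data/*.csv");

        // Variant 1: Dataset.foreachPartition with a ForeachPartitionFunction<Row>.
        df.foreachPartition(new ForEachPartFunction());

        // Variant 2: convert to a JavaRDD and pass a VoidFunction<Iterator<Row>>.
        df.toJavaRDD().foreachPartition(new Void_java_func());

        spark.stop();
    }
}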