Hi,
I am implementing the WordCount example from org.apache.beam.runners.spark.examples using beam-release-0.6.0.
I have made some changes to this program and I am trying to execute it on a machine across the LAN.
I am able to create the pipeline and read the input, but when I apply
.apply(new CountWords())
it gives me this exception:
17/05/30 23:56:22 INFO SparkRunner: Executing pipeline using the SparkRunner.
17/05/30 23:56:22 INFO SparkContextFactory: Using a provided Spark Context
17/05/30 23:56:23 INFO SparkRunner$Evaluator: Evaluating Read(CompressedSource)
17/05/30 23:56:23 INFO SparkRunner$Evaluator: Evaluating ParDo(ExtractWords)
17/05/30 23:56:23 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.1.1:53428) with ID 1
17/05/30 23:56:23 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.1:35482 with 366.3 MB RAM, BlockManagerId(1, 192.168.1.214, 35482, None)
Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.RuntimeException: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
    at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
    at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
    at com.impressico.apache.beam.word.count.vs.WordCountVS.main(WordCountVS.java:177)
    ... 6 more
Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:163)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
    at org.apache.spark.api.java.JavaRDDLike$class.mapPartitions(JavaRDDLike.scala:155)
    at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitions(JavaRDDLike.scala:45)
    at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:348)
    at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:328)
    at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:374)
    at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:360)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:486)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:231)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:206)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:321)
    at org.apache.beam.runners.spark.SparkRunner$2.run(SparkRunner.java:226)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I have been trying to investigate this exception, but there is not much help available for it, so I am sharing my full sample code below.
package com.impressico.apache.beam.word.count.vs;

import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.JavaSerializer;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCount implements JavaStreamingContextFactory {

  private static final Logger logger = LoggerFactory.getLogger(WordCount.class);

  public static String path = "/home/username/kinglear.txt";
  public static String username = "username";
  public static String sparkMaster = "spark://192.168.1.1:7077";
  public static String appName = "SparkRunnerFileRead";
  public static String pathToWrite = "/home/username/beamWorks/output.txt";
  public static LongAccumulator longAccumulator;

  static class ExtractWordsFn extends DoFn<String, String> {
    private final Aggregator<Long, Long> emptyLines =
        createAggregator("emptyLines", Sum.ofLongs());

    @ProcessElement
    public void processElement(ProcessContext c) {
      if (c.element().trim().isEmpty()) {
        emptyLines.addValue(1L);
      }
      // Split the line into words.
      String[] words = c.element().split("[^a-zA-Z']+");
      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }

  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      logger.info("return input.getKey() + : + input.getValue()");
      return input.toString();
    }
  }

  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement());
      longAccumulator.add(wordCounts.expand().size());
      return wordCounts;
    }
  }

  public interface WordCountOptions extends SparkContextOptions {
    @Description("Path of the file to read from")
    @Default.String("/home/username/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);

    @Description("Path of the file to write to")
    @Default.String("/home/username/beamWorks/output.txt")
    String getOutput();
    void setOutput(String value);
  }

  public static void main(String[] args) {
    WordCount wc = new WordCount();
    System.setProperty("username", username);
    WordCountOptions sparkContextOptions = PipelineOptionsFactory.as(WordCountOptions.class);
    sparkContextOptions.setSparkMaster(sparkMaster);
    sparkContextOptions.setAppName("BeamReadOperations");
    sparkContextOptions.setStreaming(false);
    sparkContextOptions.setRunner(SparkRunner.class);
    sparkContextOptions.setEnableSparkMetricSinks(true);
    sparkContextOptions.setJobName("BeamReadOperations Job");
    sparkContextOptions.as(SparkPipelineOptions.class);
    sparkContextOptions.setCheckpointDurationMillis(1000L);
    sparkContextOptions.setMaxRecordsPerBatch(1000L);
    sparkContextOptions.setOptionsId(1000L);
    sparkContextOptions.setProvidedSparkContext(wc.create().sparkContext());
    sparkContextOptions.setUsesProvidedSparkContext(true);

    Pipeline p = Pipeline.create(sparkContextOptions);
    p.apply("ReadLines", TextIO.Read.from(sparkContextOptions.getInputFile()))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.Write.to(sparkContextOptions.getOutput()));
    p.run().waitUntilFinish();
  }

  public JavaStreamingContext create() {
    return createJavaStreamingContext();
  }

  private static JavaSparkContext createJavaSparkContext() {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName(appName);
    sparkConf.setMaster(sparkMaster);
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
    JavaSerializer javaSerializer = new JavaSerializer(sparkConf);
    javaSerializer.newInstance();
    javaSerializer.org$apache$spark$serializer$JavaSerializer$$counterReset();
    SparkContext sparkContext = new SparkContext(sparkMaster, appName, sparkConf);
    longAccumulator = sparkContext.longAccumulator();
    longAccumulator.setValue(100L);
    JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkContext);
    return javaSparkContext;
  }

  private JavaStreamingContext createJavaStreamingContext() {
    JavaStreamingContext javaStreamingContext =
        new JavaStreamingContext(createJavaSparkContext(), new Duration(10000L));
    return javaStreamingContext;
  }
}
Just for info: I am using these dependencies:
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-spark</artifactId>
      <version>0.6.0</version>
    </dependency>
    <!-- <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-hdfs</artifactId>
      <version>0.6.0</version>
    </dependency> -->
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-core -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>0.6.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.6.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.3</version>
    </dependency>
And this is the spark-submit command I use:
spark-submit --class com.impressico.apache.beam.word.count.vs.WordCount
--master spark://192.168.1.1:6066 --deploy-mode cluster
/home/username/beamWorks/WordCount.jar --runner=SparkRunner --name WordCount
Any inputs would be very helpful.
Thanks,
Vaibhav Sharma