[
https://issues.apache.org/jira/browse/HAMA-983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15730450#comment-15730450
]
Edward J. Yoon commented on HAMA-983:
-------------------------------------
Here's my skeleton code, with an example that counts words. You should
implement the HamaPipelineRunner: just translate the pipeline and execute it
as a batch job. I think you can see how to translate the transforms in Flink's
code:
https://github.com/dataArtisans/flink-dataflow/blob/aad5d936abd41240f3e15d294ea181fb9cca05e0/runner/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java#L410
{code}
public class WordCountTest {
  static final String[] WORDS_ARRAY = new String[] { "hi there", "hi",
      "hi sue bob", "hi sue", "", "bob hi" };
  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
  static final String[] COUNTS_ARRAY = new String[] { "hi: 5", "there: 1",
      "sue: 2", "bob: 2" };

  /**
   * Example test that tests a PTransform by using an in-memory input and
   * inspecting the output.
   */
  @Test
  @Category(RunnableOnService.class)
  public void testCountWords() throws Exception {
    HamaOptions options = PipelineOptionsFactory.as(HamaOptions.class);
    options.setRunner(HamaPipelineRunner.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> input = p.apply(Create.of(WORDS).withCoder(
        StringUtf8Coder.of()));
    // FormatAsTextFn is not defined in this skeleton; it is assumed to format
    // each KV as "word: count", like the one in Beam's WordCount example.
    PCollection<String> output = input
        .apply(new WordCount())
        .apply(MapElements.via(new FormatAsTextFn()));
    // .apply(TextIO.Write.to("/tmp/result"));

    PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
    p.run().waitUntilFinish();
  }

  public static class WordCount extends
      PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    private static final long serialVersionUID = 1L;

    @Override
    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(
          new DoFn<String, String>() {
            private static final long serialVersionUID = 1L;
            private final Aggregator<Long, Long> emptyLines =
                createAggregator("emptyLines", new Sum.SumLongFn());

            @ProcessElement
            public void processElement(ProcessContext c) {
              if (c.element().trim().isEmpty()) {
                emptyLines.addValue(1L);
              }
              // Split the line into words.
              String[] words = c.element().split("[^a-zA-Z']+");
              // Output each word encountered into the output PCollection.
              for (String word : words) {
                if (!word.isEmpty()) {
                  c.output(word);
                }
              }
            }
          }));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count
          .<String> perElement());
      return wordCounts;
    }
  }

  // TODO: everything below still needs to be implemented.
  public static class HamaPipelineRunner extends
      PipelineRunner<HamaPipelineResult> {

    public static HamaPipelineRunner fromOptions(PipelineOptions x) {
      return new HamaPipelineRunner();
    }

    @Override
    public <Output extends POutput, Input extends PInput> Output apply(
        PTransform<Input, Output> transform, Input input) {
      return super.apply(transform, input);
    }

    @Override
    public HamaPipelineResult run(Pipeline pipeline) {
      System.out.println("Executing pipeline using HamaPipelineRunner.");
      // TODO: translate the pipeline into a Hama program,
      // execute it, and return the result.
      return null;
    }
  }

  // Declared static so the static nested runner can instantiate it without
  // an enclosing WordCountTest instance.
  public static class HamaPipelineResult implements PipelineResult {
    @Override
    public State getState() {
      // TODO: report the state of the Hama job.
      return null;
    }

    @Override
    public State cancel() throws IOException {
      // TODO: cancel the Hama job.
      return null;
    }

    @Override
    public State waitUntilFinish(Duration duration) {
      // TODO: wait for the Hama job, up to the given duration.
      return null;
    }

    @Override
    public State waitUntilFinish() {
      // TODO: wait for the Hama job to finish.
      return null;
    }

    @Override
    public <T> AggregatorValues<T> getAggregatorValues(
        Aggregator<?, T> aggregator) throws AggregatorRetrievalException {
      // TODO: collect aggregator values from the Hama job.
      return null;
    }

    @Override
    public MetricResults metrics() {
      // TODO: expose metrics from the Hama job.
      return null;
    }
  }

  public static interface HamaOptions extends PipelineOptions {
  }
}
{code}
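To make the "translate pipeline to Hama program" TODO a bit more concrete, here is a minimal sketch of one common way to structure the translation step: a registry that maps each transform type to a translator, which is the general shape the linked Flink translator file appears to use. Everything in it is hypothetical (Transform, ReadStep, ParDoStep, GroupByKeyStep, Translator, and the plan strings are stand-ins, not Beam or Hama classes), and it only builds a textual plan rather than a real Hama job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TranslatorRegistrySketch {
    // Hypothetical stand-ins for pipeline nodes; these are NOT Beam classes.
    interface Transform {}
    static final class ReadStep implements Transform {}
    static final class ParDoStep implements Transform {}
    static final class GroupByKeyStep implements Transform {}

    // Hypothetical translator: appends a description of the backend step.
    interface Translator<T extends Transform> {
        void translate(T transform, List<String> plan);
    }

    // One translator per transform type, looked up by class.
    private static final Map<Class<? extends Transform>, Translator<?>> TRANSLATORS =
            new HashMap<>();

    static {
        register(ReadStep.class, (t, plan) -> plan.add("read input split"));
        register(ParDoStep.class, (t, plan) -> plan.add("apply function to each element"));
        register(GroupByKeyStep.class, (t, plan) -> plan.add("send by key hash, sync, group"));
    }

    static <T extends Transform> void register(Class<T> type, Translator<T> translator) {
        TRANSLATORS.put(type, translator);
    }

    // Walks the transforms in order and asks the matching translator to emit a step.
    @SuppressWarnings("unchecked")
    static List<String> translate(List<Transform> pipeline) {
        List<String> plan = new ArrayList<>();
        for (Transform t : pipeline) {
            Translator<Transform> translator =
                    (Translator<Transform>) TRANSLATORS.get(t.getClass());
            if (translator == null) {
                throw new UnsupportedOperationException(
                        "No translator for " + t.getClass().getSimpleName());
            }
            translator.translate(t, plan);
        }
        return plan;
    }

    public static void main(String[] args) {
        List<Transform> pipeline = new ArrayList<>();
        pipeline.add(new ReadStep());
        pipeline.add(new ParDoStep());
        pipeline.add(new GroupByKeyStep());
        for (String step : translate(pipeline)) {
            System.out.println(step);
        }
    }
}
```

In a real runner the translators would presumably configure a Hama job instead of appending strings, and the pipeline would be traversed as a graph rather than a flat list.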
> Hama runner for DataFlow
> ------------------------
>
> Key: HAMA-983
> URL: https://issues.apache.org/jira/browse/HAMA-983
> Project: Hama
> Issue Type: Bug
> Reporter: Edward J. Yoon
> Labels: gsoc2016
>
> As you already know, Apache Beam provides a unified programming model for both
> batch and streaming inputs.
> The APIs are generally associated with data filtering and transforming, so
> we'll need to implement a data processing runner like
> https://github.com/dapurv5/MapReduce-BSP-Adapter/blob/master/src/main/java/org/apache/hama/mapreduce/examples/WordCount.java
> Also, implementing similarity join could be fun. According to
> http://www.ruizhang.info/publications/TPDS2015-Heads_Join.pdf, Apache Hama is
> the clear winner over Apache Hadoop and Apache Spark.
> Since similarity join consists of transformation, aggregation, and partition
> computations, I think it's possible to implement using Apache Beam APIs.
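
As a rough illustration of how the transformation, partition, and aggregation steps mentioned above could map onto BSP supersteps, here is a plain-Java simulation of a word count. It is only a sketch under stated assumptions: it does not use the Hama API, the peers and inboxes are simulated with in-memory lists, and the class and method names (BspWordCountSketch, run) are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BspWordCountSketch {
    // Simulates two supersteps over one input split per (hypothetical) peer.
    static List<Map<String, Long>> run(List<List<String>> inputSplits) {
        int numPeers = inputSplits.size();

        // One simulated message inbox per peer.
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < numPeers; i++) {
            inboxes.add(new ArrayList<>());
        }

        // Superstep 1: transformation (split lines into words) and
        // partition (send each word to the peer chosen by its hash).
        for (List<String> split : inputSplits) {
            for (String line : split) {
                for (String word : line.split("[^a-zA-Z']+")) {
                    if (word.isEmpty()) {
                        continue;
                    }
                    int target = Math.floorMod(word.hashCode(), numPeers);
                    inboxes.get(target).add(word);
                }
            }
        }

        // A real BSP job would hit a barrier synchronization here.

        // Superstep 2: aggregation (each peer counts the words it received).
        List<Map<String, Long>> results = new ArrayList<>();
        for (List<String> inbox : inboxes) {
            Map<String, Long> counts = new TreeMap<>();
            for (String word : inbox) {
                counts.merge(word, 1L, Long::sum);
            }
            results.add(counts);
        }
        return results;
    }

    public static void main(String[] args) {
        List<List<String>> splits = Arrays.asList(
                Arrays.asList("hi there", "hi", "hi sue bob"),
                Arrays.asList("hi sue", "", "bob hi"));
        List<Map<String, Long>> perPeer = run(splits);
        for (int i = 0; i < perPeer.size(); i++) {
            System.out.println("peer " + i + ": " + perPeer.get(i));
        }
    }
}
```

Because every occurrence of a word is routed to the same peer, each peer's counts are final for the words it owns, and no further merge step is needed.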