SAMZA-495; upgrade to 0.8.0 samza
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/commit/f9efa43a Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/tree/f9efa43a Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/diff/f9efa43a Branch: refs/heads/master Commit: f9efa43acb477ffba989ca78c5e66d9e93f4b68f Parents: 59d1877 Author: Chris Riccomini <[email protected]> Authored: Tue Dec 9 10:14:49 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Tue Dec 9 10:14:49 2014 -0800 ---------------------------------------------------------------------- .gitignore | 1 + bin/grid | 8 +- pom.xml | 195 ++++++----- samza-job-package/pom.xml | 121 ------- samza-job-package/src/main/assembly/src.xml | 80 ----- .../src/main/config/wikipedia-feed.properties | 44 --- .../src/main/config/wikipedia-parser.properties | 52 --- .../src/main/config/wikipedia-stats.properties | 53 --- samza-job-package/src/main/resources/log4j.xml | 36 -- samza-wikipedia/pom.xml | 65 ---- .../wikipedia/system/WikipediaConsumer.java | 77 ----- .../wikipedia/system/WikipediaFeed.java | 332 ------------------- .../system/WikipediaSystemFactory.java | 50 --- .../wikipedia/task/WikipediaFeedStreamTask.java | 43 --- .../task/WikipediaParserStreamTask.java | 98 ------ .../task/WikipediaStatsStreamTask.java | 92 ----- src/main/assembly/src.xml | 81 +++++ src/main/config/wikipedia-feed.properties | 44 +++ src/main/config/wikipedia-parser.properties | 52 +++ src/main/config/wikipedia-stats.properties | 57 ++++ .../wikipedia/system/WikipediaConsumer.java | 77 +++++ .../wikipedia/system/WikipediaFeed.java | 332 +++++++++++++++++++ .../system/WikipediaSystemFactory.java | 50 +++ .../wikipedia/task/WikipediaFeedStreamTask.java | 43 +++ .../task/WikipediaParserStreamTask.java | 98 ++++++ .../task/WikipediaStatsStreamTask.java | 92 +++++ src/main/resources/log4j.xml | 39 +++ 27 files changed, 
1077 insertions(+), 1235 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 1898309..0435c14 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ target/ *.iws */.cache deploy +*.swp http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/bin/grid ---------------------------------------------------------------------- diff --git a/bin/grid b/bin/grid index 4324c92..25b2ec4 100755 --- a/bin/grid +++ b/bin/grid @@ -35,8 +35,8 @@ DOWNLOAD_CACHE_DIR=$HOME/.samza/download COMMAND=$1 SYSTEM=$2 -DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz -DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.2.0/hadoop-2.2.0.tar.gz +DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz +DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz bootstrap() { @@ -63,7 +63,7 @@ install_zookeeper() { install_yarn() { mkdir -p "$DEPLOY_ROOT_DIR" - install yarn $DOWNLOAD_YARN hadoop-2.2.0 + install yarn $DOWNLOAD_YARN hadoop-2.4.0 cp "$BASE_DIR/conf/yarn-site.xml" "$DEPLOY_ROOT_DIR/yarn/etc/hadoop/yarn-site.xml" if [ ! -f "$HOME/.samza/conf/yarn-site.xml" ]; then mkdir -p "$HOME/.samza/conf" @@ -73,7 +73,7 @@ install_yarn() { install_kafka() { mkdir -p "$DEPLOY_ROOT_DIR" - install kafka $DOWNLOAD_KAFKA kafka_2.9.2-0.8.1.1 + install kafka $DOWNLOAD_KAFKA kafka_2.10-0.8.1.1 # have to use SIGTERM since nohup on appears to ignore SIGINT # and Kafka switched to SIGINT in KAFKA-1031. 
sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 90f6c03..0891177 100644 --- a/pom.xml +++ b/pom.xml @@ -25,99 +25,102 @@ under the License. <maven>3.0.0</maven> </prerequisites> - <groupId>samza</groupId> - <artifactId>samza-example-parent</artifactId> - <version>0.7.0</version> - <packaging>pom</packaging> - <name>Samza Parent</name> + <groupId>org.apache.samza</groupId> + <artifactId>hello-samza</artifactId> + <version>0.8.0</version> + <packaging>jar</packaging> + <name>Samza Example</name> <description> Samza is a stream processing system. Think of it as Map-Reduce for streams. </description> - <url>https://github.com/linkedin/hello-samza</url> + <url>https://samza.incubator.apache.org/</url> - <dependencyManagement> - <dependencies> - <dependency> - <groupId>samza</groupId> - <artifactId>samza-wikipedia</artifactId> - <version>0.7.0</version> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-api</artifactId> - <version>${samza.version}</version> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-core_2.10</artifactId> - <version>${samza.version}</version> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-serializers_2.10</artifactId> - <version>${samza.version}</version> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-shell</artifactId> - <classifier>dist</classifier> - <type>tgz</type> - <version>${samza.version}</version> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-yarn_2.10</artifactId> - <version>${samza.version}</version> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-kv_2.10</artifactId> - 
<version>${samza.version}</version> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-kafka_2.10</artifactId> - <version>${samza.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <version>0.8.1</version> - </dependency> - <dependency> - <groupId>org.schwering</groupId> - <artifactId>irclib</artifactId> - <version>1.10</version> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>1.6.2</version> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>1.6.2</version> - </dependency> - <dependency> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-jaxrs</artifactId> - <version>1.8.5</version> - </dependency> - </dependencies> - </dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-api</artifactId> + <version>${samza.version}</version> + </dependency> + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-core_2.10</artifactId> + <version>${samza.version}</version> + </dependency> + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-log4j</artifactId> + <version>${samza.version}</version> + </dependency> + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-serializers_2.10</artifactId> + <version>${samza.version}</version> + </dependency> + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-shell</artifactId> + <classifier>dist</classifier> + <type>tgz</type> + <version>${samza.version}</version> + </dependency> + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-yarn_2.10</artifactId> + <version>${samza.version}</version> + </dependency> + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-kv_2.10</artifactId> + <version>${samza.version}</version> + 
</dependency> + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-kv-rocksdb_2.10</artifactId> + <version>${samza.version}</version> + </dependency> + <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-kafka_2.10</artifactId> + <version>${samza.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>0.8.1</version> + </dependency> + <dependency> + <groupId>org.schwering</groupId> + <artifactId>irclib</artifactId> + <version>1.10</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.6.2</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.6.2</version> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-jaxrs</artifactId> + <version>1.8.5</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>2.4.0</version> + </dependency> + </dependencies> <properties> <!-- maven specific properties --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <samza.version>0.7.0</samza.version> + <samza.version>0.8.0</samza.version> </properties> - <modules> - <module>samza-job-package</module> - <module>samza-wikipedia</module> - </modules> - <developers> <developer> <name>Chris Riccomini</name> @@ -159,11 +162,6 @@ under the License. <name>Scala-tools Maven2 Repository</name> <url>https://oss.sonatype.org/content/groups/scala-tools</url> </repository> - <!-- for zkclient --> - <repository> - <id>sonatype</id> - <url>http://oss.sonatype.org/content/groups/public</url> - </repository> </repositories> <pluginRepositories> @@ -183,6 +181,7 @@ under the License. 
<version>0.9</version> <configuration> <excludes> + <exclude>**/target/**</exclude> <exclude>*.json</exclude> <exclude>.vagrant/**</exclude> <exclude>.git/**</exclude> @@ -193,6 +192,7 @@ under the License. <exclude>.gitignore</exclude> <exclude>**/.cache/**</exclude> <exclude>deploy/**</exclude> + <exclude>**/.project</exclude> </excludes> </configuration> </plugin> @@ -221,6 +221,25 @@ under the License. </execution> </executions> </plugin> + <!-- plugin to build the tar.gz file filled with examples --> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.3</version> + <configuration> + <descriptors> + <descriptor>src/main/assembly/src.xml</descriptor> + </descriptors> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/pom.xml ---------------------------------------------------------------------- diff --git a/samza-job-package/pom.xml b/samza-job-package/pom.xml deleted file mode 100644 index 169a28f..0000000 --- a/samza-job-package/pom.xml +++ /dev/null @@ -1,121 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. 
See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>samza</groupId> - <artifactId>samza-example-parent</artifactId> - <version>0.7.0</version> - </parent> - - <artifactId>samza-job-package</artifactId> - <name>Samza Job Package</name> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>samza</groupId> - <artifactId>samza-wikipedia</artifactId> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-shell</artifactId> - <classifier>dist</classifier> - <type>tgz</type> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-core_2.10</artifactId> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-serializers_2.10</artifactId> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-yarn_2.10</artifactId> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-kv_2.10</artifactId> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-kafka_2.10</artifactId> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>2.2.0</version> - 
<scope>runtime</scope> - </dependency> - </dependencies> - - <licenses> - <license> - <name>Apache License 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.html</url> - <distribution>repo</distribution> - </license> - </licenses> - - <build> - <plugins> - <!-- plugin to build the tar.gz file filled with examples --> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>2.3</version> - <configuration> - <descriptors> - <descriptor>src/main/assembly/src.xml</descriptor> - </descriptors> - </configuration> - <executions> - <execution> - <id>make-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/assembly/src.xml ---------------------------------------------------------------------- diff --git a/samza-job-package/src/main/assembly/src.xml b/samza-job-package/src/main/assembly/src.xml deleted file mode 100644 index 14a5ad5..0000000 --- a/samza-job-package/src/main/assembly/src.xml +++ /dev/null @@ -1,80 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - you under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. 
--> - -<assembly - xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>dist</id> - <formats> - <format>tar.gz</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/..</directory> - <includes> - <include>README*</include> - <include>LICENSE*</include> - <include>NOTICE*</include> - </includes> - </fileSet> - </fileSets> - <files> - <file> - <source>${basedir}/src/main/resources/log4j.xml</source> - <outputDirectory>lib</outputDirectory> - </file> - <!-- filtered=true, so we do variable expansion so the yarn package path - always points to the correct spot on any machine --> - <file> - <source>${basedir}/src/main/config/wikipedia-feed.properties</source> - <outputDirectory>config</outputDirectory> - <filtered>true</filtered> - </file> - <file> - <source>${basedir}/src/main/config/wikipedia-parser.properties</source> - <outputDirectory>config</outputDirectory> - <filtered>true</filtered> - </file> - <file> - <source>${basedir}/src/main/config/wikipedia-stats.properties</source> - <outputDirectory>config</outputDirectory> - <filtered>true</filtered> - </file> - </files> - <dependencySets> - <dependencySet> - <outputDirectory>bin</outputDirectory> - <includes> - <include>org.apache.samza:samza-shell:tgz:dist:*</include> - </includes> - <fileMode>0744</fileMode> - <unpack>true</unpack> - </dependencySet> - <dependencySet> - <outputDirectory>lib</outputDirectory> - <includes> - <include>org.apache.samza:samza-core_2.10</include> - <include>org.apache.samza:samza-kafka_2.10</include> - <include>org.apache.samza:samza-serializers_2.10</include> - <include>org.apache.samza:samza-yarn_2.10</include> - <include>org.apache.samza:samza-kv_2.10</include> - 
<include>org.slf4j:slf4j-log4j12</include> - <include>samza:samza-wikipedia</include> - <include>org.apache.kafka:kafka_2.10</include> - <include>org.apache.hadoop:hadoop-hdfs</include> - </includes> - <useTransitiveFiltering>true</useTransitiveFiltering> - </dependencySet> - </dependencySets> -</assembly> http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/config/wikipedia-feed.properties ---------------------------------------------------------------------- diff --git a/samza-job-package/src/main/config/wikipedia-feed.properties b/samza-job-package/src/main/config/wikipedia-feed.properties deleted file mode 100644 index c498c16..0000000 --- a/samza-job-package/src/main/config/wikipedia-feed.properties +++ /dev/null @@ -1,44 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# Job -job.factory.class=org.apache.samza.job.yarn.YarnJobFactory -job.name=wikipedia-feed - -# YARN -yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz - -# Task -task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask -task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews - -# Serializers -serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory - -# Wikipedia System -systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory -systems.wikipedia.host=irc.wikimedia.org -systems.wikipedia.port=6667 - -# Kafka System -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.samza.msg.serde=json -systems.kafka.consumer.zookeeper.connect=localhost:2181/ -systems.kafka.producer.metadata.broker.list=localhost:9092 -systems.kafka.producer.producer.type=sync -# Normally, we'd set this much higher, but we want things to look snappy in the demo. -systems.kafka.producer.batch.num.messages=1 http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/config/wikipedia-parser.properties ---------------------------------------------------------------------- diff --git a/samza-job-package/src/main/config/wikipedia-parser.properties b/samza-job-package/src/main/config/wikipedia-parser.properties deleted file mode 100644 index 38575b6..0000000 --- a/samza-job-package/src/main/config/wikipedia-parser.properties +++ /dev/null @@ -1,52 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# Job -job.factory.class=org.apache.samza.job.yarn.YarnJobFactory -job.name=wikipedia-parser - -# YARN -yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz - -# Task -task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask -task.inputs=kafka.wikipedia-raw -task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory -task.checkpoint.system=kafka -# Normally, this would be 3, but we have only one broker. -task.checkpoint.replication.factor=1 - -# Metrics -metrics.reporters=snapshot,jmx -metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory -metrics.reporter.snapshot.stream=kafka.metrics -metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory - -# Serializers -serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory -serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory - -# Systems -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.samza.msg.serde=json -systems.kafka.consumer.zookeeper.connect=localhost:2181/ -systems.kafka.consumer.auto.offset.reset=largest -systems.kafka.producer.metadata.broker.list=localhost:9092 -systems.kafka.producer.producer.type=sync -# Normally, we'd set this much higher, but we want things to look snappy in the demo. 
-systems.kafka.producer.batch.num.messages=1 -systems.kafka.streams.metrics.samza.msg.serde=metrics http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/config/wikipedia-stats.properties ---------------------------------------------------------------------- diff --git a/samza-job-package/src/main/config/wikipedia-stats.properties b/samza-job-package/src/main/config/wikipedia-stats.properties deleted file mode 100644 index be0c749..0000000 --- a/samza-job-package/src/main/config/wikipedia-stats.properties +++ /dev/null @@ -1,53 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# Job -job.factory.class=org.apache.samza.job.yarn.YarnJobFactory -job.name=wikipedia-stats - -# YARN -yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz - -# Task -task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask -task.inputs=kafka.wikipedia-edits -task.window.ms=10000 - -# Serializers -serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory -serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory -serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory - -# Systems -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.samza.msg.serde=json -systems.kafka.consumer.zookeeper.connect=localhost:2181/ -systems.kafka.consumer.auto.offset.reset=largest -systems.kafka.producer.metadata.broker.list=localhost:9092 -systems.kafka.producer.producer.type=sync -# Normally, we'd set this much higher, but we want things to look snappy in the demo. -systems.kafka.producer.batch.num.messages=1 - -# Key-value storage -stores.wikipedia-stats.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory -stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog -stores.wikipedia-stats.key.serde=string -stores.wikipedia-stats.msg.serde=integer - -# Normally, we'd set this much higher, but we want things to look snappy in the demo. 
-stores.wikipedia-stats.write.batch.size=0 -stores.wikipedia-stats.object.cache.size=0 http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/samza-job-package/src/main/resources/log4j.xml b/samza-job-package/src/main/resources/log4j.xml deleted file mode 100644 index a937165..0000000 --- a/samza-job-package/src/main/resources/log4j.xml +++ /dev/null @@ -1,36 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" ?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> - -<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> -<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> - <appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender"> - <param name="File" value="${samza.log.dir}/${samza.container.name}.log" /> - <param name="DatePattern" value="'.'yyyy-MM-dd" /> - <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" /> - </layout> - </appender> - <root> - <priority value="info" /> - <appender-ref ref="RollingAppender"/> - </root> -</log4j:configuration> http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/pom.xml ---------------------------------------------------------------------- diff --git a/samza-wikipedia/pom.xml b/samza-wikipedia/pom.xml deleted file mode 100644 index 20d94ed..0000000 --- a/samza-wikipedia/pom.xml +++ /dev/null @@ -1,65 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>samza</groupId> - <artifactId>samza-example-parent</artifactId> - <version>0.7.0</version> - </parent> - - <artifactId>samza-wikipedia</artifactId> - <name>Samza Wikipedia Example</name> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.schwering</groupId> - <artifactId>irclib</artifactId> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.samza</groupId> - <artifactId>samza-kv_2.10</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <dependency> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-jaxrs</artifactId> - </dependency> - </dependencies> - - <licenses> - <license> - <name>Apache License 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.html</url> - <distribution>repo</distribution> - </license> - </licenses> - -</project> http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java ---------------------------------------------------------------------- diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java deleted file mode 100644 index f156c3b..0000000 --- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package samza.examples.wikipedia.system; - -import java.util.ArrayList; -import java.util.List; -import org.apache.samza.Partition; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.util.BlockingEnvelopeMap; -import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent; -import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedListener; - -public class WikipediaConsumer extends BlockingEnvelopeMap implements WikipediaFeedListener { - private final List<String> channels; - private final String systemName; - private final WikipediaFeed feed; - - public WikipediaConsumer(String systemName, WikipediaFeed feed, MetricsRegistry registry) { - this.channels = new ArrayList<String>(); - this.systemName = systemName; - this.feed = feed; - } - - public void onEvent(final WikipediaFeedEvent event) { - SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, event.getChannel(), new Partition(0)); - - try { - put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, event)); - } catch (Exception e) { - System.err.println(e); - } - } - - @Override - public void 
register(SystemStreamPartition systemStreamPartition, String startingOffset) { - super.register(systemStreamPartition, startingOffset); - - channels.add(systemStreamPartition.getStream()); - } - - @Override - public void start() { - feed.start(); - - for (String channel : channels) { - feed.listen(channel, this); - } - } - - @Override - public void stop() { - for (String channel : channels) { - feed.unlisten(channel, this); - } - - feed.stop(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java ---------------------------------------------------------------------- diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java deleted file mode 100644 index 16e302e..0000000 --- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package samza.examples.wikipedia.system; - -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import org.apache.samza.SamzaException; -import org.codehaus.jackson.map.ObjectMapper; -import org.schwering.irc.lib.IRCConnection; -import org.schwering.irc.lib.IRCEventListener; -import org.schwering.irc.lib.IRCModeParser; -import org.schwering.irc.lib.IRCUser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class WikipediaFeed { - private static final Logger log = LoggerFactory.getLogger(WikipediaFeed.class); - private static final Random random = new Random(); - private static final ObjectMapper jsonMapper = new ObjectMapper(); - - private final Map<String, Set<WikipediaFeedListener>> channelListeners; - private final String host; - private final int port; - private final IRCConnection conn; - private final String nick; - - public WikipediaFeed(String host, int port) { - this.channelListeners = new HashMap<String, Set<WikipediaFeedListener>>(); - this.host = host; - this.port = port; - this.nick = "samza-bot-" + Math.abs(random.nextInt()); - this.conn = new IRCConnection(host, new int[] { port }, "", nick, nick, nick); - this.conn.addIRCEventListener(new WikipediaFeedIrcListener()); - this.conn.setEncoding("UTF-8"); - this.conn.setPong(true); - this.conn.setColors(false); - } - - public void start() { - try { - this.conn.connect(); - } catch (IOException e) { - throw new RuntimeException("Unable to connect to " + host + ":" + port + ".", e); - } - } - - public void stop() { - this.conn.interrupt(); - - try { - this.conn.join(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while trying to shutdown IRC connection for " + host + ":" + port, e); - } - - if (this.conn.isAlive()) { - throw new RuntimeException("Unable to shutdown IRC connection for " + host + ":" + port); - } - } - - public void listen(String 
channel, WikipediaFeedListener listener) { - Set<WikipediaFeedListener> listeners = channelListeners.get(channel); - - if (listeners == null) { - listeners = new HashSet<WikipediaFeedListener>(); - channelListeners.put(channel, listeners); - join(channel); - } - - listeners.add(listener); - } - - public void unlisten(String channel, WikipediaFeedListener listener) { - Set<WikipediaFeedListener> listeners = channelListeners.get(channel); - - if (listeners == null) { - throw new RuntimeException("Trying to unlisten to a channel that has no listeners in it."); - } else if (!listeners.contains(listener)) { - throw new RuntimeException("Trying to unlisten to a channel that listener is not listening to."); - } - - listeners.remove(listener); - - if (listeners.size() == 0) { - leave(channel); - } - } - - public void join(String channel) { - conn.send("JOIN " + channel); - } - - public void leave(String channel) { - conn.send("PART " + channel); - } - - public class WikipediaFeedIrcListener implements IRCEventListener { - public void onRegistered() { - log.info("Connected"); - } - - public void onDisconnected() { - log.info("Disconnected"); - } - - public void onError(String msg) { - log.info("Error: " + msg); - } - - public void onError(int num, String msg) { - log.info("Error #" + num + ": " + msg); - } - - public void onInvite(String chan, IRCUser u, String nickPass) { - log.info(chan + "> " + u.getNick() + " invites " + nickPass); - } - - public void onJoin(String chan, IRCUser u) { - log.info(chan + "> " + u.getNick() + " joins"); - } - - public void onKick(String chan, IRCUser u, String nickPass, String msg) { - log.info(chan + "> " + u.getNick() + " kicks " + nickPass); - } - - public void onMode(IRCUser u, String nickPass, String mode) { - log.info("Mode: " + u.getNick() + " sets modes " + mode + " " + nickPass); - } - - public void onMode(String chan, IRCUser u, IRCModeParser mp) { - log.info(chan + "> " + u.getNick() + " sets mode: " + mp.getLine()); - } - - 
public void onNick(IRCUser u, String nickNew) { - log.info("Nick: " + u.getNick() + " is now known as " + nickNew); - } - - public void onNotice(String target, IRCUser u, String msg) { - log.info(target + "> " + u.getNick() + " (notice): " + msg); - } - - public void onPart(String chan, IRCUser u, String msg) { - log.info(chan + "> " + u.getNick() + " parts"); - } - - public void onPrivmsg(String chan, IRCUser u, String msg) { - Set<WikipediaFeedListener> listeners = channelListeners.get(chan); - - if (listeners != null) { - WikipediaFeedEvent event = new WikipediaFeedEvent(System.currentTimeMillis(), chan, u.getNick(), msg); - - for (WikipediaFeedListener listener : listeners) { - listener.onEvent(event); - } - } - - log.debug(chan + "> " + u.getNick() + ": " + msg); - } - - public void onQuit(IRCUser u, String msg) { - log.info("Quit: " + u.getNick()); - } - - public void onReply(int num, String value, String msg) { - log.info("Reply #" + num + ": " + value + " " + msg); - } - - public void onTopic(String chan, IRCUser u, String topic) { - log.info(chan + "> " + u.getNick() + " changes topic into: " + topic); - } - - public void onPing(String p) { - } - - public void unknown(String a, String b, String c, String d) { - log.warn("UNKNOWN: " + a + " " + b + " " + c + " " + d); - } - } - - public static interface WikipediaFeedListener { - void onEvent(WikipediaFeedEvent event); - } - - public static final class WikipediaFeedEvent { - private final long time; - private final String channel; - private final String source; - private final String rawEvent; - - public WikipediaFeedEvent(long time, String channel, String source, String rawEvent) { - this.time = time; - this.channel = channel; - this.source = source; - this.rawEvent = rawEvent; - } - - public WikipediaFeedEvent(Map<String, Object> jsonObject) { - this((Long) jsonObject.get("time"), (String) jsonObject.get("channel"), (String) jsonObject.get("source"), (String) jsonObject.get("raw")); - } - - public long 
getTime() { - return time; - } - - public String getChannel() { - return channel; - } - - public String getSource() { - return source; - } - - public String getRawEvent() { - return rawEvent; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((channel == null) ? 0 : channel.hashCode()); - result = prime * result + ((rawEvent == null) ? 0 : rawEvent.hashCode()); - result = prime * result + ((source == null) ? 0 : source.hashCode()); - result = prime * result + (int) (time ^ (time >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - WikipediaFeedEvent other = (WikipediaFeedEvent) obj; - if (channel == null) { - if (other.channel != null) - return false; - } else if (!channel.equals(other.channel)) - return false; - if (rawEvent == null) { - if (other.rawEvent != null) - return false; - } else if (!rawEvent.equals(other.rawEvent)) - return false; - if (source == null) { - if (other.source != null) - return false; - } else if (!source.equals(other.source)) - return false; - if (time != other.time) - return false; - return true; - } - - @Override - public String toString() { - return "WikipediaFeedEvent [time=" + time + ", channel=" + channel + ", source=" + source + ", rawEvent=" + rawEvent + "]"; - } - - public String toJson() { - return toJson(this); - } - - public static Map<String, Object> toMap(WikipediaFeedEvent event) { - Map<String, Object> jsonObject = new HashMap<String, Object>(); - - jsonObject.put("time", event.getTime()); - jsonObject.put("channel", event.getChannel()); - jsonObject.put("source", event.getSource()); - jsonObject.put("raw", event.getRawEvent()); - - return jsonObject; - } - - public static String toJson(WikipediaFeedEvent event) { - Map<String, Object> jsonObject = toMap(event); - - try { - return 
jsonMapper.writeValueAsString(jsonObject); - } catch (Exception e) { - throw new SamzaException(e); - } - } - - @SuppressWarnings("unchecked") - public static WikipediaFeedEvent fromJson(String json) { - try { - return new WikipediaFeedEvent((Map<String, Object>) jsonMapper.readValue(json, Map.class)); - } catch (Exception e) { - throw new SamzaException(e); - } - } - } - - public static void main(String[] args) throws InterruptedException { - WikipediaFeed feed = new WikipediaFeed("irc.wikimedia.org", 6667); - feed.start(); - - feed.listen("#en.wikipedia", new WikipediaFeedListener() { - @Override - public void onEvent(WikipediaFeedEvent event) { - System.out.println(event); - } - }); - - Thread.sleep(20000); - feed.stop(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java deleted file mode 100644 index d1612c9..0000000 --- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package samza.examples.wikipedia.system; - -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.system.SystemAdmin; -import org.apache.samza.system.SystemConsumer; -import org.apache.samza.system.SystemFactory; -import org.apache.samza.system.SystemProducer; -import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin; - -public class WikipediaSystemFactory implements SystemFactory { - @Override - public SystemAdmin getAdmin(String systemName, Config config) { - return new SinglePartitionWithoutOffsetsSystemAdmin(); - } - - @Override - public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { - String host = config.get("systems." + systemName + ".host"); - int port = config.getInt("systems." + systemName + ".port"); - WikipediaFeed feed = new WikipediaFeed(host, port); - - return new WikipediaConsumer(systemName, feed, registry); - } - - @Override - public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { - throw new SamzaException("You can't produce to a Wikipedia feed! 
How about making some edits to a Wiki, instead?"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java deleted file mode 100644 index 07cd8ac..0000000 --- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package samza.examples.wikipedia.task; - -import java.util.Map; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.SystemStream; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskCoordinator; -import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent; - -/** - * This task is very simple. 
All it does is take messages that it receives, and - * sends them to a Kafka topic called wikipedia-raw. - */ -public class WikipediaFeedStreamTask implements StreamTask { - private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-raw"); - - @Override - public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { - Map<String, Object> outgoingMap = WikipediaFeedEvent.toMap((WikipediaFeedEvent) envelope.getMessage()); - collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java deleted file mode 100644 index 0505f58..0000000 --- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package samza.examples.wikipedia.task; - -import java.util.HashMap; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.SystemStream; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskCoordinator; -import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent; - -public class WikipediaParserStreamTask implements StreamTask { - @SuppressWarnings("unchecked") - @Override - public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { - Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage(); - WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject); - - try { - Map<String, Object> parsedJsonObject = parse(event.getRawEvent()); - - parsedJsonObject.put("channel", event.getChannel()); - parsedJsonObject.put("source", event.getSource()); - parsedJsonObject.put("time", event.getTime()); - - collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject)); - } catch (Exception e) { - System.err.println("Unable to parse line: " + event); - } - } - - public static Map<String, Object> parse(String line) { - Pattern p = Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)"); - Matcher m = p.matcher(line); - - if (m.find() && m.groupCount() == 6) { - String title = m.group(1); - String flags = m.group(2); - String diffUrl = m.group(3); - String user = m.group(4); - int byteDiff = Integer.parseInt(m.group(5)); - String summary = m.group(6); - - Map<String, Boolean> flagMap = new HashMap<String, Boolean>(); - - flagMap.put("is-minor", 
flags.contains("M")); - flagMap.put("is-new", flags.contains("N")); - flagMap.put("is-unpatrolled", flags.contains("!")); - flagMap.put("is-bot-edit", flags.contains("B")); - flagMap.put("is-special", title.startsWith("Special:")); - flagMap.put("is-talk", title.startsWith("Talk:")); - - Map<String, Object> root = new HashMap<String, Object>(); - - root.put("title", title); - root.put("user", user); - root.put("unparsed-flags", flags); - root.put("diff-bytes", byteDiff); - root.put("diff-url", diffUrl); - root.put("summary", summary); - root.put("flags", flagMap); - - return root; - } else { - throw new IllegalArgumentException(); - } - } - - public static void main(String[] args) { - String[] lines = new String[] { "[[Wikipedia talk:Articles for creation/Lords of War]] http://en.wikipedia.org/w/index.php?diff=562991653&oldid=562991567 * BBGLordsofWar * (+95) /* Lords of War: Elves versus Lizardmen */]", "[[David Shepard (surgeon)]] M http://en.wikipedia.org/w/index.php?diff=562993463&oldid=562989820 * Jacobsievers * (+115) /* American Revolution (1775�1783) */ Added to note regarding David Shepard's brothers" }; - - for (String line : lines) { - System.out.println(parse(line)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java deleted file mode 100644 index 60fd93d..0000000 --- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package samza.examples.wikipedia.task; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import org.apache.samza.config.Config; -import org.apache.samza.storage.kv.KeyValueStore; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.SystemStream; -import org.apache.samza.task.InitableTask; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.apache.samza.task.WindowableTask; - -public class WikipediaStatsStreamTask implements StreamTask, InitableTask, WindowableTask { - private int edits = 0; - private int byteDiff = 0; - private Set<String> titles = new HashSet<String>(); - private Map<String, Integer> counts = new HashMap<String, Integer>(); - private KeyValueStore<String, Integer> store; - - public void init(Config config, TaskContext context) { - this.store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats"); - } - - @SuppressWarnings("unchecked") - @Override - public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator 
coordinator) { - Map<String, Object> edit = (Map<String, Object>) envelope.getMessage(); - Map<String, Boolean> flags = (Map<String, Boolean>) edit.get("flags"); - - Integer editsAllTime = store.get("count-edits-all-time"); - if (editsAllTime == null) editsAllTime = 0; - store.put("count-edits-all-time", editsAllTime + 1); - - edits += 1; - titles.add((String) edit.get("title")); - byteDiff += (Integer) edit.get("diff-bytes"); - - for (Map.Entry<String, Boolean> flag : flags.entrySet()) { - if (Boolean.TRUE.equals(flag.getValue())) { - Integer count = counts.get(flag.getKey()); - - if (count == null) { - count = 0; - } - - count += 1; - counts.put(flag.getKey(), count); - } - } - } - - @Override - public void window(MessageCollector collector, TaskCoordinator coordinator) { - counts.put("edits", edits); - counts.put("bytes-added", byteDiff); - counts.put("unique-titles", titles.size()); - counts.put("edits-all-time", store.get("count-edits-all-time")); - - collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-stats"), counts)); - - // Reset counts after windowing. - edits = 0; - byteDiff = 0; - titles = new HashSet<String>(); - counts = new HashMap<String, Integer>(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/assembly/src.xml ---------------------------------------------------------------------- diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml new file mode 100644 index 0000000..8a8556d --- /dev/null +++ b/src/main/assembly/src.xml @@ -0,0 +1,81 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. 
You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>dist</id> + <formats> + <format>tar.gz</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}</directory> + <includes> + <include>README*</include> + <include>LICENSE*</include> + <include>NOTICE*</include> + </includes> + </fileSet> + </fileSets> + <files> + <file> + <source>${basedir}/src/main/resources/log4j.xml</source> + <outputDirectory>lib</outputDirectory> + </file> + <!-- filtered=true, so we do variable expansion so the yarn package path + always points to the correct spot on any machine --> + <file> + <source>${basedir}/src/main/config/wikipedia-feed.properties</source> + <outputDirectory>config</outputDirectory> + <filtered>true</filtered> + </file> + <file> + <source>${basedir}/src/main/config/wikipedia-parser.properties</source> + <outputDirectory>config</outputDirectory> + <filtered>true</filtered> + </file> + <file> + <source>${basedir}/src/main/config/wikipedia-stats.properties</source> + <outputDirectory>config</outputDirectory> + <filtered>true</filtered> + </file> + </files> + <dependencySets> + <dependencySet> + <outputDirectory>bin</outputDirectory> + <includes> + <include>org.apache.samza:samza-shell:tgz:dist:*</include> + </includes> + <fileMode>0744</fileMode> + <unpack>true</unpack> + 
</dependencySet> + <dependencySet> + <outputDirectory>lib</outputDirectory> + <includes> + <include>org.apache.samza:samza-core_2.10</include> + <include>org.apache.samza:samza-kafka_2.10</include> + <include>org.apache.samza:samza-serializers_2.10</include> + <include>org.apache.samza:samza-yarn_2.10</include> + <include>org.apache.samza:samza-kv-rocksdb_2.10</include> + <include>org.apache.samza:samza-log4j</include> + <include>org.apache.samza:hello-samza</include> + <include>org.slf4j:slf4j-log4j12</include> + <include>org.apache.kafka:kafka_2.10</include> + <include>org.apache.hadoop:hadoop-hdfs</include> + </includes> + <useTransitiveFiltering>true</useTransitiveFiltering> + </dependencySet> + </dependencySets> +</assembly> http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/config/wikipedia-feed.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-feed.properties b/src/main/config/wikipedia-feed.properties new file mode 100644 index 0000000..c498c16 --- /dev/null +++ b/src/main/config/wikipedia-feed.properties @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=wikipedia-feed + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask +task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews + +# Serializers +serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory + +# Wikipedia System +systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory +systems.wikipedia.host=irc.wikimedia.org +systems.wikipedia.port=6667 + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.msg.serde=json +systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.producer.metadata.broker.list=localhost:9092 +systems.kafka.producer.producer.type=sync +# Normally, we'd set this much higher, but we want things to look snappy in the demo. +systems.kafka.producer.batch.num.messages=1 http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/config/wikipedia-parser.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-parser.properties b/src/main/config/wikipedia-parser.properties new file mode 100644 index 0000000..38575b6 --- /dev/null +++ b/src/main/config/wikipedia-parser.properties @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=wikipedia-parser + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask +task.inputs=kafka.wikipedia-raw +task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory +task.checkpoint.system=kafka +# Normally, this would be 3, but we have only one broker. +task.checkpoint.replication.factor=1 + +# Metrics +metrics.reporters=snapshot,jmx +metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory +metrics.reporter.snapshot.stream=kafka.metrics +metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory + +# Serializers +serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory +serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory + +# Systems +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.msg.serde=json +systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.consumer.auto.offset.reset=largest +systems.kafka.producer.metadata.broker.list=localhost:9092 +systems.kafka.producer.producer.type=sync +# Normally, we'd set this much higher, but we want things to look snappy in the demo. 
+systems.kafka.producer.batch.num.messages=1 +systems.kafka.streams.metrics.samza.msg.serde=metrics http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/config/wikipedia-stats.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-stats.properties b/src/main/config/wikipedia-stats.properties new file mode 100644 index 0000000..69eff90 --- /dev/null +++ b/src/main/config/wikipedia-stats.properties @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=wikipedia-stats + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask +task.inputs=kafka.wikipedia-edits +task.window.ms=10000 +task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory +task.checkpoint.system=kafka +# Normally, this would be 3, but we have only one broker. 
+task.checkpoint.replication.factor=1 + +# Serializers +serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory +serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory +serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory + +# Systems +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.msg.serde=json +systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.consumer.auto.offset.reset=largest +systems.kafka.producer.metadata.broker.list=localhost:9092 +systems.kafka.producer.producer.type=sync +# Normally, we'd set this much higher, but we want things to look snappy in the demo. +systems.kafka.producer.batch.num.messages=1 + +# Key-value storage +stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory +stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog +stores.wikipedia-stats.key.serde=string +stores.wikipedia-stats.msg.serde=integer + +# Normally, we'd set this much higher, but we want things to look snappy in the demo. +stores.wikipedia-stats.write.batch.size=0 +stores.wikipedia-stats.object.cache.size=0 http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java b/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java new file mode 100644 index 0000000..f156c3b --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.wikipedia.system; + +import java.util.ArrayList; +import java.util.List; +import org.apache.samza.Partition; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.BlockingEnvelopeMap; +import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent; +import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedListener; + +public class WikipediaConsumer extends BlockingEnvelopeMap implements WikipediaFeedListener { + private final List<String> channels; + private final String systemName; + private final WikipediaFeed feed; + + public WikipediaConsumer(String systemName, WikipediaFeed feed, MetricsRegistry registry) { + this.channels = new ArrayList<String>(); + this.systemName = systemName; + this.feed = feed; + } + + public void onEvent(final WikipediaFeedEvent event) { + SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, event.getChannel(), new Partition(0)); + + try { + put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, event)); + } catch (Exception e) { + System.err.println(e); + } + } + + @Override + public void register(SystemStreamPartition systemStreamPartition, String startingOffset) { + super.register(systemStreamPartition, 
startingOffset); + + channels.add(systemStreamPartition.getStream()); + } + + @Override + public void start() { + feed.start(); + + for (String channel : channels) { + feed.listen(channel, this); + } + } + + @Override + public void stop() { + for (String channel : channels) { + feed.unlisten(channel, this); + } + + feed.stop(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java b/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java new file mode 100644 index 0000000..16e302e --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package samza.examples.wikipedia.system;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.samza.SamzaException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.schwering.irc.lib.IRCConnection;
+import org.schwering.irc.lib.IRCEventListener;
+import org.schwering.irc.lib.IRCModeParser;
+import org.schwering.irc.lib.IRCUser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client for a Wikimedia IRC feed (see {@link #main(String[])} for an example host). A single
+ * {@link IRCConnection} is opened to the given host and port, and every channel message received
+ * on it is handed to the {@link WikipediaFeedListener}s registered for that channel through
+ * {@link #listen(String, WikipediaFeedListener)}.
+ */
+public class WikipediaFeed {
+  private static final Logger log = LoggerFactory.getLogger(WikipediaFeed.class);
+  private static final Random random = new Random();
+  private static final ObjectMapper jsonMapper = new ObjectMapper();
+
+  // listen()/unlisten() mutate this on the caller's thread, while the WikipediaFeedIrcListener
+  // callbacks read it from the IRCConnection thread (see stop(), which joins that thread). Both the
+  // map and the per-channel listener sets are therefore concurrent collections.
+  private final Map<String, Set<WikipediaFeedListener>> channelListeners;
+  private final String host;
+  private final int port;
+  private final IRCConnection conn;
+  private final String nick;
+
+  /**
+   * Creates a feed for the given IRC server. The connection is not opened until {@link #start()}.
+   *
+   * @param host IRC server host name
+   * @param port IRC server port
+   */
+  public WikipediaFeed(String host, int port) {
+    this.channelListeners = new ConcurrentHashMap<String, Set<WikipediaFeedListener>>();
+    this.host = host;
+    this.port = port;
+    // nextInt(bound) is never negative, unlike Math.abs(nextInt()) for Integer.MIN_VALUE.
+    this.nick = "samza-bot-" + random.nextInt(Integer.MAX_VALUE);
+    this.conn = new IRCConnection(host, new int[] { port }, "", nick, nick, nick);
+    this.conn.addIRCEventListener(new WikipediaFeedIrcListener());
+    this.conn.setEncoding("UTF-8");
+    this.conn.setPong(true);
+    this.conn.setColors(false);
+  }
+
+  /**
+   * Opens the IRC connection.
+   *
+   * @throws RuntimeException if the connection cannot be established
+   */
+  public void start() {
+    try {
+      this.conn.connect();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to connect to " + host + ":" + port + ".", e);
+    }
+  }
+
+  /**
+   * Interrupts the IRC connection thread and waits for it to finish.
+   *
+   * @throws RuntimeException if interrupted while waiting, or if the connection is still alive
+   */
+  public void stop() {
+    this.conn.interrupt();
+
+    try {
+      this.conn.join();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while trying to shutdown IRC connection for " + host + ":" + port, e);
+    }
+
+    if (this.conn.isAlive()) {
+      throw new RuntimeException("Unable to shutdown IRC connection for " + host + ":" + port);
+    }
+  }
+
+  /**
+   * Subscribes a listener to a channel, sending an IRC JOIN the first time the channel gains a
+   * listener.
+   *
+   * @param channel IRC channel name, e.g. "#en.wikipedia"
+   * @param listener receiver for every message posted to the channel
+   */
+  public synchronized void listen(String channel, WikipediaFeedListener listener) {
+    Set<WikipediaFeedListener> listeners = channelListeners.get(channel);
+
+    if (listeners == null) {
+      listeners = new CopyOnWriteArraySet<WikipediaFeedListener>();
+      channelListeners.put(channel, listeners);
+      join(channel);
+    }
+
+    listeners.add(listener);
+  }
+
+  /**
+   * Unsubscribes a listener from a channel, sending an IRC PART once the channel has no
+   * listeners left.
+   *
+   * @throws RuntimeException if the channel has no listeners or this listener is not subscribed
+   */
+  public synchronized void unlisten(String channel, WikipediaFeedListener listener) {
+    Set<WikipediaFeedListener> listeners = channelListeners.get(channel);
+
+    if (listeners == null) {
+      throw new RuntimeException("Trying to unlisten to a channel that has no listeners in it.");
+    } else if (!listeners.contains(listener)) {
+      throw new RuntimeException("Trying to unlisten to a channel that listener is not listening to.");
+    }
+
+    listeners.remove(listener);
+
+    if (listeners.isEmpty()) {
+      // Drop the now-empty entry so that a later listen() on this channel sends JOIN again.
+      channelListeners.remove(channel);
+      leave(channel);
+    }
+  }
+
+  /** Sends an IRC JOIN for the channel. */
+  public void join(String channel) {
+    conn.send("JOIN " + channel);
+  }
+
+  /** Sends an IRC PART for the channel. */
+  public void leave(String channel) {
+    conn.send("PART " + channel);
+  }
+
+  /**
+   * IRC callback handler. Channel messages (onPrivmsg) are forwarded to the registered
+   * listeners; all other events are only logged.
+   */
+  public class WikipediaFeedIrcListener implements IRCEventListener {
+    public void onRegistered() {
+      log.info("Connected");
+    }
+
+    public void onDisconnected() {
+      log.info("Disconnected");
+    }
+
+    public void onError(String msg) {
+      log.warn("Error: " + msg);
+    }
+
+    public void onError(int num, String msg) {
+      log.warn("Error #" + num + ": " + msg);
+    }
+
+    public void onInvite(String chan, IRCUser u, String nickPass) {
+      log.info(chan + "> " + u.getNick() + " invites " + nickPass);
+    }
+
+    public void onJoin(String chan, IRCUser u) {
+      log.info(chan + "> " + u.getNick() + " joins");
+    }
+
+    public void onKick(String chan, IRCUser u, String nickPass, String msg) {
+      log.info(chan + "> " + u.getNick() + " kicks " + nickPass);
+    }
+
+    public void onMode(IRCUser u, String nickPass, String mode) {
+      log.info("Mode: " + u.getNick() + " sets modes " + mode + " " + nickPass);
+    }
+
+    public void onMode(String chan, IRCUser u, IRCModeParser mp) {
+      log.info(chan + "> " + u.getNick() + " sets mode: " + mp.getLine());
+    }
+
+    public void onNick(IRCUser u, String nickNew) {
+      log.info("Nick: " + u.getNick() + " is now known as " + nickNew);
+    }
+
+    public void onNotice(String target, IRCUser u, String msg) {
+      log.info(target + "> " + u.getNick() + " (notice): " + msg);
+    }
+
+    public void onPart(String chan, IRCUser u, String msg) {
+      log.info(chan + "> " + u.getNick() + " parts");
+    }
+
+    public void onPrivmsg(String chan, IRCUser u, String msg) {
+      Set<WikipediaFeedListener> listeners = channelListeners.get(chan);
+
+      if (listeners != null) {
+        WikipediaFeedEvent event = new WikipediaFeedEvent(System.currentTimeMillis(), chan, u.getNick(), msg);
+
+        // CopyOnWriteArraySet iterates over a snapshot, so a concurrent listen()/unlisten() is safe.
+        for (WikipediaFeedListener listener : listeners) {
+          listener.onEvent(event);
+        }
+      }
+
+      log.debug(chan + "> " + u.getNick() + ": " + msg);
+    }
+
+    public void onQuit(IRCUser u, String msg) {
+      log.info("Quit: " + u.getNick());
+    }
+
+    public void onReply(int num, String value, String msg) {
+      log.info("Reply #" + num + ": " + value + " " + msg);
+    }
+
+    public void onTopic(String chan, IRCUser u, String topic) {
+      log.info(chan + "> " + u.getNick() + " changes topic into: " + topic);
+    }
+
+    public void onPing(String p) {
+      // PONG replies are handled by the connection itself (setPong(true) in the constructor).
+    }
+
+    public void unknown(String a, String b, String c, String d) {
+      log.warn("UNKNOWN: " + a + " " + b + " " + c + " " + d);
+    }
+  }
+
+  /** Receives every message posted to a channel it is subscribed to. */
+  public static interface WikipediaFeedListener {
+    void onEvent(WikipediaFeedEvent event);
+  }
+
+  /** Immutable record of one channel message: receive time, channel, sender nick and raw text. */
+  public static final class WikipediaFeedEvent {
+    private final long time;
+    private final String channel;
+    private final String source;
+    private final String rawEvent;
+
+    public WikipediaFeedEvent(long time, String channel, String source, String rawEvent) {
+      this.time = time;
+      this.channel = channel;
+      this.source = source;
+      this.rawEvent = rawEvent;
+    }
+
+    /**
+     * Rebuilds an event from the map produced by {@link #toMap(WikipediaFeedEvent)}, or from that
+     * map after a JSON round trip.
+     */
+    public WikipediaFeedEvent(Map<String, Object> jsonObject) {
+      // Jackson deserializes a JSON integer as Integer when it fits in 32 bits and as Long
+      // otherwise, so read "time" through Number rather than casting straight to Long.
+      this(((Number) jsonObject.get("time")).longValue(), (String) jsonObject.get("channel"), (String) jsonObject.get("source"), (String) jsonObject.get("raw"));
+    }
+
+    public long getTime() {
+      return time;
+    }
+
+    public String getChannel() {
+      return channel;
+    }
+
+    public String getSource() {
+      return source;
+    }
+
+    public String getRawEvent() {
+      return rawEvent;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((channel == null) ? 0 : channel.hashCode());
+      result = prime * result + ((rawEvent == null) ? 0 : rawEvent.hashCode());
+      result = prime * result + ((source == null) ? 0 : source.hashCode());
+      result = prime * result + (int) (time ^ (time >>> 32));
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      WikipediaFeedEvent other = (WikipediaFeedEvent) obj;
+      if (channel == null) {
+        if (other.channel != null)
+          return false;
+      } else if (!channel.equals(other.channel))
+        return false;
+      if (rawEvent == null) {
+        if (other.rawEvent != null)
+          return false;
+      } else if (!rawEvent.equals(other.rawEvent))
+        return false;
+      if (source == null) {
+        if (other.source != null)
+          return false;
+      } else if (!source.equals(other.source))
+        return false;
+      if (time != other.time)
+        return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "WikipediaFeedEvent [time=" + time + ", channel=" + channel + ", source=" + source + ", rawEvent=" + rawEvent + "]";
+    }
+
+    public String toJson() {
+      return toJson(this);
+    }
+
+    /** Converts an event to a map with keys "time", "channel", "source" and "raw". */
+    public static Map<String, Object> toMap(WikipediaFeedEvent event) {
+      Map<String, Object> jsonObject = new HashMap<String, Object>();
+
+      jsonObject.put("time", event.getTime());
+      jsonObject.put("channel", event.getChannel());
+      jsonObject.put("source", event.getSource());
+      jsonObject.put("raw", event.getRawEvent());
+
+      return jsonObject;
+    }
+
+    /** Serializes an event to JSON via {@link #toMap(WikipediaFeedEvent)}. */
+    public static String toJson(WikipediaFeedEvent event) {
+      Map<String, Object> jsonObject = toMap(event);
+
+      try {
+        return jsonMapper.writeValueAsString(jsonObject);
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+
+    /** Parses JSON produced by {@link #toJson(WikipediaFeedEvent)} back into an event. */
+    @SuppressWarnings("unchecked")
+    public static WikipediaFeedEvent fromJson(String json) {
+      try {
+        return new WikipediaFeedEvent((Map<String, Object>) jsonMapper.readValue(json, Map.class));
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+  }
+
+  /** Manual smoke test: prints #en.wikipedia messages for 20 seconds, then disconnects. */
+  public static void main(String[] args) throws InterruptedException {
+    WikipediaFeed feed = new WikipediaFeed("irc.wikimedia.org", 6667);
+    feed.start();
+
+    feed.listen("#en.wikipedia", new WikipediaFeedListener() {
+      @Override
+      public void onEvent(WikipediaFeedEvent event) {
+        System.out.println(event);
+      }
+    });
+
+    Thread.sleep(20000);
+    feed.stop();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java b/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
new file mode 100644
index 0000000..d1612c9
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.wikipedia.system; + +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemProducer; +import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin; + +public class WikipediaSystemFactory implements SystemFactory { + @Override + public SystemAdmin getAdmin(String systemName, Config config) { + return new SinglePartitionWithoutOffsetsSystemAdmin(); + } + + @Override + public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { + String host = config.get("systems." + systemName + ".host"); + int port = config.getInt("systems." + systemName + ".port"); + WikipediaFeed feed = new WikipediaFeed(host, port); + + return new WikipediaConsumer(systemName, feed, registry); + } + + @Override + public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { + throw new SamzaException("You can't produce to a Wikipedia feed! How about making some edits to a Wiki, instead?"); + } +}
