Updated with changes in apache/samza master for Samza 1.0 Author: Prateek Maheshwari <pmaheshw...@apache.org>
Closes #36 from prateekm/latest Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/2d956496 Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/2d956496 Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/2d956496 Branch: refs/heads/master Commit: 2d956496fa68c514d087399167d0f07672bcac45 Parents: 195e181 Author: Prateek Maheshwari <pmaheshw...@apache.org> Authored: Sat Oct 13 15:43:27 2018 -0700 Committer: Prateek Maheshwari <pmaheshw...@apache.org> Committed: Sat Oct 13 15:43:27 2018 -0700 ---------------------------------------------------------------------- bin/deploy.sh | 2 +- gradle.properties | 2 +- pom.xml | 4 +- .../samza/examples/azure/AzureApplication.java | 19 ++----- .../samza/examples/cookbook/FilterExample.java | 8 +-- .../samza/examples/cookbook/JoinExample.java | 8 +-- .../cookbook/RemoteTableJoinExample.java | 15 +++--- .../examples/cookbook/SessionWindowExample.java | 8 +-- .../cookbook/StreamTableJoinExample.java | 10 ++-- .../cookbook/TumblingWindowExample.java | 8 +-- .../application/WikipediaApplication.java | 24 ++++----- .../system/WikipediaInputDescriptor.java | 41 --------------- .../system/WikipediaSystemDescriptor.java | 51 ------------------- .../descriptors/WikipediaInputDescriptor.java | 42 ++++++++++++++++ .../descriptors/WikipediaSystemDescriptor.java | 53 ++++++++++++++++++++ .../task/WikipediaStatsStreamTask.java | 11 ++-- 16 files changed, 149 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/bin/deploy.sh ---------------------------------------------------------------------- diff --git a/bin/deploy.sh b/bin/deploy.sh index 5b50079..3c3ada2 100755 --- a/bin/deploy.sh +++ b/bin/deploy.sh @@ -23,4 +23,4 @@ base_dir=`pwd` mvn clean package mkdir -p $base_dir/deploy/samza -tar -xvf $base_dir/target/hello-samza-0.15.0-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza +tar -xvf $base_dir/target/hello-samza-1.0.0-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/gradle.properties ---------------------------------------------------------------------- diff --git a/gradle.properties b/gradle.properties index 37f8eb7..34a540a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,7 +17,7 @@ * under the License. */ -SAMZA_VERSION=0.15.0-SNAPSHOT +SAMZA_VERSION=1.0.0-SNAPSHOT KAFKA_VERSION=0.11.0.2 HADOOP_VERSION=2.6.1 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a41be49..8659683 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ under the License. <groupId>org.apache.samza</groupId> <artifactId>hello-samza</artifactId> - <version>0.15.0-SNAPSHOT</version> + <version>1.0.0-SNAPSHOT</version> <packaging>jar</packaging> <name>Samza Example</name> <description> @@ -153,7 +153,7 @@ under the License. <properties> <!-- maven specific properties --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <samza.version>0.15.0-SNAPSHOT</samza.version> + <samza.version>1.0.0-SNAPSHOT</samza.version> <hadoop.version>2.6.1</hadoop.version> </properties> http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/azure/AzureApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/azure/AzureApplication.java b/src/main/java/samza/examples/azure/AzureApplication.java index 12d293b..e2c337f 100644 --- a/src/main/java/samza/examples/azure/AzureApplication.java +++ b/src/main/java/samza/examples/azure/AzureApplication.java @@ -19,31 +19,24 @@ package samza.examples.azure; -import java.util.HashMap; import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.descriptors.GenericInputDescriptor; -import org.apache.samza.operators.descriptors.GenericOutputDescriptor; -import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.serializers.ByteSerde; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.descriptors.GenericInputDescriptor; +import org.apache.samza.system.descriptors.GenericOutputDescriptor; +import org.apache.samza.system.descriptors.GenericSystemDescriptor; public class AzureApplication implements StreamApplication { - - // Inputs private static final String INPUT_STREAM_ID = "input-stream"; - - // Outputs private static final String OUTPUT_STREAM_ID = "output-stream"; @Override public void describe(StreamApplicationDescriptor appDescriptor) { - HashMap<String, String> systemConfigs = new HashMap<>(); - GenericSystemDescriptor systemDescriptor = new GenericSystemDescriptor("eventhubs", "org.apache.samza.system.eventhub.EventHubSystemFactory"); @@ -55,13 +48,9 @@ public class AzureApplication implements StreamApplication { GenericOutputDescriptor<KV<String, byte[]>> outputDescriptor = systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, serde); - - // Input MessageStream<KV<String, byte[]>> eventhubInput = appDescriptor.getInputStream(inputDescriptor); - // Output OutputStream<KV<String, byte[]>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor); - // Send eventhubInput .filter((message) -> message.getKey() != null) .map((message) -> { http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/FilterExample.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/FilterExample.java b/src/main/java/samza/examples/cookbook/FilterExample.java index bcf5b18..bd14300 100644 --- a/src/main/java/samza/examples/cookbook/FilterExample.java +++ b/src/main/java/samza/examples/cookbook/FilterExample.java @@ -19,16 +19,16 @@ package samza.examples.cookbook; import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.kafka.KafkaInputDescriptor; -import org.apache.samza.system.kafka.KafkaOutputDescriptor; -import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/JoinExample.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/JoinExample.java b/src/main/java/samza/examples/cookbook/JoinExample.java index 05a358d..14753eb 100644 --- a/src/main/java/samza/examples/cookbook/JoinExample.java +++ b/src/main/java/samza/examples/cookbook/JoinExample.java @@ -20,7 +20,7 @@ package samza.examples.cookbook; import java.io.Serializable; import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; @@ -28,9 +28,9 @@ import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.kafka.KafkaInputDescriptor; -import org.apache.samza.system.kafka.KafkaOutputDescriptor; -import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java b/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java index 386cdda..1c27cda 100644 --- a/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java +++ b/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java @@ -29,20 +29,20 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.functions.StreamTableJoinFunction; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.kafka.KafkaInputDescriptor; -import org.apache.samza.system.kafka.KafkaOutputDescriptor; -import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor; import org.apache.samza.table.Table; -import org.apache.samza.table.caching.CachingTableDescriptor; -import org.apache.samza.table.remote.RemoteTableDescriptor; +import org.apache.samza.table.caching.descriptors.CachingTableDescriptor; import org.apache.samza.table.remote.TableReadFunction; +import org.apache.samza.table.remote.descriptors.RemoteTableDescriptor; import org.apache.samza.util.ExponentialSleepStrategy; import org.apache.samza.util.HttpUtil; import org.codehaus.jackson.JsonFactory; @@ -131,8 +131,7 @@ public class RemoteTableJoinExample implements StreamApplication { .withReadRateLimit(10) .withReadFunction(new StockPriceReadFunction()); CachingTableDescriptor<String, Double> cachedRemoteTableDescriptor = - new CachingTableDescriptor<String, Double>("cached-remote-table") - .withTable(remoteTableDescriptor) + new CachingTableDescriptor<>("cached-remote-table", remoteTableDescriptor) .withReadTtl(Duration.ofSeconds(5)); Table<KV<String, Double>> cachedRemoteTable = appDescriptor.getTable(cachedRemoteTableDescriptor); http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/SessionWindowExample.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/SessionWindowExample.java b/src/main/java/samza/examples/cookbook/SessionWindowExample.java index bfdf188..1db0808 100644 --- a/src/main/java/samza/examples/cookbook/SessionWindowExample.java +++ b/src/main/java/samza/examples/cookbook/SessionWindowExample.java @@ -20,7 +20,7 @@ package samza.examples.cookbook; import java.io.Serializable; import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; @@ -29,9 +29,9 @@ import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.kafka.KafkaInputDescriptor; -import org.apache.samza.system.kafka.KafkaOutputDescriptor; -import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java b/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java index 4d5c57e..d9f6acf 100644 --- a/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java +++ b/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java @@ -19,7 +19,7 @@ package samza.examples.cookbook; import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; @@ -28,10 +28,10 @@ import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; -import org.apache.samza.storage.kv.RocksDbTableDescriptor; -import org.apache.samza.system.kafka.KafkaInputDescriptor; -import org.apache.samza.system.kafka.KafkaOutputDescriptor; -import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor; import org.apache.samza.table.Table; import com.google.common.collect.ImmutableList; http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/TumblingWindowExample.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/TumblingWindowExample.java b/src/main/java/samza/examples/cookbook/TumblingWindowExample.java index 51c2056..5ec6876 100644 --- a/src/main/java/samza/examples/cookbook/TumblingWindowExample.java +++ b/src/main/java/samza/examples/cookbook/TumblingWindowExample.java @@ -20,7 +20,7 @@ package samza.examples.cookbook; import java.io.Serializable; import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; @@ -29,9 +29,9 @@ import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.kafka.KafkaInputDescriptor; -import org.apache.samza.system.kafka.KafkaOutputDescriptor; -import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java index 734df96..60bbe15 100644 --- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java +++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java @@ -22,29 +22,28 @@ package samza.examples.wikipedia.application; import com.google.common.collect.ImmutableList; import java.io.Serializable; import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; -import org.apache.samza.config.Config; +import org.apache.samza.application.descriptors.StreamApplicationDescriptor; +import org.apache.samza.context.Context; +import org.apache.samza.context.TaskContext; import org.apache.samza.metrics.Counter; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.functions.FoldLeftFunction; -import org.apache.samza.operators.functions.SupplierFunction; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.Serde; import org.apache.samza.storage.kv.KeyValueStore; -import org.apache.samza.system.kafka.KafkaOutputDescriptor; -import org.apache.samza.system.kafka.KafkaSystemDescriptor; -import org.apache.samza.task.TaskContext; +import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableMap; import samza.examples.wikipedia.model.WikipediaParser; import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent; -import samza.examples.wikipedia.system.WikipediaInputDescriptor; -import samza.examples.wikipedia.system.WikipediaSystemDescriptor; +import samza.examples.wikipedia.system.descriptors.WikipediaInputDescriptor; +import samza.examples.wikipedia.system.descriptors.WikipediaSystemDescriptor; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -141,13 +140,14 @@ public class WikipediaApplication implements StreamApplication, Serializable { /** * {@inheritDoc} - * Override {@link org.apache.samza.operators.functions.InitableFunction#init(Config, TaskContext)} to + * Override {@link org.apache.samza.operators.functions.InitableFunction#init(Context)} to * get a KeyValueStore for persistence and the MetricsRegistry for metrics. */ @Override - public void init(Config config, TaskContext context) { - store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats"); - repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits"); + public void init(Context context) { + TaskContext taskContext = context.getTaskContext(); + store = (KeyValueStore<String, Integer>) taskContext.getStore("wikipedia-stats"); + repeatEdits = taskContext.getTaskMetricsRegistry().newCounter("edit-counters", "repeat-edits"); } @Override http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java b/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java deleted file mode 100644 index 92de60d..0000000 --- a/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package samza.examples.wikipedia.system; - -import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; -import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; -import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.serializers.Serde; -import samza.examples.wikipedia.application.WikipediaApplication; - - -public class WikipediaInputDescriptor extends InputDescriptor<WikipediaFeed.WikipediaFeedEvent, WikipediaInputDescriptor> { - // Messages come from WikipediaConsumer so we know that they don't have a key and don't need to be deserialized. - private static final Serde SERDE = new NoOpSerde(); - - WikipediaInputDescriptor(String streamId, SystemDescriptor systemDescriptor) { - super(streamId, SERDE, systemDescriptor, null); - } - - public WikipediaInputDescriptor withChannel(String channel) { - withPhysicalName(channel); - return this; - } -} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java b/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java deleted file mode 100644 index 6f50196..0000000 --- a/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package samza.examples.wikipedia.system; - -import java.util.Map; -import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; - -public class WikipediaSystemDescriptor extends SystemDescriptor<WikipediaSystemDescriptor> { - private static final String SYSTEM_NAME = "wikipedia"; - private static final String FACTORY_CLASS_NAME = WikipediaSystemFactory.class.getName(); - private static final String HOST_KEY = "systems.%s.host"; - private static final String PORT_KEY = "systems.%s.port"; - - private final String host; - private final int port; - - public WikipediaSystemDescriptor(String host, int port) { - super(SYSTEM_NAME, FACTORY_CLASS_NAME, null, null); - this.host = host; - this.port = port; - } - - public WikipediaInputDescriptor getInputDescriptor(String streamId) { - return new WikipediaInputDescriptor(streamId, this); - } - - @Override - public Map<String, String> toConfig() { - Map<String, String> configs = super.toConfig(); - configs.put(String.format(HOST_KEY, getSystemName()), host); - configs.put(String.format(PORT_KEY, getSystemName()), Integer.toString(port)); - return configs; - } -} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaInputDescriptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaInputDescriptor.java b/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaInputDescriptor.java new file mode 100644 index 0000000..29c7b92 --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaInputDescriptor.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.wikipedia.system.descriptors; + +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.system.descriptors.InputDescriptor; +import org.apache.samza.system.descriptors.SystemDescriptor; + +import samza.examples.wikipedia.system.WikipediaFeed; + + +public class WikipediaInputDescriptor extends InputDescriptor<WikipediaFeed.WikipediaFeedEvent, WikipediaInputDescriptor> { + // Messages come from WikipediaConsumer so we know that they don't have a key and don't need to be deserialized. + private static final Serde SERDE = new NoOpSerde(); + + WikipediaInputDescriptor(String streamId, SystemDescriptor systemDescriptor) { + super(streamId, SERDE, systemDescriptor, null); + } + + public WikipediaInputDescriptor withChannel(String channel) { + withPhysicalName(channel); + return this; + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaSystemDescriptor.java b/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaSystemDescriptor.java new file mode 100644 index 0000000..9f516fd --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaSystemDescriptor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.wikipedia.system.descriptors; + +import samza.examples.wikipedia.system.WikipediaSystemFactory; + +import java.util.Map; +import org.apache.samza.system.descriptors.SystemDescriptor; + +public class WikipediaSystemDescriptor extends SystemDescriptor<WikipediaSystemDescriptor> { + private static final String SYSTEM_NAME = "wikipedia"; + private static final String FACTORY_CLASS_NAME = WikipediaSystemFactory.class.getName(); + private static final String HOST_KEY = "systems.%s.host"; + private static final String PORT_KEY = "systems.%s.port"; + + private final String host; + private final int port; + + public WikipediaSystemDescriptor(String host, int port) { + super(SYSTEM_NAME, FACTORY_CLASS_NAME, null, null); + this.host = host; + this.port = port; + } + + public WikipediaInputDescriptor getInputDescriptor(String streamId) { + return new WikipediaInputDescriptor(streamId, this); + } + + @Override + public Map<String, String> toConfig() { + Map<String, String> configs = super.toConfig(); + configs.put(String.format(HOST_KEY, getSystemName()), host); + configs.put(String.format(PORT_KEY, getSystemName()), Integer.toString(port)); + return configs; + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java index abe760a..897f9f1 100644 --- a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java +++ b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java @@ -23,7 +23,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.samza.config.Config; +import org.apache.samza.context.Context; +import org.apache.samza.context.TaskContext; import org.apache.samza.metrics.Counter; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.IncomingMessageEnvelope; @@ -32,7 +33,6 @@ import org.apache.samza.system.SystemStream; import org.apache.samza.task.InitableTask; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.task.WindowableTask; @@ -48,9 +48,10 @@ public class WikipediaStatsStreamTask implements StreamTask, InitableTask, Windo // Example metric. Running counter of the number of repeat edits of the same title within a single window. private Counter repeatEdits; - public void init(Config config, TaskContext context) { - this.store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats"); - this.repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits"); + public void init(Context context) { + TaskContext taskContext = context.getTaskContext(); + this.store = (KeyValueStore<String, Integer>) taskContext.getStore("wikipedia-stats"); + this.repeatEdits = taskContext.getTaskMetricsRegistry().newCounter("edit-counters", "repeat-edits"); } @SuppressWarnings("unchecked")