Hello,

I have tried running the Hello-Samza project which worked successfully for me. 
Now I am targeting a Samza application that can consume messages from a Kafka 
console producer and process it to emit the results back to a Kafka console 
consumer.

I followed the thread "Writing a simple KafkaProducer in Samza", from where I 
could get an example of the Kafka consumer in "WikipediaParserStreamTask.java".

On similar lines, I have been trying to create a Samza program in Java called 
MyTask.java which is attached herewith. I have created the class in the package 
"samza.examples.wikipedia.task" of the samza-wikipedia project. And I follow 
the same instructions to build the project as those for Hello-Samza.

The config/properties file I use is also attached. I need to add 
"systems.kafka.samza.msg.serde=string" since the messages I emit from the kafka 
producer console are strings, as opposed to 
"systems.kafka.samza.msg.serde=json" used in the "WikipediaParserStreamTask" 
example.

Even after having tried out many changes to the properties file, the Kafka 
producer and Samza application don't seem to connect to each other. However, I 
can see SamzaAppMaster and SamzaContainer processes running on the machine.

I can get that since the Samza application is not able to receive messages from 
the Kafka producer running locally, its process() method is not invoked.

Can you help me in making this work please?

Regards,
Jayati

________________________________






NOTE: This message may contain information that is confidential, proprietary, 
privileged or otherwise protected by law. The message is intended solely for 
the named addressee. If received in error, please destroy and notify the 
sender. Any use of this email is prohibited when received in error. Impetus 
does not represent, warrant and/or guarantee, that the integrity of this 
communication has been maintained nor that the communication is free of errors, 
virus, interception or interference.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package samza.examples.wikipedia.task;

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class MyTask implements StreamTask {

	static FileWriter file;
	static PrintWriter outputFile;

	@SuppressWarnings("unchecked")

	public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
		String message = "message arrived ..... ";
		try {
			file = new FileWriter("/home/test/SAMZA/output" + (int)(Math.random() * 100));
			outputFile = new PrintWriter(file);
			outputFile.append("hello 1 ");
			outputFile.flush();
			String output = (String) envelope.getMessage();
			outputFile.append("hello  2 ");
			outputFile.flush();
			outputFile.append(output);
			outputFile.flush();
			outputFile.close();
			file.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "testProducer"), message));

	}

	public static void main(String[] args) {

	}
}

Reply via email to