Thanks a lot for the response, Chaitanya!!
Sharing more details for your reference and suggestions !!
Application.java:
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator;
import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;
import java.util.Properties;
public class Application implements StreamingApplication {
public void populateDAG(DAG dag, Configuration conf)
{
Properties props = new Properties();
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location","client.truststore.jks");
props.put("ssl.truststore.password","******");
props.put("ssl.keystore.location","server.keystore.jks");
props.put("ssl.keystore.password","******");
props.put("schema.registry.url", "http://********:8081");
props.put("enable.auto.commit", "false");
KafkaSinglePortInputOperator in = dag.addOperator("in", new
KafkaSinglePortInputOperator());
in.setInitialPartitionCount(1);
in.setTopics("***********");
in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
in.setClusters("********:9093");
LineOutputOperator out = dag.addOperator("out", new
LineOutputOperator());
out.setFilePath("hdfs://******/*********");
out.setFileName("test");
out.setMaxLength(1024);
dag.addStream("data", in.outputPort, out.input);
}
}
Also, sharing POM.xml below:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tgt.hdp.mighty</groupId>
<artifactId>test-apex</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- Version and packaging properties referenced elsewhere in this POM via ${...}. -->
<!-- NOTE(review): hadoop.version, hbase.version and kafka.version are not referenced anywhere in the visible POM; confirm whether they are still needed. -->
<properties>
<apex.version>3.4.0</apex.version>
<!-- Written into the app package manifest as the Class-Path entry. -->
<apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
<hadoop.version>2.7.1.2.3.4.0-3485</hadoop.version>
<hbase.version>1.1.2.2.3.4.0-3485</hbase.version>
<kafka.version>0.9.0.1</kafka.version>
<!-- This is the version actually applied to the kafka_2.11 dependency. -->
<confluent.kafka.version>0.9.0.1-cp1</confluent.kafka.version>
<kafka.avro.srlzr.version>2.0.1</kafka.avro.srlzr.version>
<avro.version>1.7.7</avro.version>
<json.version>1.1</json.version>
<jodatime.version>2.9.1</jodatime.version>
<!-- Property name is spelled "kyro" (the artifact is kryo-serializers); the dependency uses the same spelling, so it resolves. -->
<kyroserializer.version>0.38</kyroserializer.version>
<junit.version>4.10</junit.version>
</properties>
<!-- Additional artifact repositories: two Hortonworks (HDP) repositories and the Confluent repository. -->
<repositories>
<repository>
<id>HDPReleases</id>
<name>HDP Releases</name>
<url>http://repo.hortonworks.com/content/repositories/releases/</url>
<layout>default</layout>
</repository>
<repository>
<id>HDP Jetty Hadoop</id>
<name>HDP Jetty Hadoop</name>
<url>http://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
<layout>default</layout>
</repository>
<!-- Presumably the source of the "-cp1" Kafka build and kafka-avro-serializer; verify. -->
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven</url>
</repository>
</repositories>
<!-- Project dependencies. All org.apache.apex artifacts share ${apex.version}. -->
<dependencies>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-library</artifactId>
<version>${apex.version}</version>
<!--
If you know that your application does not need transitive
dependencies pulled in by malhar-library,
uncomment the following to reduce the size of your app package.
-->
<!--
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
-->
</dependency>
<!-- Scope "provided": available at compile time, not copied by the runtime-scope copy-dependencies step. -->
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>apex-common</artifactId>
<version>${apex.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- Engine is test scope only. -->
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>apex-engine</artifactId>
<version>${apex.version}</version>
<scope>test</scope>
</dependency>
<!-- NOTE(review): the posted Application.java imports com.datatorrent.contrib.kafka classes but only instantiates org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator. -->
<!-- Presumably malhar-contrib is not needed for that operator; confirm, and check with mvn dependency:tree that it does not pull a conflicting Kafka client. -->
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-contrib</artifactId>
<version>${apex.version}</version>
</dependency>
<!-- Module providing the org.apache.apex.malhar.kafka package imported by Application.java (presumably; verify). -->
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-kafka</artifactId>
<version>${apex.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<!-- Uses confluent.kafka.version (0.9.0.1-cp1), not kafka.version. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${confluent.kafka.version}</version>
</dependency>
<!-- log4j and slf4j-log4j12 are excluded from this artifact's transitive dependencies. -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${kafka.avro.srlzr.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>${json.version}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${jodatime.version}</version>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>${kyroserializer.version}</version>
</dependency>
</dependencies>
<!-- Build pipeline: compile for Java 1.7, copy runtime dependencies, assemble the app package jar, rename it to .apa, and attach the .apa as a build artifact. -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.9</version>
<configuration>
<downloadSources>true</downloadSources>
</configuration>
</plugin>
<!-- Compiles with source/target 1.7, debug info on, deprecation and warnings shown. -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.7</source>
<target>1.7</target>
<debug>true</debug>
<optimize>false</optimize>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<!-- Copies runtime-scope dependencies into target/deps during prepare-package. -->
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>target/deps</outputDirectory>
<includeScope>runtime</includeScope>
</configuration>
</execution>
</executions>
</plugin>
<!-- Assembles the app package using src/assemble/appPackage.xml; output is named artifactId-version-apexapp. -->
<!-- NOTE(review): no version is pinned for this plugin, unlike the others; confirm that is intended. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>app-package-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}-apexapp</finalName>
<appendAssemblyId>false</appendAssemblyId>
<descriptors>
<descriptor>src/assemble/appPackage.xml</descriptor>
</descriptors>
<archiverConfig>
<defaultDirectoryMode>0755</defaultDirectoryMode>
</archiverConfig>
<archive>
<!-- Manifest entries describing the app package. NOTE(review): project.name and project.description are not declared in the visible POM; check how they resolve. -->
<manifestEntries>
<Class-Path>${apex.apppackage.classpath}</Class-Path>
<DT-Engine-Version>${apex.version}</DT-Engine-Version>
<DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
<DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
<DT-App-Package-Version>${project.version}</DT-App-Package-Version>
<DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
<DT-App-Package-Description>${project.description}</DT-App-Package-Description>
</manifestEntries>
</archive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- Renames the assembled -apexapp.jar to artifactId-version.apa in the build directory. -->
<execution>
<phase>package</phase>
<configuration>
<target>
<move
file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa"
/>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
<execution>
<!-- create resource directory for xml javadoc-->
<!-- Deletes and recreates generated-resources/xml-javadoc during generate-resources. -->
<id>createJavadocDirectory</id>
<phase>generate-resources</phase>
<configuration>
<tasks>
<delete
dir="${project.build.directory}/generated-resources/xml-javadoc"/>
<mkdir
dir="${project.build.directory}/generated-resources/xml-javadoc"/>
</tasks>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Attaches the .apa file produced above as an additional artifact of type "apa". -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.9.1</version>
<executions>
<execution>
<id>attach-artifacts</id>
<phase>package</phase>
<goals>
<goal>attach-artifact</goal>
</goals>
<configuration>
<artifacts>
<artifact>
<file>target/${project.artifactId}-${project.version}.apa</file>
<type>apa</type>
</artifact>
</artifacts>
<skipAttach>false</skipAttach>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Regards,
Raja.
From: Chaitanya Chebolu <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Tuesday, December 6, 2016 at 4:28 PM
To: "[email protected]" <[email protected]>
Subject: [EXTERNAL] Re: KafkaSinglePortInputOperator
Hi Raja,
Could you please share the Application Master logs and Kafka operator
container logs.
Regards,
Chaitanya
On Tue, Dec 6, 2016 at 4:17 PM, Raja.Aravapalli
<[email protected]<mailto:[email protected]>> wrote:
Hi Team,
I am using “KafkaSinglePortInputOperator” to connect to a SSL Secured topic in
Kafka 0.9!!
Unfortunately… my apex application is not going to “RUNNING” state…!! Only
staying in ACCEPTED State and then going into FAILED state!! I don’t see much
information in the logs…!! ☹
Can someone please help fix the issue…. We have immediate need to read messages
from kafka 0.9 SSL configured topics…
Please advise!
Thanks very much in advance.
Regards,
Raja.