> On Wed, 23.03.2016 06:59, Chesnay Schepler wrote:
> Could you be missing the call to execute()?
Yes, that was it. Can't believe I missed that! Thank you, Chesnay.

Best,
Tarandeep

On 23.03.2016 01:25, Tarandeep Singh wrote:
>> Hi,
>>
>> I wrote a simple Flink job that uses the Avro input format to read an
>> Avro file and save the results in Avro format. The job does not get
>> submitted and the job client exits immediately. The same thing happens
>> whether I run the program in the IDE or submit it via the command line.
>>
>> Here is the program:
>>
>> import com.styleseat.flinkpractice.avro.PageTrackingRecord;
>> import org.apache.flink.api.common.functions.FilterFunction;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.io.AvroInputFormat;
>> import org.apache.flink.api.java.io.AvroOutputFormat;
>> import org.apache.flink.core.fs.Path;
>>
>> public class GrepAvro {
>>
>>     public static void main(String[] args) {
>>         final String keyword = args[0];
>>         final Path inputPath = new Path(args[1]);
>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>         final AvroInputFormat<PageTrackingRecord> inputFormat =
>>                 new AvroInputFormat<PageTrackingRecord>(inputPath, PageTrackingRecord.class);
>>         DataSet<PageTrackingRecord> dataSet = env.createInput(inputFormat);
>>         dataSet.filter(new FilterFunction<PageTrackingRecord>() {
>>             @Override
>>             public boolean filter(PageTrackingRecord pageTrackingRecord) throws Exception {
>>                 String userAgent = pageTrackingRecord.getUserAgent().toString();
>>                 return (userAgent != null && userAgent.contains(keyword));
>>             }
>>         }).write(new AvroOutputFormat<PageTrackingRecord>(PageTrackingRecord.class), args[2]);
>>     }
>> }
>>
>> The Avro files are stored in HDFS and I used the HDFS paths
>> (hdfs:///user/flink/data/...).
>> There isn't any error in the log file; however, when I ran the job via the
>> web interface, I got this error:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.
>>
>> System.err: (none)
>>
>> System.out: (none)
>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:102)
>>     at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:215)
>>     at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
>>     at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
>>     at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
>>     at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
>>     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>     at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
>>     at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
>>     at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
>>     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>     at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:158)
>>     at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
>>     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>     at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>     at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> This is my pom.xml file:
>>
>> <?xml version="1.0" encoding="UTF-8"?>
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>     <modelVersion>4.0.0</modelVersion>
>>     <groupId>com.xyz</groupId>
>>     <artifactId>flink-practice</artifactId>
>>     <version>1.0-SNAPSHOT</version>
>>     <properties>
>>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>         <flink.version>1.0.0</flink.version>
>>     </properties>
>>     <repositories>
>>         <repository>
>>             <id>apache.snapshots</id>
>>             <name>Apache Development Snapshot Repository</name>
>>             <url>https://repository.apache.org/content/repositories/snapshots/</url>
>>             <releases>
>>                 <enabled>false</enabled>
>>             </releases>
>>             <snapshots>
>>                 <enabled>true</enabled>
>>             </snapshots>
>>         </repository>
>>     </repositories>
>>     <build>
>>         <plugins>
>>             <plugin>
>>                 <groupId>org.apache.avro</groupId>
>>                 <artifactId>avro-maven-plugin</artifactId>
>>                 <version>1.8.0</version>
>>                 <executions>
>>                     <execution>
>>                         <phase>generate-sources</phase>
>>                         <goals>
>>                             <goal>schema</goal>
>>                         </goals>
>>                         <configuration>
>>                             <fieldVisibility>PRIVATE</fieldVisibility>
>>                             <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
>>                             <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
>>                         </configuration>
>>                     </execution>
>>                 </executions>
>>             </plugin>
>>             <plugin>
>>                 <groupId>org.apache.maven.plugins</groupId>
>>                 <artifactId>maven-compiler-plugin</artifactId>
>>                 <configuration>
>>                     <source>1.6</source>
>>                     <target>1.6</target>
>>                 </configuration>
>>             </plugin>
>>         </plugins>
>>     </build>
>>     <dependencies>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-java</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-clients_2.11</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-avro_2.11</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.avro</groupId>
>>             <artifactId>avro</artifactId>
>>             <version>1.8.0</version>
>>         </dependency>
>>     </dependencies>
>> </project>
>>
>> Any idea what I might be doing wrong? I was able to run Flink jobs on
>> text data, so Flink itself is working.
>>
>> Thanks,
>> Tarandeep
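For anyone who hits the same error: as I understand it, the DataSet API only records transformations and sinks such as write() into a plan, and nothing runs until ExecutionEnvironment.execute() is called. In GrepAvro the fix would be to declare `main(String[] args) throws Exception` and end it with something like `env.execute("Grep Avro");` (the job name is just an example). When the web interface fetches the plan, it runs main() and expects that call; if main() returns without it, Flink reports that the plan could not be fetched because the program aborted prematurely. The toy model below is NOT Flink's API: `LazyPlanSketch`, `Env`, `filterAndWrite` and `runJob` are hypothetical names that mimic the record-then-execute behaviour with plain JDK classes. It also uses a null-safe keyword check, assuming user agents are CharSequence values that may be null: in the quoted filter, `getUserAgent().toString()` would throw a NullPointerException before the `userAgent != null` test could help.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical toy model, NOT Flink's API: it only mimics how a batch plan is
// recorded first and run later, once execute() is called.
class LazyPlanSketch {

    // Collects declared sinks; nothing runs until execute() is called.
    static final class Env {
        private final List<Runnable> plan = new ArrayList<>();

        // Declares "filter source and write matches to out" without running it.
        void filterAndWrite(List<CharSequence> source,
                            Predicate<CharSequence> keep,
                            List<CharSequence> out) {
            plan.add(() -> {
                for (CharSequence value : source) {
                    if (keep.test(value)) {
                        out.add(value);
                    }
                }
            });
        }

        // Runs every declared sink in order and returns how many ran.
        int execute() {
            for (Runnable sink : plan) {
                sink.run();
            }
            return plan.size();
        }
    }

    // Null-safe keyword check: a missing user agent simply does not match,
    // instead of throwing a NullPointerException from toString().
    static boolean containsKeyword(CharSequence userAgent, String keyword) {
        return userAgent != null && userAgent.toString().contains(keyword);
    }

    // Builds the toy "grep" job and only runs it when callExecute is true.
    static List<CharSequence> runJob(List<CharSequence> input, String keyword,
                                     boolean callExecute) {
        Env env = new Env();
        List<CharSequence> output = new ArrayList<>();
        env.filterAndWrite(input, ua -> containsKeyword(ua, keyword), output);
        if (callExecute) {
            env.execute(); // the step the original program was missing
        }
        return output;
    }

    public static void main(String[] args) {
        List<CharSequence> userAgents = new ArrayList<>();
        userAgents.add("Mozilla/5.0 (iPhone)");
        userAgents.add(null);
        userAgents.add("curl/7.43.0");
        System.out.println(runJob(userAgents, "iPhone", false)); // prints []
        System.out.println(runJob(userAgents, "iPhone", true));  // prints [Mozilla/5.0 (iPhone)]
    }
}
```

Without the execute() call the output list stays empty even though the filter and sink were declared, which matches the "job client exits immediately" symptom; with the call, only the matching, non-null user agent is written.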