[ https://issues.apache.org/jira/browse/FLINK-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15327122#comment-15327122 ]
Chesnay Schepler commented on FLINK-4061: ----------------------------------------- this could easily be caused by a version difference between flink-runtime and the jdbc connector. If flink-runtime is on 1.0 (as in, not SNAPSHOT) it will never call openInputFormat(), thus resulting in the NPE. > about flink jdbc connect oracle db exists a crital bug > ------------------------------------------------------- > > Key: FLINK-4061 > URL: https://issues.apache.org/jira/browse/FLINK-4061 > Project: Flink > Issue Type: Bug > Components: Batch > Affects Versions: 1.1.0 > Environment: ubuntu ,jdk1.8.0 ,Start a Local Flink Cluster > Reporter: dengchangfu > Priority: Blocker > Original Estimate: 168h > Remaining Estimate: 168h > > I use flink-jdbc to connect oracle db for etl, so i write a demo to test the > feature. the code is simple,but after I submit this app ,a exception happen. > exception info like this: > Caused by: java.lang.NullPointerException > at > org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:231) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > my code like this: > import org.apache.flink.api.common.typeinfo.BasicTypeInfo; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.DataSet; > import org.apache.flink.api.java.ExecutionEnvironment; > import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; > import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; > import org.apache.flink.api.table.Row; > import org.apache.flink.api.table.typeutils.RowTypeInfo; > import > org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder; > import java.sql.ResultSet; > import java.sql.Types; > /** > * Skeleton for a Flink Job. 
> * > * For a full example of a Flink Job, see the WordCountJob.java file in the > * same package/directory or have a look at the website. > * > * You can also generate a .jar file that you can submit on your Flink > * cluster. > * Just type > * mvn clean package > * in the projects root directory. > * You will find the jar in > * target/flink-quickstart-0.1-SNAPSHOT-Sample.jar > * > */ > public class Job { > public static final TypeInformation<?>[] fieldTypes = new > TypeInformation<?>[]{ > BasicTypeInfo.STRING_TYPE_INFO, > BasicTypeInfo.FLOAT_TYPE_INFO > }; > public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes); > public static void main(String[] args) { > // set up the execution environment > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > JDBCInputFormatBuilder inputBuilder = > JDBCInputFormat.buildJDBCInputFormat() > .setDrivername("oracle.jdbc.driver.OracleDriver") > .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest") > .setUsername("crmii") > .setPassword("crmii") > .setQuery("select CLIENT_ID,OCCUR_BALANCE from > HS_ASSET.FUNDJOUR@OTC") > .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) > .setRowTypeInfo(rowTypeInfo); > DataSet<Row> source = env.createInput(inputBuilder.finish()); > source.output(JDBCOutputFormat.buildJDBCOutputFormat() > .setDrivername("oracle.jdbc.driver.OracleDriver") > .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest") > .setUsername("crmii") > .setPassword("crmii") > .setQuery("insert into dengabc (client_id,salary) > values(?,?)") > .setSqlTypes(new int[]{Types.VARCHAR, Types.DOUBLE}) > .finish()); > //source.print(); > //source.first(20).print(); > //dbData.print(); > /** > * Here, you can start creating your execution plan for Flink. 
> * > * Start with getting some data from the environment, like > * env.readTextFile(textPath); > * > * then, transform the resulting DataSet<String> using operations > * like > * .filter() > * .flatMap() > * .join() > * .coGroup() > * and many more. > * Have a look at the programming guide for the Java API: > * > * http://flink.apache.org/docs/latest/apis/batch/index.html > * > * and the examples > * > * http://flink.apache.org/docs/latest/apis/batch/examples.html > * > */ > // execute program > try { > env.execute("Flink Java API Skeleton"); > } catch (Exception e) { > e.getMessage(); > } > } > } > my pom.xml like this: > <!-- > Licensed to the Apache Software Foundation (ASF) under one > or more contributor license agreements. See the NOTICE file > distributed with this work for additional information > regarding copyright ownership. The ASF licenses this file > to you under the Apache License, Version 2.0 (the > "License"); you may not use this file except in compliance > with the License. You may obtain a copy of the License at > http://www.apache.org/licenses/LICENSE-2.0 > Unless required by applicable law or agreed to in writing, > software distributed under the License is distributed on an > "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > KIND, either express or implied. See the License for the > specific language governing permissions and limitations > under the License. 
> --> > <project xmlns="http://maven.apache.org/POM/4.0.0" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 > http://maven.apache.org/xsd/maven-4.0.0.xsd"> > <modelVersion>4.0.0</modelVersion> > > <groupId>com.gf.flink</groupId> > <artifactId>mot</artifactId> > <version>1.0-SNAPSHOT</version> > <packaging>jar</packaging> > <name>Flink Quickstart Job</name> > <url>http://www.myorganization.org</url> > <properties> > > <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> > <flink.version>1.0.3</flink.version> > </properties> > <repositories> > <repository> > <id>apache.snapshots</id> > <name>Apache Development Snapshot Repository</name> > > <url>https://repository.apache.org/content/repositories/snapshots/</url> > <releases> > <enabled>false</enabled> > </releases> > <snapshots> > <enabled>true</enabled> > </snapshots> > </repository> > <!--<repository>--> > <!--<id>mvnrepository</id>--> > <!--<name>mvnrepository</name>--> > <!--<url>http://mvnrepository.com/</url>--> > <!--<releases>--> > <!--<enabled>true</enabled>--> > <!--</releases>--> > <!--<snapshots>--> > <!--<enabled>true</enabled>--> > <!--</snapshots>--> > <!--</repository>--> > <repository> > <id>nexus</id> > <name>local private nexus</name> > > <url>http://10.2.110.202:8081/nexus/content/groups/public/</url> > <releases><enabled>true</enabled></releases> > <snapshots><enabled>false</enabled></snapshots> > </repository> > <repository> > <id>nexus-snapshots</id> > <name>local private nexus</name> > > <url>http://10.2.110.202:8081/nexus/content/groups/public-snapshots/</url> > <releases><enabled>false</enabled></releases> > <snapshots><enabled>true</enabled></snapshots> > </repository> > </repositories> > > <!-- > > Execute "mvn clean package -Pbuild-jar" > to build a jar file out of this project! > How to use the Flink Quickstart pom: > a) Adding new dependencies: > You can add dependencies to the list below. 
> Please check if the maven-shade-plugin below is > filtering out your dependency > and remove the exclude from there. > b) Build a jar for running on the cluster: > There are two options for creating a jar from this > project > b.1) "mvn clean package" -> this will create a fat jar > which contains all > dependencies necessary for running the > jar created by this pom in a cluster. > The "maven-shade-plugin" excludes > everything that is provided on a running Flink cluster. > b.2) "mvn clean package -Pbuild-jar" -> This will also > create a fat-jar, but with much > nicer dependency exclusion handling. > This approach is preferred and leads to > much cleaner jar files. > --> > <dependencies> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-java</artifactId> > <version>1.1-SNAPSHOT</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-java_2.10</artifactId> > <version>1.1-SNAPSHOT</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-clients_2.10</artifactId> > <version>1.1-SNAPSHOT</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table_2.10</artifactId> > <version>1.1-SNAPSHOT</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-core</artifactId> > <version>1.1-SNAPSHOT</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-jdbc</artifactId> > <version>1.1-SNAPSHOT</version> > </dependency> > <dependency> > <groupId>com.oracle</groupId> > <artifactId>com.oracle.ojdbc</artifactId> > <version>14</version> > </dependency> > </dependencies> > <profiles> > <profile> > <!-- Profile for packaging correct JAR files --> > <id>build-jar</id> > <activation> > <activeByDefault>false</activeByDefault> > </activation> > <dependencies> > <dependency> > <groupId>org.apache.flink</groupId> > 
<artifactId>flink-java</artifactId> > <version>${flink.version}</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-streaming-java_2.10</artifactId> > <version>${flink.version}</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-clients_2.10</artifactId> > <version>${flink.version}</version> > <scope>provided</scope> > </dependency> > </dependencies> > <build> > <plugins> > <!-- disable the exclusion rules --> > <plugin> > > <groupId>org.apache.maven.plugins</groupId> > > <artifactId>maven-shade-plugin</artifactId> > <version>2.4.1</version> > <executions> > <execution> > > <phase>package</phase> > <goals> > > <goal>shade</goal> > </goals> > <configuration> > > <artifactSet> > > <excludes combine.self="override"></excludes> > > </artifactSet> > </configuration> > </execution> > </executions> > </plugin> > </plugins> > </build> > </profile> > </profiles> > <build> > <plugins> > <!-- We use the maven-shade plugin to create a fat jar > that contains all dependencies > except flink and it's transitive dependencies. The > resulting fat-jar can be executed > on a cluster. Change the value of Program-Class if your > program entry point changes. 
--> > <plugin> > <groupId>org.apache.maven.plugins</groupId> > <artifactId>maven-shade-plugin</artifactId> > <version>2.4.1</version> > <executions> > <!-- Run shade goal on package phase --> > <execution> > <phase>package</phase> > <goals> > <goal>shade</goal> > </goals> > <configuration> > <artifactSet> > <excludes> > <!-- > This list contains all dependencies of flink-dist > > Everything else will be packaged into the fat-jar > --> > > <exclude>org.apache.flink:flink-annotations</exclude> > > <exclude>org.apache.flink:flink-shaded-hadoop1</exclude> > > <exclude>org.apache.flink:flink-shaded-hadoop2</exclude> > > <exclude>org.apache.flink:flink-shaded-curator-recipes</exclude> > > <exclude>org.apache.flink:flink-core</exclude> > > <exclude>org.apache.flink:flink-java</exclude> > > <exclude>org.apache.flink:flink-scala_2.10</exclude> > > <exclude>org.apache.flink:flink-runtime_2.10</exclude> > > <exclude>org.apache.flink:flink-optimizer_2.10</exclude> > > <exclude>org.apache.flink:flink-clients_2.10</exclude> > > <exclude>org.apache.flink:flink-avro_2.10</exclude> > > <exclude>org.apache.flink:flink-examples-batch_2.10</exclude> > > <exclude>org.apache.flink:flink-examples-streaming_2.10</exclude> > > <exclude>org.apache.flink:flink-streaming-java_2.10</exclude> > <!-- > Also exclude very big transitive dependencies of Flink > > WARNING: You have to remove these excludes if your code relies on other > > versions of these dependencies. 
> --> > > <exclude>org.scala-lang:scala-library</exclude> > > <exclude>org.scala-lang:scala-compiler</exclude> > > <exclude>org.scala-lang:scala-reflect</exclude> > > <exclude>com.amazonaws:aws-java-sdk</exclude> > > <exclude>com.typesafe.akka:akka-actor_*</exclude> > > <exclude>com.typesafe.akka:akka-remote_*</exclude> > > <exclude>com.typesafe.akka:akka-slf4j_*</exclude> > > <exclude>io.netty:netty-all</exclude> > > <exclude>io.netty:netty</exclude> > > <exclude>commons-fileupload:commons-fileupload</exclude> > > <exclude>org.apache.avro:avro</exclude> > > <exclude>commons-collections:commons-collections</exclude> > > <exclude>org.codehaus.jackson:jackson-core-asl</exclude> > > <exclude>org.codehaus.jackson:jackson-mapper-asl</exclude> > > <exclude>com.thoughtworks.paranamer:paranamer</exclude> > > <exclude>org.xerial.snappy:snappy-java</exclude> > > <exclude>org.apache.commons:commons-compress</exclude> > > <exclude>org.tukaani:xz</exclude> > > <exclude>com.esotericsoftware.kryo:kryo</exclude> > > <exclude>com.esotericsoftware.minlog:minlog</exclude> > > <exclude>org.objenesis:objenesis</exclude> > > <exclude>com.twitter:chill_*</exclude> > > <exclude>com.twitter:chill-java</exclude> > > <exclude>com.twitter:chill-avro_*</exclude> > > <exclude>com.twitter:chill-bijection_*</exclude> > > <exclude>com.twitter:bijection-core_*</exclude> > > <exclude>com.twitter:bijection-avro_*</exclude> > > <exclude>commons-lang:commons-lang</exclude> > > <exclude>junit:junit</exclude> > > <exclude>de.javakaffee:kryo-serializers</exclude> > > <exclude>joda-time:joda-time</exclude> > > <exclude>org.apache.commons:commons-lang3</exclude> > > <exclude>org.slf4j:slf4j-api</exclude> > > <exclude>org.slf4j:slf4j-log4j12</exclude> > > <exclude>log4j:log4j</exclude> > > <exclude>org.apache.commons:commons-math</exclude> > > <exclude>org.apache.sling:org.apache.sling.commons.json</exclude> > > <exclude>commons-logging:commons-logging</exclude> > > 
<exclude>commons-codec:commons-codec</exclude> > > <exclude>com.fasterxml.jackson.core:jackson-core</exclude> > > <exclude>com.fasterxml.jackson.core:jackson-databind</exclude> > > <exclude>com.fasterxml.jackson.core:jackson-annotations</exclude> > > <exclude>stax:stax-api</exclude> > > <exclude>com.typesafe:config</exclude> > > <exclude>org.uncommons.maths:uncommons-maths</exclude> > > <exclude>com.github.scopt:scopt_*</exclude> > > <exclude>commons-io:commons-io</exclude> > > <exclude>commons-cli:commons-cli</exclude> > </excludes> > </artifactSet> > <filters> > <filter> > > <artifact>org.apache.flink:*</artifact> > > <excludes> > > <!-- exclude shaded google but include shaded curator --> > > <exclude>org/apache/flink/shaded/com/**</exclude> > > <exclude>web-docs/**</exclude> > > </excludes> > </filter> > <filter> > <!-- Do > not copy the signatures in the META-INF folder. > > Otherwise, this might cause SecurityExceptions when using the JAR. --> > > <artifact>*:*</artifact> > > <excludes> > > <exclude>META-INF/*.SF</exclude> > > <exclude>META-INF/*.DSA</exclude> > > <exclude>META-INF/*.RSA</exclude> > > </excludes> > </filter> > </filters> > <transformers> > <!-- add > Main-Class to manifest file --> > <transformer > implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> > > <mainClass>com.gf.flink.Job</mainClass> > </transformer> > </transformers> > > <createDependencyReducedPom>false</createDependencyReducedPom> > </configuration> > </execution> > </executions> > </plugin> > <plugin> > <groupId>org.apache.maven.plugins</groupId> > <artifactId>maven-compiler-plugin</artifactId> > <version>3.1</version> > <configuration> > <source>1.7</source> <!-- If you want > to use Java 8, change this to "1.8" --> > <target>1.7</target> <!-- If you want > to use Java 8, change this to "1.8" --> > </configuration> > </plugin> > </plugins> > > > <!-- If you want to use Java 8 Lambda Expressions uncomment the > following lines --> > <!-- > 
<pluginManagement> > <plugins> > <plugin> > > <artifactId>maven-compiler-plugin</artifactId> > <configuration> > <source>1.8</source> > <target>1.8</target> > <compilerId>jdt</compilerId> > </configuration> > <dependencies> > <dependency> > > <groupId>org.eclipse.tycho</groupId> > > <artifactId>tycho-compiler-jdt</artifactId> > > <version>0.21.0</version> > </dependency> > </dependencies> > </plugin> > > <plugin> > <groupId>org.eclipse.m2e</groupId> > > <artifactId>lifecycle-mapping</artifactId> > <version>1.0.0</version> > <configuration> > <lifecycleMappingMetadata> > <pluginExecutions> > > <pluginExecution> > > <pluginExecutionFilter> > > <groupId>org.apache.maven.plugins</groupId> > > <artifactId>maven-assembly-plugin</artifactId> > > <versionRange>[2.4,)</versionRange> > > <goals> > > <goal>single</goal> > > </goals> > > </pluginExecutionFilter> > <action> > > <ignore/> > > </action> > > </pluginExecution> > > <pluginExecution> > > <pluginExecutionFilter> > > <groupId>org.apache.maven.plugins</groupId> > > <artifactId>maven-compiler-plugin</artifactId> > > <versionRange>[3.1,)</versionRange> > > <goals> > > <goal>testCompile</goal> > > <goal>compile</goal> > > </goals> > > </pluginExecutionFilter> > <action> > > <ignore/> > > </action> > > </pluginExecution> > </pluginExecutions> > </lifecycleMappingMetadata> > </configuration> > </plugin> > </plugins> > </pluginManagement> > --> > > </build> > </project> -- This message was sent by Atlassian JIRA (v6.3.4#6332)