[GitHub] flink issue #4883: [FLINK-4809] Operators should tolerate checkpoint failure...
Github user PangZhi commented on the issue: https://github.com/apache/flink/pull/4883 @StefanRRichter do we have any update on this PR? ---
[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink
Github user PangZhi commented on the issue: https://github.com/apache/flink/pull/3748 @zentol Hey zentol, when do you think we will be able to merge this in? ---
[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink
Github user PangZhi commented on the issue: https://github.com/apache/flink/pull/3748 @zentol thx, fixed. ---
[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink
Github user PangZhi commented on the issue: https://github.com/apache/flink/pull/3748 @zentol @fhueske Can you take another look? ---
[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink
Github user PangZhi commented on the issue: https://github.com/apache/flink/pull/3748 @zentol Do you mind taking another look? ---
[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink
Github user PangZhi commented on the issue: https://github.com/apache/flink/pull/3748 @fhueske Hi, zentol hasn't replied. Could you help take a look? Thanks. ---
[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink
Github user PangZhi commented on the issue: https://github.com/apache/flink/pull/3748 @zentol Also, do you know why the import of com.google.common.util.concurrent.ListenableFuture is flagged as an illegal import? The checkstyle error is blocking CI, even though this class is brought in from the base class. ---
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user PangZhi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3748#discussion_r143098834

    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
    @@ -392,6 +425,45 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
         }

         @Test
    +    public void testCassandraTableSink() throws Exception {
    +        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +        env.setParallelism(1);
    +
    +        DataStreamSource source = env.fromCollection(rowCollection);
    +        CassandraTableSink cassandraTableSink = new CassandraTableSink(new ClusterBuilder() {
    +            @Override
    +            protected Cluster buildCluster(Cluster.Builder builder) {
    +                return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build();
    +            }
    +        }, injectTableName(INSERT_DATA_QUERY), new Properties());
    +        CassandraTableSink newCassandrTableSink = cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
    +
    +        newCassandrTableSink.emitDataStream(source);
    +
    +        env.execute();
    +        ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
    +        Assert.assertEquals(20, rs.all().size());
    +    }
    +
    +    @Test
    +    public void testCassandraTableSinkE2E() throws Exception {
    --- End diff --

    This was added because another reviewer thought it better to add an e2e test using the SQL API.

---
[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink
Github user PangZhi commented on the issue: https://github.com/apache/flink/pull/3748 @zentol can you take another look? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user PangZhi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3748#discussion_r113505564

    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
    @@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder builder) {
         private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";

         private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
    +    private static final ArrayList rowCollection = new ArrayList<>(20);
    --- End diff --

    @zentol There is another Row class already used in this file, so the fully qualified name is needed here.

---
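The name clash mentioned in the review comment above can be illustrated with a small, self-contained Java sketch. It does not use the classes from the PR (presumably the Cassandra driver's Row and Flink's Row, which is an assumption); instead it uses the JDK's java.util.Date and java.sql.Date as stand-ins for two classes that share a simple name. The class name QualifiedNameDemo and the method describe() are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Date; // the simple name "Date" now refers to java.util.Date
import java.util.List;

public class QualifiedNameDemo {

    // Returns the runtime class names of the two same-named types.
    static String describe() {
        // Resolved through the import above.
        Date utilDate = new Date(0L);

        // A second single-type import of another "Date" would not compile,
        // so the other class is written with its fully qualified name.
        List<java.sql.Date> sqlDates = new ArrayList<>(20);
        sqlDates.add(new java.sql.Date(0L));

        return utilDate.getClass().getName() + " / " + sqlDates.get(0).getClass().getName();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Only one of the two classes can be imported by its simple name in a given file; which one gets the import is a style choice, and the other has to be spelled out wherever it appears.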
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
GitHub user PangZhi opened a pull request:

    https://github.com/apache/flink/pull/3748

    [FLINK-6225] [Cassandra Connector] add CassandraTableSink

    This PR is to address https://issues.apache.org/jira/browse/FLINK-6225
    - add Row type support for CassandraSink
    - add StreamTableSink implementation for Cassandra

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/PangZhi/flink FLINK-6225_add_cassandra_table_sink

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3748.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3748

commit 8bd2816a5d5999575bd98b5d3096af5f36f6f173
Author: Jing Fan <jing@uber.com>
Date:   2017-04-21T00:54:36Z

    [FLINK-6225] [Cassandra Connector] add CassandraTableSink

---