[jira] [Resolved] (FLINK-1946) Make yarn tests logging less verbose
[ https://issues.apache.org/jira/browse/FLINK-1946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-1946. Resolution: Fixed This is resolved with 5b2ad7f03ef67ce529551e7b464d7db94e2a1d90. Closing this. > Make yarn tests logging less verbose > > > Key: FLINK-1946 > URL: https://issues.apache.org/jira/browse/FLINK-1946 > Project: Flink > Issue Type: Improvement > Components: YARN >Reporter: Till Rohrmann >Priority: Minor > > Currently, the yarn tests log on the INFO level making the test outputs > confusing. Furthermore some status messages are written to stdout. I think > these messages are not necessary to be shown to the user. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4837) flink-streaming-akka source connector
[ https://issues.apache.org/jira/browse/FLINK-4837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai closed FLINK-4837. -- Resolution: Won't Fix Assignee: Subhobrata Dey > flink-streaming-akka source connector > - > > Key: FLINK-4837 > URL: https://issues.apache.org/jira/browse/FLINK-4837 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Subhobrata Dey >Assignee: Subhobrata Dey > > Hello, > This issue is created to propose the idea of having a flink-streaming-akka > source connector. > The source connector can be used to receive messages from an Akka feeder or > publisher actor & these messages can then be processed using flink streaming. > The source connector has the following features. > 1. It can support several different message formats like iterable data, > bytes array & data with timestamp. > 2. It can send back acknowledgements to the feeder actor. > Thanks & regards, > Subhobrata
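The acknowledgement flow the proposal describes (features 1 and 2) can be modeled without Akka. The following is a minimal, hypothetical sketch in plain Java; `AckedMessage` and `AckingConsumer` are illustrative names, not the connector's actual API (the real connector, which moved to Apache Bahir, is built on Akka actors).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical envelope: a received payload plus a callback that sends the
// acknowledgement back to the feeder actor once the message is processed.
class AckedMessage {
    final byte[] payload;
    final long timestamp;   // optional event timestamp (feature 1)
    final Runnable ack;     // acknowledges receipt to the feeder (feature 2)

    AckedMessage(byte[] payload, long timestamp, Runnable ack) {
        this.payload = payload;
        this.timestamp = timestamp;
        this.ack = ack;
    }
}

class AckingConsumer {
    /** Processes each message, acknowledging it only after successful handling. */
    static List<String> consume(List<AckedMessage> inbox) {
        List<String> out = new ArrayList<>();
        for (AckedMessage m : inbox) {
            out.add(new String(m.payload));   // "process" the bytes payload
            m.ack.run();                      // ack only after processing succeeds
        }
        return out;
    }
}
```

Acknowledging only after processing keeps the feeder from discarding a message the pipeline has not yet handled.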
[jira] [Commented] (FLINK-4837) flink-streaming-akka source connector
[ https://issues.apache.org/jira/browse/FLINK-4837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839390#comment-15839390 ] Tzu-Li (Gordon) Tai commented on FLINK-4837: Seems like this JIRA can be closed, as the connector is moved to Bahir.
[jira] [Created] (FLINK-5650) flink-python unit test costs more than half an hour
shijinkui created FLINK-5650: Summary: flink-python unit test costs more than half an hour Key: FLINK-5650 URL: https://issues.apache.org/jira/browse/FLINK-5650 Project: Flink Issue Type: Bug Components: Python API Reporter: shijinkui

When executing `mvn clean test` in flink-python, it waits more than half an hour after the console output below:

--- T E S T S ---
Running org.apache.flink.python.api.PythonPlanBinderTest
log4j:WARN No appenders could be found for logger (org.apache.flink.python.api.PythonPlanBinderTest).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

The stack below:

"main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition [0x79fd8000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70)
	at org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50)
	at org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211)
	at org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141)
	at org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114)
	at org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83)
	at org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174)
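The thread dump shows the test parked in `Thread.sleep` inside `PythonPlanStreamer.startPython`, apparently waiting for the python subprocess. As a hedged illustration only (not Flink's actual code or fix), bounding such a wait makes a missing or unresponsive interpreter fail fast instead of hanging the build:

```java
import java.util.concurrent.TimeUnit;

// Illustrative helper: returns true iff the command exits within timeoutMs.
// The name and approach are assumptions for this sketch, not Flink's API.
class ProcessProbe {

    static boolean finishesWithin(String[] command, long timeoutMs) {
        try {
            Process p = new ProcessBuilder(command).start();
            boolean done = p.waitFor(timeoutMs, TimeUnit.MILLISECONDS);
            if (!done) {
                p.destroyForcibly();   // fail fast instead of blocking the test
            }
            return done;
        } catch (Exception e) {
            return false;              // command not found counts as "not ready"
        }
    }
}
```

A test setup could probe for the interpreter this way and skip (or fail with a clear message) instead of sleeping for half an hour.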
[jira] [Comment Edited] (FLINK-4463) FLIP-3: Restructure Documentation
[ https://issues.apache.org/jira/browse/FLINK-4463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839354#comment-15839354 ] Tzu-Li (Gordon) Tai edited comment on FLINK-4463 at 1/26/17 7:12 AM: - With the recent efforts in adding documentation for the 1.2 release, I think some of the subtasks here are actually already completed by other duplicate JIRAs. was (Author: tzulitai): I think some of the subtasks here are actually already completed by other duplicate JIRAs, with the recent efforts in adding documentation for the 1.2 release. > FLIP-3: Restructure Documentation > - > > Key: FLINK-4463 > URL: https://issues.apache.org/jira/browse/FLINK-4463 > Project: Flink > Issue Type: Improvement >Reporter: Ufuk Celebi > > Super issue to track progress for > [FLIP-3|https://cwiki.apache.org/confluence/display/FLINK/FLIP-3+-+Organization+of+Documentation].
[jira] [Commented] (FLINK-4463) FLIP-3: Restructure Documentation
[ https://issues.apache.org/jira/browse/FLINK-4463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839354#comment-15839354 ] Tzu-Li (Gordon) Tai commented on FLINK-4463: I think some of the subtasks here are actually already completed by other duplicate JIRAs, with the recent efforts in adding documentation for the 1.2 release.
[jira] [Issue Comment Deleted] (FLINK-4473) Add docs about end-to-end exactly once
[ https://issues.apache.org/jira/browse/FLINK-4473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4473: --- Comment: was deleted (was: I think some of the subtasks here are actually already completed by other duplicate JIRAs, with the recent efforts in adding documentation for the 1.2 release.) > Add docs about end-to-end exactly once > -- > > Key: FLINK-4473 > URL: https://issues.apache.org/jira/browse/FLINK-4473 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Ufuk Celebi > > Add page about end-to-end exactly-once processing semantics in relation to > connectors and processing semantics in Flink.
[jira] [Commented] (FLINK-4473) Add docs about end-to-end exactly once
[ https://issues.apache.org/jira/browse/FLINK-4473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839352#comment-15839352 ] Tzu-Li (Gordon) Tai commented on FLINK-4473: I think some of the subtasks here are actually already completed by other duplicate JIRAs, with the recent efforts in adding documentation for the 1.2 release.
[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results
[ https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839336#comment-15839336 ] ASF GitHub Bot commented on FLINK-1297: --- Github user coveralls commented on the issue: https://github.com/apache/flink/pull/605 [![Coverage Status](https://coveralls.io/builds/9850383/badge)](https://coveralls.io/builds/9850383) Changes Unknown when pulling **342f5c99cc530ccf2a6281223d1f4f917f1fb497 on tammymendt:FLINK-1297-v2** into ** on apache:master**. > Add support for tracking statistics of intermediate results > --- > > Key: FLINK-1297 > URL: https://issues.apache.org/jira/browse/FLINK-1297 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: Alexander Alexandrov >Assignee: Tamara > Fix For: 0.10.0 > > Original Estimate: 1,008h > Remaining Estimate: 1,008h > > One of the major problems related to the optimizer at the moment is the lack > of proper statistics. > With the introduction of staged execution, it is possible to instrument the > runtime code with a statistics facility that collects the required > information for optimizing the next execution stage. > I would therefore like to contribute code that can be used to gather basic > statistics for the (intermediate) result of dataflows (e.g. min, max, count, > count distinct) and make them available to the job manager. > Before I start, I would like to hear some feedback from the other users. > In particular, to handle skew (e.g. on grouping) it might be good to have > some sort of detailed sketch about the key distribution of an intermediate > result. I am not sure whether a simple histogram is the most effective way to > go. Maybe somebody would propose another lightweight sketch that provides > better accuracy.
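The basic statistics the ticket proposes (min, max, count, count distinct) can be sketched in a few lines. The class name below is hypothetical, and the exact `HashSet`-based distinct count is only a stand-in for the lightweight sketches (histograms, etc.) the discussion asks for, which would bound memory on large key spaces.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative per-result statistics accumulator; not Flink's actual
// OperatorStatsAccumulator from PR #605.
class OperatorStatsSketch {
    private long count;
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;
    private final Set<Long> distinct = new HashSet<>();

    void add(long value) {
        count++;
        min = Math.min(min, value);
        max = Math.max(max, value);
        distinct.add(value);   // replace with a sketch to bound memory on skewed keys
    }

    long count() { return count; }
    long min() { return min; }
    long max() { return max; }
    long countDistinct() { return distinct.size(); }
}
```

Each operator instance would maintain one such accumulator and report it to the job manager at the end of its execution stage.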
[GitHub] flink issue #605: [FLINK-1297] Added OperatorStatsAccumulator for tracking o...
Github user coveralls commented on the issue: https://github.com/apache/flink/pull/605 [![Coverage Status](https://coveralls.io/builds/9850383/badge)](https://coveralls.io/builds/9850383) Changes Unknown when pulling **342f5c99cc530ccf2a6281223d1f4f917f1fb497 on tammymendt:FLINK-1297-v2** into ** on apache:master**. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839328#comment-15839328 ] ASF GitHub Bot commented on FLINK-2168: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 Fixed all the minor comments given above. @tonycox , @wuchong , @fhueske . > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. 
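The projection push-down that `ProjectableTableSource` (FLINK-3848) asks of an `HBaseTableSource` can be illustrated self-containedly. The types below are stand-ins, not Flink's or HBase's actual APIs; rows are modeled as maps from qualified column name to value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of projection push-down: keep only the requested
// columns of each row, in the requested order, so the scan returns less data.
class ProjectionSketch {

    static List<Map<String, String>> project(List<Map<String, String>> rows, String... columns) {
        List<Map<String, String>> out = new ArrayList<>();
        for (Map<String, String> row : rows) {
            Map<String, String> projected = new LinkedHashMap<>();
            for (String col : columns) {
                if (row.containsKey(col)) {
                    projected.put(col, row.get(col));
                }
            }
            out.add(projected);
        }
        return out;
    }
}
```

In the real connector the column selection would be pushed into the HBase scan itself rather than applied after reading, which is the point of the interface.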
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 Fixed all the minor comments given above. @tonycox , @wuchong , @fhueske .
[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host
[ https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839322#comment-15839322 ] ASF GitHub Bot commented on FLINK-3857: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1962 Thank you for picking this up again @sbcd90. I would wait until #3112 is merged before rebasing. > Add reconnect attempt to Elasticsearch host > --- > > Key: FLINK-3857 > URL: https://issues.apache.org/jira/browse/FLINK-3857 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.1.0, 1.0.2 >Reporter: Fabian Hueske >Assignee: Subhobrata Dey > > Currently, the connection to the Elasticsearch host is opened in > {{ElasticsearchSink.open()}}. In case the connection is lost (maybe due to a > changed DNS entry), the sink fails. > I propose to catch the Exception for lost connections in the {{invoke()}} > method and try to re-open the connection for a configurable number of times > with a certain delay.
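The retry behavior the ticket proposes, re-attempting an action a configurable number of times with a delay between attempts, can be sketched generically. Names are illustrative, not the sink's actual code; the real `invoke()` would wrap its Elasticsearch client call in something like this.

```java
import java.util.concurrent.Callable;

// Hedged sketch of retry-with-delay; a RuntimeException carrying the last
// failure is thrown once all attempts are exhausted.
class RetryOnFailure {

    static <T> T callWithRetries(Callable<T> action, int maxAttempts, long delayMs) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;   // e.g. a lost connection to the Elasticsearch host
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delayMs);   // back off before re-opening
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        throw new RuntimeException("all " + maxAttempts + " attempts failed", last);
    }
}
```

Making `maxAttempts` and `delayMs` user-configurable matches the "configurable number of times with a certain delay" in the proposal.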
[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1962 Thank you for picking this up again @sbcd90. I would wait until #3112 is merged before rebasing.
[GitHub] flink pull request #3031: [FLINK-4616] Added functionality through which wat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3031#discussion_r97935636

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -175,34 +176,115 @@ protected AbstractFetcher(

 	/**
-	 * Takes a snapshot of the partition offsets.
+	 * Takes a snapshot of the partition offsets and watermarks.
 	 *
 	 * Important: This method must be called under the checkpoint lock.
 	 *
-	 * @return A map from partition to current offset.
+	 * @return A map from partition to current offset and watermark.
 	 */
-	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+	public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
 		// this method assumes that the checkpoint lock is held
 		assert Thread.holdsLock(checkpointLock);

-		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-		for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
-			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+		HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+
+				for (KafkaTopicPartitionState<?> partition : allPartitions) {
--- End diff --

Excessive empty line above this line.
[GitHub] flink pull request #3031: [FLINK-4616] Added functionality through which wat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3031#discussion_r97757847

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -101,7 +101,7 @@
 	/** The assigner is kept in serialized form, to deserialize it into multiple copies */
 	private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;

-	private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
+	private transient ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> offsetsAndWatermarksStateForCheckpoint;
--- End diff --

I think we should switch to have a specific checkpointed state object instead of continuing to "extend" the original Tuple. This will also be helpful for compatibility for any future changes to the checkpointed state.
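The dedicated checkpointed state object the reviewer suggests, instead of nesting `Tuple2`s, might look like the following sketch. The class name is hypothetical; the point is that named accessors and value semantics survive future field additions better than positional `f0`/`f1` access.

```java
import java.util.Objects;

// Hypothetical replacement for Tuple2<Long, Long> in the consumer's
// checkpointed state: an offset paired with the last emitted watermark.
class OffsetAndWatermark {
    private final long offset;
    private final long watermarkTimestamp;

    OffsetAndWatermark(long offset, long watermarkTimestamp) {
        this.offset = offset;
        this.watermarkTimestamp = watermarkTimestamp;
    }

    long getOffset() { return offset; }
    long getWatermarkTimestamp() { return watermarkTimestamp; }

    // Value semantics so restored state compares correctly in tests.
    @Override public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof OffsetAndWatermark)) return false;
        OffsetAndWatermark that = (OffsetAndWatermark) o;
        return offset == that.offset && watermarkTimestamp == that.watermarkTimestamp;
    }

    @Override public int hashCode() {
        return Objects.hash(offset, watermarkTimestamp);
    }
}
```

A new field added later (say, a transaction id) would then only change this class and its serializer, not every call site that unpacks a tuple.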
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state
[ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839321#comment-15839321 ] ASF GitHub Bot commented on FLINK-4616: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3031#discussion_r97935971

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java ---
@@ -61,6 +61,10 @@ public long getCurrentWatermarkTimestamp() {
 		return partitionWatermark;
 	}

+	void setCurrentWatermarkTimestamp(long watermarkTimestamp) {
--- End diff --

The other methods seem to be `public` (although they can actually be package-private). Should we stay consistent with that here?

> Kafka consumer doesn't store last emitted watermarks per partition in state
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1
> Reporter: Yuri Makhno
> Assignee: Roman Maier
>
> Kafka consumers store only the Kafka offsets in state and don't store the last
> emitted watermarks, which may lead to a wrong state when a checkpoint is restored:
> Let's say our watermark is (timestamp - 10); with the following message queue,
> results will differ after a checkpoint restore versus normal processing:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered by next time window
> D(ts=60)
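The scenario in the issue description can be worked through with a simplified model, assuming a bounded-out-of-orderness watermark of (max seen timestamp - 10). This is an illustration only, not the Kafka consumer's actual code.

```java
// Minimal model of per-partition watermark tracking. A record is "late"
// (and would be filtered by the next time window) if its timestamp is
// at or below the current watermark.
class PartitionWatermarkModel {
    private long maxTimestamp = Long.MIN_VALUE;

    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 10;
    }

    /** Returns true if the record is on time; false if it is late. */
    boolean accept(long ts) {
        boolean onTime = ts > currentWatermark();
        if (ts > maxTimestamp) {
            maxTimestamp = ts;
        }
        return onTime;
    }
}
```

During live processing A(30) and B(35) push the watermark to 25, so C(15) is correctly dropped; after a restore that recovers only the offset, the watermark resets to its initial value and C(15) is wrongly admitted, which is exactly the bug the ticket describes.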
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state
[ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839317#comment-15839317 ] ASF GitHub Bot commented on FLINK-4616: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3031#discussion_r97935696 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java --- @@ -175,34 +176,115 @@ protected AbstractFetcher( // /** -* Takes a snapshot of the partition offsets. +* Takes a snapshot of the partition offsets and watermarks. * * Important: This method mus be called under the checkpoint lock. * -* @return A map from partition to current offset. +* @return A map from partition to current offset and watermark. */ - public HashMapsnapshotCurrentState() { + public HashMap > snapshotCurrentState() { // this method assumes that the checkpoint lock is held assert Thread.holdsLock(checkpointLock); - HashMap state = new HashMap<>(allPartitions.length); - for (KafkaTopicPartitionState partition : subscribedPartitions()) { - state.put(partition.getKafkaTopicPartition(), partition.getOffset()); + HashMap > state = new HashMap<>(allPartitions.length); + + switch (timestampWatermarkMode) { + + case NO_TIMESTAMPS_WATERMARKS: { + + for (KafkaTopicPartitionState partition : allPartitions) { + state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE)); + } + + return state; + } + + case PERIODIC_WATERMARKS: { + KafkaTopicPartitionStateWithPeriodicWatermarks [] partitions = + (KafkaTopicPartitionStateWithPeriodicWatermarks []) allPartitions; + + for (KafkaTopicPartitionStateWithPeriodicWatermarks partition : partitions) { + state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp())); + } + + return state; + } + + case PUNCTUATED_WATERMARKS: { + 
KafkaTopicPartitionStateWithPunctuatedWatermarks [] partitions = + (KafkaTopicPartitionStateWithPunctuatedWatermarks []) allPartitions; + + for (KafkaTopicPartitionStateWithPunctuatedWatermarks partition : partitions) { + state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark())); + } + + return state; + } + + default: + // cannot happen, add this as a guard for the future + throw new RuntimeException(); --- End diff -- Would be good to have a reason message here. > Kafka consumer doesn't store last emmited watermarks per partition in state > --- > > Key: FLINK-4616 > URL: https://issues.apache.org/jira/browse/FLINK-4616 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.1 >Reporter: Yuri Makhno >Assignee: Roman Maier > > Kafka consumers stores in state only kafka offsets and doesn't store last > emmited watermarks, this may go to wrong state when checkpoint is restored: > Let's say our watermark is (timestamp - 10) and in case we have the following > messages queue results will be different after checkpoint restore and during > normal processing: > A(ts = 30) > B(ts = 35) > -- checkpoint goes here > C(ts=15) -- this one should be filtered by next time window > D(ts=60) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state
[ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839320#comment-15839320 ] ASF GitHub Bot commented on FLINK-4616: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3031#discussion_r97758477 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -345,39 +345,39 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { LOG.debug("snapshotState() called on closed source"); } else { - offsetsStateForCheckpoint.clear(); + offsetsAndWatermarksStateForCheckpoint.clear(); final AbstractFetcher fetcher = this.kafkaFetcher; if (fetcher == null) { // the fetcher has not yet been initialized, which means we need to return the - // originally restored offsets or the assigned partitions + // originally restored offsets and watermarks or the assigned partitions - if (restoreToOffset != null) { + if (restoreToOffsetAndWatermark != null) { - for (Map.EntrykafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) { - offsetsStateForCheckpoint.add( - Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); + for (Map.Entry > kafkaTopicPartitionOffsetAndWatermark : restoreToOffsetAndWatermark.entrySet()) { + offsetsAndWatermarksStateForCheckpoint.add( + Tuple2.of(kafkaTopicPartitionOffsetAndWatermark.getKey(), kafkaTopicPartitionOffsetAndWatermark.getValue())); --- End diff -- Having a specific checkpoint state object will also be helpful for code readability in situations like this one (it's quite tricky to understand quickly what the key / value refers to, as well as some of the `f0`, `f1` calls in other parts of the PR. I know the previous code used `f0` and `f1` also, but I think it's a good opportunity to improve that). 
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state
[ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839316#comment-15839316 ] ASF GitHub Bot commented on FLINK-4616: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3031#discussion_r97935720 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java --- @@ -175,34 +176,115 @@ protected AbstractFetcher( // /** -* Takes a snapshot of the partition offsets. +* Takes a snapshot of the partition offsets and watermarks. * * Important: This method mus be called under the checkpoint lock. * -* @return A map from partition to current offset. +* @return A map from partition to current offset and watermark. */ - public HashMapsnapshotCurrentState() { + public HashMap > snapshotCurrentState() { // this method assumes that the checkpoint lock is held assert Thread.holdsLock(checkpointLock); - HashMap state = new HashMap<>(allPartitions.length); - for (KafkaTopicPartitionState partition : subscribedPartitions()) { - state.put(partition.getKafkaTopicPartition(), partition.getOffset()); + HashMap > state = new HashMap<>(allPartitions.length); + + switch (timestampWatermarkMode) { + + case NO_TIMESTAMPS_WATERMARKS: { + + for (KafkaTopicPartitionState partition : allPartitions) { + state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE)); + } + + return state; + } + + case PERIODIC_WATERMARKS: { + KafkaTopicPartitionStateWithPeriodicWatermarks [] partitions = + (KafkaTopicPartitionStateWithPeriodicWatermarks []) allPartitions; + + for (KafkaTopicPartitionStateWithPeriodicWatermarks partition : partitions) { + state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp())); + } + + return state; + } + + case PUNCTUATED_WATERMARKS: { + 
KafkaTopicPartitionStateWithPunctuatedWatermarks [] partitions = + (KafkaTopicPartitionStateWithPunctuatedWatermarks []) allPartitions; + + for (KafkaTopicPartitionStateWithPunctuatedWatermarks partition : partitions) { + state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark())); + } + + return state; + } + + default: + // cannot happen, add this as a guard for the future + throw new RuntimeException(); } - return state; } /** -* Restores the partition offsets. +* Restores the partition offsets and watermarks. * -* @param snapshotState The offsets for the partitions +* @param snapshotState The offsets and watermarks for the partitions */ - public void restoreOffsets(Map snapshotState) { - for (KafkaTopicPartitionState partition : allPartitions) { - Long offset = snapshotState.get(partition.getKafkaTopicPartition()); - if (offset != null) { - partition.setOffset(offset); + public void restoreOffsetsAndWatermarks(Map > snapshotState) { + + switch (timestampWatermarkMode) { + + case NO_TIMESTAMPS_WATERMARKS: { + for (KafkaTopicPartitionState partition : allPartitions) { + Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0; + if (offset != null) { +
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state
[ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839318#comment-15839318 ] ASF GitHub Bot commented on FLINK-4616: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3031#discussion_r97757847 I think we should switch to have a specific checkpointed state object instead of continuing to "extend" the original Tuple. This will also be helpful for compatibility for any future changes to the checkpointed state.
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state
[ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839319#comment-15839319 ] ASF GitHub Bot commented on FLINK-4616: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3031#discussion_r97935636 Excessive empty line above this line.
[GitHub] flink pull request #3031: [FLINK-4616] Added functionality through which wat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3031#discussion_r97935971

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java ---
@@ -61,6 +61,10 @@ public long getCurrentWatermarkTimestamp() {
 		return partitionWatermark;
 	}
+
+	void setCurrentWatermarkTimestamp(long watermarkTimestamp) {
--- End diff --

The other methods seem to be `public` (although they can actually be package-private). Should we stay consistent with that here?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3031: [FLINK-4616] Added functionality through which wat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3031#discussion_r97935696

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -175,34 +176,115 @@ protected AbstractFetcher(
 	//
 	/**
-	 * Takes a snapshot of the partition offsets.
+	 * Takes a snapshot of the partition offsets and watermarks.
 	 *
 	 * Important: This method must be called under the checkpoint lock.
 	 *
-	 * @return A map from partition to current offset.
+	 * @return A map from partition to current offset and watermark.
 	 */
-	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+	public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
 		// this method assumes that the checkpoint lock is held
 		assert Thread.holdsLock(checkpointLock);
-		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitions()) {
-			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+		HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+
+				for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE));
+				}
+
+				return state;
+			}
+
+			case PERIODIC_WATERMARKS: {
+				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+					(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp()));
+				}
+
+				return state;
+			}
+
+			case PUNCTUATED_WATERMARKS: {
+				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+					(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark()));
+				}
+
+				return state;
+			}
+
+			default:
+				// cannot happen, add this as a guard for the future
+				throw new RuntimeException();
--- End diff --

Would be good to have a reason message here.
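The reviewer's suggestion in code form, as a hedged sketch (the enum and names below are illustrative, not the actual Flink fields): the unreachable `default` branch throws with a message naming the unexpected value, so a future regression fails with a diagnosable error instead of a bare `RuntimeException`.

```java
// Illustrative only: Flink's timestampWatermarkMode is not literally this enum.
enum TimestampWatermarkMode {
    NO_TIMESTAMPS_WATERMARKS, PERIODIC_WATERMARKS, PUNCTUATED_WATERMARKS
}

class ModeGuard {
    static String describe(TimestampWatermarkMode mode) {
        switch (mode) {
            case NO_TIMESTAMPS_WATERMARKS: return "no watermarks";
            case PERIODIC_WATERMARKS:      return "periodic watermarks";
            case PUNCTUATED_WATERMARKS:    return "punctuated watermarks";
            default:
                // "cannot happen" today, but the message makes a future
                // regression diagnosable from the stack trace alone
                throw new RuntimeException("Unknown timestamp/watermark mode: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(TimestampWatermarkMode.PERIODIC_WATERMARKS));
    }
}
```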
[GitHub] flink pull request #3031: [FLINK-4616] Added functionality through which wat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3031#discussion_r97758477

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -345,39 +345,39 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
 			LOG.debug("snapshotState() called on closed source");
 		} else {
-			offsetsStateForCheckpoint.clear();
+			offsetsAndWatermarksStateForCheckpoint.clear();
 			final AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
 			if (fetcher == null) {
 				// the fetcher has not yet been initialized, which means we need to return the
-				// originally restored offsets or the assigned partitions
+				// originally restored offsets and watermarks or the assigned partitions
-				if (restoreToOffset != null) {
+				if (restoreToOffsetAndWatermark != null) {
-					for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
-						offsetsStateForCheckpoint.add(
-							Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
+					for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark : restoreToOffsetAndWatermark.entrySet()) {
+						offsetsAndWatermarksStateForCheckpoint.add(
+							Tuple2.of(kafkaTopicPartitionOffsetAndWatermark.getKey(), kafkaTopicPartitionOffsetAndWatermark.getValue()));
--- End diff --

Having a specific checkpoint state object will also be helpful for code readability in situations like this one. It's quite tricky to understand quickly what the key / value refers to, as well as some of the `f0`, `f1` calls in other parts of the PR. I know the previous code used `f0` and `f1` also, but I think it's a good opportunity to improve that.
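A sketch of the suggested dedicated checkpoint state object. The class and method names below are made up for illustration (the PR itself used `Tuple2<Long, Long>`); the point is that call sites then read `getOffset()` / `getWatermark()` instead of `f0` / `f1`.

```java
// Hypothetical replacement for Tuple2<Long, Long> in the checkpoint state;
// not an actual Flink class.
class PartitionOffsetAndWatermark {
    private final long offset;
    private final long watermark;

    PartitionOffsetAndWatermark(long offset, long watermark) {
        this.offset = offset;
        this.watermark = watermark;
    }

    long getOffset() { return offset; }
    long getWatermark() { return watermark; }

    public static void main(String[] args) {
        PartitionOffsetAndWatermark state = new PartitionOffsetAndWatermark(42L, 25L);
        // the intent is visible at the call site, unlike state.f0 / state.f1
        System.out.println(state.getOffset() + " / " + state.getWatermark());
    }
}
```

(A real version would also need to be serializable by Flink's type system, which is part of why connectors often stick with `Tuple2`.)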
[GitHub] flink pull request #3031: [FLINK-4616] Added functionality through which wat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3031#discussion_r97935720

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -175,34 +176,115 @@ protected AbstractFetcher(
 	//
 	/**
-	 * Takes a snapshot of the partition offsets.
+	 * Takes a snapshot of the partition offsets and watermarks.
 	 *
 	 * Important: This method must be called under the checkpoint lock.
 	 *
-	 * @return A map from partition to current offset.
+	 * @return A map from partition to current offset and watermark.
 	 */
-	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+	public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
 		// this method assumes that the checkpoint lock is held
 		assert Thread.holdsLock(checkpointLock);
-		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitions()) {
-			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+		HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+
+				for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE));
+				}
+
+				return state;
+			}
+
+			case PERIODIC_WATERMARKS: {
+				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+					(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp()));
+				}
+
+				return state;
+			}
+
+			case PUNCTUATED_WATERMARKS: {
+				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+					(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark()));
+				}
+
+				return state;
+			}
+
+			default:
+				// cannot happen, add this as a guard for the future
+				throw new RuntimeException();
 		}
-		return state;
 	}

 	/**
-	 * Restores the partition offsets.
+	 * Restores the partition offsets and watermarks.
 	 *
-	 * @param snapshotState The offsets for the partitions
+	 * @param snapshotState The offsets and watermarks for the partitions
 	 */
-	public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
-		for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
-			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
-			if (offset != null) {
-				partition.setOffset(offset);
+	public void restoreOffsetsAndWatermarks(Map<KafkaTopicPartition, Tuple2<Long, Long>> snapshotState) {
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+				for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
+					Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
+					if (offset != null) {
+						partition.setOffset(offset);
+					}
+				}
+				break;
+			}
+
+			case PERIODIC_WATERMARKS: {
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839312#comment-15839312 ] ASF GitHub Bot commented on FLINK-2168: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97936002 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.HBaseTableSchema; +import org.apache.flink.addons.hbase.HBaseTableSource; +import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter { + + public static final byte[] ROW_1 = Bytes.toBytes("row1"); + public static final byte[] ROW_2 = Bytes.toBytes("row2"); + public static final byte[] ROW_3 = Bytes.toBytes("row3"); + public static final byte[] F_1 = Bytes.toBytes("f1"); + public static final byte[] F_2 = Bytes.toBytes("f2"); + public static final byte[] Q_1 = Bytes.toBytes("q1"); + public static final byte[] Q_2 = Bytes.toBytes("q2"); + public static final byte[] Q_3 = Bytes.toBytes("q3"); + + @BeforeClass + public static void activateHBaseCluster(){ + registerHBaseMiniClusterInClasspath(); + } + + @Test + public void testHBaseTableSourceWithSingleColumnFamily() throws Exception { + // create a table with single region + MapFunction<Row, String> mapFunction = new MapFunction<Row, String>
() { + + @Override + public String map(Row value) throws Exception { + return value == null ? "null" : value.toString(); + } + }; + TableName tableName = TableName.valueOf("test"); + // no split keys + byte[][] famNames = new byte[1][]; + famNames[0] = F_1; + createTable(tableName, famNames, null); + // get the htable instance + HTable table = openTable(tableName); + List<Put> puts = new ArrayList<Put>(); + // add some data + Put put = new Put(ROW_1); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(100)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19991l)); + puts.add(put); + + put = new Put(ROW_2); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(101)); + //2nd qual is String +
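The quoted test writes int, String and long qualifiers with HBase's `Bytes` utility, and the reader must later decode each cell with the same type it was written as. A rough JDK-only sketch of that big-endian, fixed-width encoding (not HBase's actual implementation):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// JDK-only stand-in for org.apache.hadoop.hbase.util.Bytes, for illustration.
class BytesSketch {
    static byte[] toBytes(int v)  { return ByteBuffer.allocate(4).putInt(v).array(); }
    static byte[] toBytes(long v) { return ByteBuffer.allocate(8).putLong(v).array(); }
    static byte[] toBytes(String v) { return v.getBytes(StandardCharsets.UTF_8); }

    static int toInt(byte[] b)   { return ByteBuffer.wrap(b).getInt(); }
    static long toLong(byte[] b) { return ByteBuffer.wrap(b).getLong(); }

    public static void main(String[] args) {
        // the reader must know the written type: 4 bytes for int, 8 for long
        System.out.println(toInt(toBytes(100)));
        System.out.println(toLong(toBytes(19991L)));
    }
}
```

This is why a table source over HBase needs an explicit schema (as `HBaseTableSchema` provides here): the cells themselves carry no type information.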
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839289#comment-15839289 ] ASF GitHub Bot commented on FLINK-2168: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934510 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.HBaseTableSchema; +import org.apache.flink.addons.hbase.HBaseTableSource; +import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter { + + public static final byte[] ROW_1 = Bytes.toBytes("row1"); + public static final byte[] ROW_2 = Bytes.toBytes("row2"); + public static final byte[] ROW_3 = Bytes.toBytes("row3"); + public static final byte[] F_1 = Bytes.toBytes("f1"); + public static final byte[] F_2 = Bytes.toBytes("f2"); + public static final byte[] Q_1 = Bytes.toBytes("q1"); + public static final byte[] Q_2 = Bytes.toBytes("q2"); + public static final byte[] Q_3 = Bytes.toBytes("q3"); + + @BeforeClass + public static void activateHBaseCluster(){ + registerHBaseMiniClusterInClasspath(); + } + + @Test + public void testHBaseTableSourceWithSingleColumnFamily() throws Exception { + // create a table with single region + MapFunction<Row, String> mapFunction = new MapFunction<Row, String>
() { + + @Override + public String map(Row value) throws Exception { + return value == null ? "null" : value.toString(); + } + }; + TableName tableName = TableName.valueOf("test"); + // no split keys + byte[][] famNames = new byte[1][]; + famNames[0] = F_1; + createTable(tableName, famNames, null); + // get the htable instance + HTable table = openTable(tableName); + List<Put> puts = new ArrayList<Put>(); + // add some data + Put put = new Put(ROW_1); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(100)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19991l)); + puts.add(put); + + put = new Put(ROW_2); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(101)); + //2nd qual is String +
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934537 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.HBaseTableSchema; +import org.apache.flink.addons.hbase.HBaseTableSource; +import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter { + + public static final byte[] ROW_1 = Bytes.toBytes("row1"); + public static final byte[] ROW_2 = Bytes.toBytes("row2"); + public static final byte[] ROW_3 = Bytes.toBytes("row3"); + public static final byte[] F_1 = Bytes.toBytes("f1"); + public static final byte[] F_2 = Bytes.toBytes("f2"); + public static final byte[] Q_1 = Bytes.toBytes("q1"); + public static final byte[] Q_2 = Bytes.toBytes("q2"); + public static final byte[] Q_3 = Bytes.toBytes("q3"); + + @BeforeClass + public static void activateHBaseCluster(){ + registerHBaseMiniClusterInClasspath(); + } + + @Test + public void testHBaseTableSourceWithSingleColumnFamily() throws Exception { + // create a table with single region + MapFunction<Row, String> mapFunction = new MapFunction<Row, String>
() { + + @Override + public String map(Row value) throws Exception { + return value == null ? "null" : value.toString(); + } + }; + TableName tableName = TableName.valueOf("test"); + // no split keys + byte[][] famNames = new byte[1][]; + famNames[0] = F_1; + createTable(tableName, famNames, null); + // get the htable instance + HTable table = openTable(tableName); + List<Put> puts = new ArrayList<Put>(); + // add some data + Put put = new Put(ROW_1); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(100)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19991l)); + puts.add(put); + + put = new Put(ROW_2); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(101)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19992l)); + puts.add(put); + + put = new Put(ROW_3); + // add
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934510 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.HBaseTableSchema; +import org.apache.flink.addons.hbase.HBaseTableSource; +import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter { + + public static final byte[] ROW_1 = Bytes.toBytes("row1"); + public static final byte[] ROW_2 = Bytes.toBytes("row2"); + public static final byte[] ROW_3 = Bytes.toBytes("row3"); + public static final byte[] F_1 = Bytes.toBytes("f1"); + public static final byte[] F_2 = Bytes.toBytes("f2"); + public static final byte[] Q_1 = Bytes.toBytes("q1"); + public static final byte[] Q_2 = Bytes.toBytes("q2"); + public static final byte[] Q_3 = Bytes.toBytes("q3"); + + @BeforeClass + public static void activateHBaseCluster(){ + registerHBaseMiniClusterInClasspath(); + } + + @Test + public void testHBaseTableSourceWithSingleColumnFamily() throws Exception { + // create a table with single region + MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+ + @Override + public String map(Row value) throws Exception { + return value == null ? "null" : value.toString(); + } + }; + TableName tableName = TableName.valueOf("test"); + // no split keys + byte[][] famNames = new byte[1][]; + famNames[0] = F_1; + createTable(tableName, famNames, null); + // get the htable instance + HTable table = openTable(tableName); + List<Put> puts = new ArrayList<Put>(); + // add some data + Put put = new Put(ROW_1); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(100)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19991L)); + puts.add(put); + + put = new Put(ROW_2); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(101)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19992L)); + puts.add(put); + + put = new Put(ROW_3); + // add
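The quoted test populates three qualifiers with an int, a String, and a long via HBase's `Bytes.toBytes`, which encodes primitives big-endian. As a self-contained illustration of that encoding, here is a sketch using only `java.nio.ByteBuffer`; `BytesSketch` is a hypothetical stand-in written for this example, not the HBase `Bytes` class:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stand-in for org.apache.hadoop.hbase.util.Bytes: both encode primitives
// big-endian, which is what the test's qualifier values rely on.
public class BytesSketch {
    public static byte[] toBytes(int v)    { return ByteBuffer.allocate(4).putInt(v).array(); }
    public static byte[] toBytes(long v)   { return ByteBuffer.allocate(8).putLong(v).array(); }
    public static byte[] toBytes(String v) { return v.getBytes(StandardCharsets.UTF_8); }

    public static int toInt(byte[] b)       { return ByteBuffer.wrap(b).getInt(); }
    public static long toLong(byte[] b)     { return ByteBuffer.wrap(b).getLong(); }
    public static String toString(byte[] b) { return new String(b, StandardCharsets.UTF_8); }
}
```

Round-tripping the three sample values (`100`, `"strvalue"`, `19991L`) through these helpers mirrors what the table source must later decode from the stored cells.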
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839288#comment-15839288 ] ASF GitHub Bot commented on FLINK-2168: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934502 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.HBaseTableSchema; +import org.apache.flink.addons.hbase.HBaseTableSource; +import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter { + + public static final byte[] ROW_1 = Bytes.toBytes("row1"); + public static final byte[] ROW_2 = Bytes.toBytes("row2"); + public static final byte[] ROW_3 = Bytes.toBytes("row3"); + public static final byte[] F_1 = Bytes.toBytes("f1"); + public static final byte[] F_2 = Bytes.toBytes("f2"); + public static final byte[] Q_1 = Bytes.toBytes("q1"); + public static final byte[] Q_2 = Bytes.toBytes("q2"); + public static final byte[] Q_3 = Bytes.toBytes("q3"); --- End diff -- ya . My bad. Will remove. 
> Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839287#comment-15839287 ] ASF GitHub Bot commented on FLINK-2168: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934495 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.conf.Configuration; + +/** + * Creates a table source that helps to scan data from an HBase table + * + * Note: the colNames are specified along with a familyName and they are separated by a ':' + * For example, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name + */ +// TODO : Implement ProjectableTableSource? +public class HBaseTableSource implements BatchTableSource<Row> { --- End diff -- I am not sure. For now I think we will implement BatchTableSource only and later implement StreamTableSource? Is there any significant design expectation for a source to be StreamTableSource?
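The javadoc quoted above specifies column names as `familyName:qualifier` (e.g. `cf1:q1`). A minimal sketch of how such a name could be split into its two parts; the helper class `ColumnNameParser` is hypothetical, written only for this example, and is not part of the PR:

```java
// Hypothetical helper illustrating the "family:qualifier" naming convention
// described in the HBaseTableSource javadoc; not part of the actual PR.
public class ColumnNameParser {
    /** Splits "cf1:q1" into {"cf1", "q1"}; the first ':' is the separator. */
    public static String[] split(String colName) {
        int idx = colName.indexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException(
                "Expected <family>:<qualifier>, got: " + colName);
        }
        return new String[] { colName.substring(0, idx), colName.substring(idx + 1) };
    }
}
```

Using the first `:` as the separator keeps any further colons inside the qualifier part, which is one plausible way to handle the convention.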
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839285#comment-15839285 ] ASF GitHub Bot commented on FLINK-2168: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934446 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. 
+ private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = + new HashMap<String, List<Pair<String, TypeInformation<?>>>>(); + + // Allowed types. This may change. + // TODO : Check if the Date type should be the one in java.util or the one in java.sql + private static Class<?>[] CLASS_TYPES = { + Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class, byte[].class + }; + /** + * Allows specifying the family and qualifier name along with the data type of the qualifier for an HBase table + * + * @param family the family name + * @param qualifier the qualifier name + * @param clazz the data type of the qualifier + */ + public void addColumn(String family, String qualifier, Class<?> clazz) { + Preconditions.checkNotNull(family, "family name"); + Preconditions.checkNotNull(family, "qualifier name"); --- End diff -- Good catch.
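The "Good catch" above refers to the second `checkNotNull` validating `family` a second time instead of `qualifier`. A plain-Java sketch of the corrected validation, using `Objects.requireNonNull` in place of Flink's `Preconditions` and a simplified schema map; the class `SchemaSketch` and its shape are illustrative assumptions, not the PR's actual code:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Simplified stand-in for HBaseTableSchema showing the corrected null checks:
// the second check must validate 'qualifier', not 'family' a second time.
public class SchemaSketch {
    // family -> list of (qualifier, type) entries, mirroring the familyMap idea
    private final Map<String, List<Map.Entry<String, Class<?>>>> familyMap =
            new HashMap<String, List<Map.Entry<String, Class<?>>>>();

    public void addColumn(String family, String qualifier, Class<?> clazz) {
        Objects.requireNonNull(family, "family name");
        Objects.requireNonNull(qualifier, "qualifier name"); // the corrected check
        Objects.requireNonNull(clazz, "class type");
        familyMap.computeIfAbsent(family, f -> new ArrayList<>())
                 .add(new AbstractMap.SimpleEntry<String, Class<?>>(qualifier, clazz));
    }

    public int qualifierCount(String family) {
        List<?> entries = familyMap.get(family);
        return entries == null ? 0 : entries.size();
    }
}
```

With the corrected check, passing a null qualifier now fails fast instead of silently registering a broken column.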
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839283#comment-15839283 ] ASF GitHub Bot commented on FLINK-2168: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934406 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. 
+ private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = + new HashMap<String, List<Pair<String, TypeInformation<?>>>>(); --- End diff -- OK. In our other projects we used to qualify the generics on both sides.
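The exchange above is about repeating the full generic type on both sides of the assignment. Since Java 7 the diamond operator lets the right-hand side be inferred from the declared type; a minimal illustration (the types used here are arbitrary examples, not the PR's):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DiamondExample {
    public static Map<String, List<Integer>> makeVerbose() {
        // Pre-Java-7 style: type arguments spelled out on both sides
        return new HashMap<String, List<Integer>>();
    }

    public static Map<String, List<Integer>> makeDiamond() {
        // Java 7+ diamond operator: the compiler infers the type arguments
        return new HashMap<>();
    }
}
```

Both forms compile to the same thing; the diamond form simply removes the duplication the reviewer was pointing at.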
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934428 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. + private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = + new HashMap<String, List<Pair<String, TypeInformation<?>>>>(); + + // Allowed types. This may change. + // TODO : Check if the Date type should be the one in java.util or the one in java.sql --- End diff -- Ok. Makes sense.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
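The TODO discussed above asks whether the allowed `Date` type should be `java.util.Date` or `java.sql.Date`. One relevant fact: `java.sql.Date` extends `java.util.Date`, so an `instanceof java.util.Date` check also accepts SQL dates, which is why the choice matters for type mapping. A small illustration (class name `DateExample` is hypothetical):

```java
import java.util.Date;

public class DateExample {
    // java.sql.Date extends java.util.Date, so a SQL date passes a
    // util-Date check -- one reason the TODO matters for schema types.
    public static boolean isUtilDate(Object o) {
        return o instanceof Date;
    }

    public static boolean isSqlDate(Object o) {
        return o instanceof java.sql.Date;
    }
}
```

A schema keyed on `java.util.Date` would therefore silently admit `java.sql.Date` values as well, while the reverse is not true.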
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97927186 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.HBaseTableSchema; +import org.apache.flink.addons.hbase.HBaseTableSource; +import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter { + + public static final byte[] ROW_1 = Bytes.toBytes("row1"); + public static final byte[] ROW_2 = Bytes.toBytes("row2"); + public static final byte[] ROW_3 = Bytes.toBytes("row3"); + public static final byte[] F_1 = Bytes.toBytes("f1"); + public static final byte[] F_2 = Bytes.toBytes("f2"); + public static final byte[] Q_1 = Bytes.toBytes("q1"); + public static final byte[] Q_2 = Bytes.toBytes("q2"); + public static final byte[] Q_3 = Bytes.toBytes("q3"); + + @BeforeClass + public static void activateHBaseCluster(){ + registerHBaseMiniClusterInClasspath(); + } + + @Test + public void testHBaseTableSourceWithSingleColumnFamily() throws Exception { + // create a table with single region + MapFunction<Row, String> mapFunction = new MapFunction<Row, String>
() { + + @Override + public String map(Row value) throws Exception { + return value == null ? "null" : value.toString(); + } + }; + TableName tableName = TableName.valueOf("test"); + // no split keys + byte[][] famNames = new byte[1][]; + famNames[0] = F_1; + createTable(tableName, famNames, null); + // get the htable instance + HTable table = openTable(tableName); + List<Put> puts = new ArrayList<Put>(); + // add some data + Put put = new Put(ROW_1); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(100)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19991l)); + puts.add(put); + + put = new Put(ROW_2); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(101)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19992l)); + puts.add(put); + + put = new Put(ROW_3); + // add 3
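The test above stores an int, a String, and a long qualifier per row through HBase's `Bytes.toBytes` overloads. As a self-contained sketch of the byte encodings involved (plain JDK `ByteBuffer`, not the HBase `Bytes` class itself, which also handles further types), the same big-endian layout looks like this:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BytesSketch {
    // Big-endian 4-byte encoding, matching Bytes.toBytes(int)
    static byte[] toBytes(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    // Big-endian 8-byte encoding, matching Bytes.toBytes(long)
    static byte[] toBytes(long v) {
        return ByteBuffer.allocate(8).putLong(v).array();
    }

    // UTF-8 encoding, matching Bytes.toBytes(String)
    static byte[] toBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] q1 = toBytes(100);        // 4 bytes, 0x00 0x00 0x00 0x64
        byte[] q2 = toBytes("strvalue"); // 8 UTF-8 bytes
        byte[] q3 = toBytes(19991L);     // 8 bytes
        System.out.println(q1.length + " " + q2.length + " " + q3.length);
    }
}
```

Because these encodings carry no type tag, a reader such as HBaseTableSource must know the declared type of each qualifier to decode the cell correctly.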
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839127#comment-15839127 ] ASF GitHub Bot commented on FLINK-2168: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97927186 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java (same diff excerpt as the GitHub message above)
[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...
Github user sbcd90 commented on the issue: https://github.com/apache/flink/pull/1962 Hi @tzulitai , thanks for the updates. I'll refactor the code & will rebase the PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host
[ https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839115#comment-15839115 ] ASF GitHub Bot commented on FLINK-3857: --- Github user sbcd90 commented on the issue: https://github.com/apache/flink/pull/1962 Hi @tzulitai , thanks for the updates. I'll refactor the code & will rebase the PR. > Add reconnect attempt to Elasticsearch host > --- > > Key: FLINK-3857 > URL: https://issues.apache.org/jira/browse/FLINK-3857 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.1.0, 1.0.2 >Reporter: Fabian Hueske >Assignee: Subhobrata Dey > > Currently, the connection to the Elasticsearch host is opened in > {{ElasticsearchSink.open()}}. In case the connection is lost (maybe due to a > changed DNS entry), the sink fails. > I propose to catch the Exception for lost connections in the {{invoke()}} > method and try to re-open the connection for a configurable number of times > with a certain delay. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
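The behavior Fabian proposes above — catch the lost-connection exception in `invoke()` and retry re-opening the connection a configurable number of times with a delay — can be sketched as a generic retry helper. This is an illustrative, hypothetical helper in plain Java, not the actual ElasticsearchSink code:

```java
import java.util.concurrent.Callable;

public class RetrySketch {
    // Run the action, retrying up to maxAttempts times with a fixed delay
    // between attempts; rethrow the last failure if all attempts fail.
    static <T> T retry(Callable<T> action, int maxAttempts, long delayMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMs);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, then succeeds -- simulating a host that comes back
        // after a DNS change or brief outage.
        String result = retry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("connection lost");
            return "reconnected";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In a real sink, the retried action would re-open the Elasticsearch client and replay the pending request, with the attempt count and delay exposed as sink configuration.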
[jira] [Commented] (FLINK-5546) java.io.tmpdir setted as project build directory in surefire plugin
[ https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839107#comment-15839107 ] ASF GitHub Bot commented on FLINK-5546: --- Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3190 The "/tmp/cacheFile (Permission denied)" unit-test failure can be reproduced as follows: 1. on a Linux machine, `su - userA`, clone the Flink code, and run `mvn clean test verify` 2. on the same machine, `su - userB`, clone the Flink codebase, and run `mvn clean test verify` again. The failure occurs because the two users share the same `/tmp` directory, which has no randomized subdirectory. IMO, changing `/tmp` to `${project.build.directory}`, with a random suffix added, would resolve the problem thoroughly. > java.io.tmpdir setted as project build directory in surefire plugin > --- > > Key: FLINK-5546 > URL: https://issues.apache.org/jira/browse/FLINK-5546 > Project: Flink > Issue Type: Test >Affects Versions: 1.2.0, 1.3.0 > Environment: CentOS 7.2 >Reporter: Syinchwun Leo >Assignee: shijinkui > > When multiple Linux users run tests at the same time, the flink-runtime module may > fail. User A creates /tmp/cacheFile, and User B will have no permission to > access the folder. > Failed tests: > FileCacheDeleteValidationTest.setup:79 Error initializing the test: > /tmp/cacheFile (Permission denied) > Tests in error: > IOManagerTest.channelEnumerator:54 » Runtime Could not create storage > director... > Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3190 The "/tmp/cacheFile (Permission denied)" unit-test failure can be reproduced as follows: 1. on a Linux machine, `su - userA`, clone the Flink code, and run `mvn clean test verify` 2. on the same machine, `su - userB`, clone the Flink codebase, and run `mvn clean test verify` again. The failure occurs because the two users share the same `/tmp` directory, which has no randomized subdirectory. IMO, changing `/tmp` to `${project.build.directory}`, with a random suffix added, would resolve the problem thoroughly.
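The fix direction described above — giving each build a randomized temporary directory instead of a fixed shared path under `/tmp` — is exactly what the JDK's `Files.createTempDirectory` provides. An illustrative sketch (plain JDK, not the actual surefire change under review):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class TempDirSketch {
    public static void main(String[] args) throws IOException {
        // Each call yields a distinct, randomly suffixed directory under
        // java.io.tmpdir, so concurrent users or builds never collide on a
        // fixed path like /tmp/cacheFile.
        Path dirA = Files.createTempDirectory("flink-test-");
        Path dirB = Files.createTempDirectory("flink-test-");
        System.out.println(!dirA.equals(dirB) && Files.isDirectory(dirA));
        // clean up the empty directories
        Files.delete(dirA);
        Files.delete(dirB);
    }
}
```

Pointing `java.io.tmpdir` at `${project.build.directory}` additionally keeps test scratch files inside the per-checkout build tree, so `mvn clean` removes them.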
[jira] [Commented] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools
[ https://issues.apache.org/jira/browse/FLINK-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839039#comment-15839039 ] ASF GitHub Bot commented on FLINK-5635: --- Github user jgrier commented on the issue: https://github.com/apache/flink/pull/3205 Thanks @ex00. I missed that. Actually there is a new Flink on Docker stub in the top-level docs and I was planning to document these scripts there, however we may want to keep this here and just link to it. Anyway, I will update the docs. Thanks. > Improve Docker tooling to make it easier to build images and launch Flink via > Docker tools > -- > > Key: FLINK-5635 > URL: https://issues.apache.org/jira/browse/FLINK-5635 > Project: Flink > Issue Type: Improvement > Components: Docker >Affects Versions: 1.2.0 >Reporter: Jamie Grier >Assignee: Jamie Grier > Fix For: 1.2.0 > > > This is a bit of a catch-all ticket for general improvements to the Flink on > Docker experience. > Things to improve: > - Make it possible to build a Docker image from your own flink-dist > directory as well as official releases. > - Make it possible to override the image name so a user can more easily > publish these images to their Docker repository > - Provide scripts that show how to properly run on Docker Swarm or similar > environments with overlay networking (Kubernetes) without using host > networking. > - Log to stdout rather than to files. > - Work properly with docker-compose for local deployment as well as > production deployments (Swarm/k8s) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3205: [FLINK-5635] Improve Docker tooling to make it easier to ...
Github user jgrier commented on the issue: https://github.com/apache/flink/pull/3205 Thanks @ex00. I missed that. Actually there is a new Flink on Docker stub in the top-level docs and I was planning to document these scripts there, however we may want to keep this here and just link to it. Anyway, I will update the docs. Thanks.
[jira] [Commented] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools
[ https://issues.apache.org/jira/browse/FLINK-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839036#comment-15839036 ] Jamie Grier commented on FLINK-5635: [~greghogan] I actually wasn't aware of [FLINK-4326]. However there does seem to be some overlap. There is also [FLINK-5634] and an associated PR which specifically addresses enabling logging to stdout rather than to a file. However, neither of these two issues addresses running Flink processes in the foreground and avoiding forking as in [FLINK-4326]. I think this remains a separate issue that may need to be addressed. My main motivation in both of these JIRA issues and associated PRs was in providing a better Flink on Docker experience and providing some example scripts of how to run Flink properly in container-based environments. I would also like to get some "official" Flink Docker images published once we're satisfied with them. > Improve Docker tooling to make it easier to build images and launch Flink via > Docker tools > -- > > Key: FLINK-5635 > URL: https://issues.apache.org/jira/browse/FLINK-5635 > Project: Flink > Issue Type: Improvement > Components: Docker >Affects Versions: 1.2.0 >Reporter: Jamie Grier >Assignee: Jamie Grier > Fix For: 1.2.0 > > > This is a bit of a catch-all ticket for general improvements to the Flink on > Docker experience. > Things to improve: > - Make it possible to build a Docker image from your own flink-dist > directory as well as official releases. > - Make it possible to override the image name so a user can more easily > publish these images to their Docker repository > - Provide scripts that show how to properly run on Docker Swarm or similar > environments with overlay networking (Kubernetes) without using host > networking. > - Log to stdout rather than to files. > - Work properly with docker-compose for local deployment as well as > production deployments (Swarm/k8s) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5634) Flink should not always redirect stdout to a file.
[ https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier reassigned FLINK-5634: -- Assignee: Jamie Grier > Flink should not always redirect stdout to a file. > -- > > Key: FLINK-5634 > URL: https://issues.apache.org/jira/browse/FLINK-5634 > Project: Flink > Issue Type: Bug > Components: Docker >Affects Versions: 1.2.0 >Reporter: Jamie Grier >Assignee: Jamie Grier > Fix For: 1.2.0 > > > Flink always redirects stdout to a file. While often convenient this isn't > always what people want. The most obvious case of this is a Docker > deployment. > It should be possible to have Flink log to stdout. > Here is a PR for this: https://github.com/apache/flink/pull/3204 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools
[ https://issues.apache.org/jira/browse/FLINK-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier reassigned FLINK-5635: -- Assignee: Jamie Grier > Improve Docker tooling to make it easier to build images and launch Flink via > Docker tools > -- > > Key: FLINK-5635 > URL: https://issues.apache.org/jira/browse/FLINK-5635 > Project: Flink > Issue Type: Improvement > Components: Docker >Affects Versions: 1.2.0 >Reporter: Jamie Grier >Assignee: Jamie Grier > Fix For: 1.2.0 > > > This is a bit of a catch-all ticket for general improvements to the Flink on > Docker experience. > Things to improve: > - Make it possible to build a Docker image from your own flink-dist > directory as well as official releases. > - Make it possible to override the image name so a user can more easily > publish these images to their Docker repository > - Provide scripts that show how to properly run on Docker Swarm or similar > environments with overlay networking (Kubernetes) without using host > networking. > - Log to stdout rather than to files. > - Work properly with docker-compose for local deployment as well as > production deployments (Swarm/k8s) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5640) configure the explicit Unit Test file suffix
[ https://issues.apache.org/jira/browse/FLINK-5640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-5640: - Affects Version/s: 1.2.1 > configure the explicit Unit Test file suffix > > > Key: FLINK-5640 > URL: https://issues.apache.org/jira/browse/FLINK-5640 > Project: Flink > Issue Type: Test > Components: Tests >Affects Versions: 1.2.1 >Reporter: shijinkui >Assignee: shijinkui > > There are four types of test files: *ITCase.java, *Test.java, *ITSuite.scala, and *Suite.scala. > Files whose names end with "IT.java" are integration tests; files whose names end with "Test.java" are unit tests. > The Surefire plugin's default-test execution should explicitly declare that "*Test.*" is the Java unit-test pattern. > The test file statistics are below: > * Suite total: 10 > * ITCase total: 378 > * Test total: 1008 > * ITSuite total: 14 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
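An explicit Surefire `default-test` execution along the lines the issue describes might look like the following POM fragment. This is an illustrative sketch of the pattern configuration, not the exact change proposed in the issue:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <executions>
    <execution>
      <id>default-test</id>
      <goals>
        <goal>test</goal>
      </goals>
      <configuration>
        <includes>
          <!-- unit tests only; *ITCase / *ITSuite files run in the
               integration-test phase instead -->
          <include>**/*Test.*</include>
        </includes>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Declaring the include pattern explicitly makes the unit-test/integration-test split visible in the build rather than relying on Surefire's implicit defaults.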
[jira] [Updated] (FLINK-5649) KryoException when starting job from Flink 1.2.0 rc0 savepoint
[ https://issues.apache.org/jira/browse/FLINK-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Scott Kidder updated FLINK-5649: Description: I tried to submit a job using a savepoint taken with Flink 1.2.0 rc0 and encountered the following error on Flink 1.2.0 rc2, leading to the job being cancelled: {noformat} java.lang.IllegalStateException: Could not initialize keyed state backend. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:663) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 92 at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243) at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:798) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277) ... 6 more {noformat} Submitting the same job without a savepoint works fine (except that there's no state, of course). Might be related to FLINK-5484 pull-request https://github.com/apache/flink/pull/3152 was: I tried to submit a job using a savepoint taken with Flink 1.2.0 rc0 and encountered the following error, leading to the job being cancelled: {noformat} java.lang.IllegalStateException: Could not initialize keyed state backend. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:663) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 92 at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370) at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243) at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:798) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277) ... 6 more {noformat} Submitting the same job without a savepoint works fine (except that there's no state, of course). Might be related to FLINK-5484 pull-request
[jira] [Created] (FLINK-5649) KryoException when starting job from Flink 1.2.0 rc0 savepoint
Scott Kidder created FLINK-5649: --- Summary: KryoException when starting job from Flink 1.2.0 rc0 savepoint Key: FLINK-5649 URL: https://issues.apache.org/jira/browse/FLINK-5649 Project: Flink Issue Type: Bug Affects Versions: 1.2.0 Reporter: Scott Kidder I tried to submit a job using a savepoint taken with Flink 1.2.0 rc0 and encountered the following error, leading to the job being cancelled: {noformat} java.lang.IllegalStateException: Could not initialize keyed state backend. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:663) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 92 at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340) at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243) at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:798) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277) ... 6 more {noformat} Submitting the same job without a savepoint works fine (except that there's no state, of course). Might be related to FLINK-5484 pull-request https://github.com/apache/flink/pull/3152 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
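The "Encountered unregistered class ID: 92" failure above is characteristic of how Kryo resolves classes: explicitly registered classes are identified in the serialized stream by the numeric IDs assigned at registration time, so if the restoring job registers classes in a different order, or registers fewer of them, than the job that took the savepoint, an ID in the stream no longer maps to any class. A toy illustration of that ID-table mismatch (plain Java map lookup, not Kryo's actual implementation):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RegistrationSketch {
    // Assign sequential IDs in registration order, as Kryo does for
    // explicitly registered classes.
    static Map<Integer, String> register(List<String> classes) {
        Map<Integer, String> byId = new HashMap<>();
        int nextId = 0;
        for (String c : classes) {
            byId.put(nextId++, c);
        }
        return byId;
    }

    public static void main(String[] args) {
        // The writing job registered three classes; the savepoint stream
        // records class ID 2 for some value.
        Map<Integer, String> writer = register(List.of("A", "B", "C"));
        // The restoring job registered only two -- ID 2 is unresolvable.
        Map<Integer, String> reader = register(List.of("A", "B"));
        int idInStream = 2;
        System.out.println(writer.get(idInStream) + " " + reader.get(idInStream));
    }
}
```

This is why restoring a savepoint requires the same serializer registrations (and registration order) as the job that produced it; any change to the registered types between rc0 and rc2 could surface exactly this way.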
[jira] [Commented] (FLINK-5648) Task Manager ID missing from logs link in Job Manager UI
[ https://issues.apache.org/jira/browse/FLINK-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838860#comment-15838860 ] Scott Kidder commented on FLINK-5648: - May have been introduced in FLINK-5382 pull-request https://github.com/apache/flink/pull/3055 > Task Manager ID missing from logs link in Job Manager UI > > > Key: FLINK-5648 > URL: https://issues.apache.org/jira/browse/FLINK-5648 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.2.0 >Reporter: Scott Kidder > > This appears to be a regression introduced in Flink 1.2.0 rc2. I've been > using 1.2.0 rc0 without this issue being present. > The link from the Job Manager to download logs for a specific Task Manager > instance does not include the ID of the Task Manager. The following > screenshot shows the link: > !http://imgur.com/dLhxALT.png! > The following exception appears in the Job Manager logs after trying to > retrieve the logs for a Task Manager (without the ID of the Task Manager > given): > {noformat} > 2017-01-25 23:34:44,915 ERROR > org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler - > Fetching TaskManager log failed. 
> java.lang.IllegalArgumentException: Argument bytes must by an array of 16 > bytes > at org.apache.flink.util.AbstractID.<init>(AbstractID.java:63) > at > org.apache.flink.runtime.instance.InstanceID.<init>(InstanceID.java:33) > at > org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler.respondAsLeader(TaskManagerLogHandler.java:170) > at > org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:90) > at > org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62) > at > io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57) > at > io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105) > at > org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at >
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) > at > io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
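The IllegalArgumentException at the top of the trace comes from an ID constructor that requires exactly 16 bytes (two 64-bit halves); with the Task Manager ID missing from the logs URL, a too-short byte array presumably reaches it. A minimal sketch of that kind of length guard (illustrative, not Flink's actual AbstractID/InstanceID code):

```java
import java.nio.ByteBuffer;

public class IdSketch {
    static final int SIZE = 16; // two 64-bit halves

    final long lower;
    final long upper;

    // Reject anything that is not exactly 16 bytes, mirroring the guard
    // reported in the stack trace above.
    IdSketch(byte[] bytes) {
        if (bytes == null || bytes.length != SIZE) {
            throw new IllegalArgumentException("Argument bytes must be an array of 16 bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        this.lower = buf.getLong();
        this.upper = buf.getLong();
    }

    public static void main(String[] args) {
        try {
            new IdSketch(new byte[0]); // what a missing ID would decode to
            System.out.println("accepted");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The bug is therefore upstream of the guard: the UI must include the Task Manager ID in the link so the handler receives a full 16-byte (32 hex character) identifier.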
[jira] [Updated] (FLINK-5648) Task Manager ID missing from logs link in Job Manager UI
[ https://issues.apache.org/jira/browse/FLINK-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Scott Kidder updated FLINK-5648: Description: This appears to be a regression introduced in Flink 1.2.0 rc2. I've been using 1.2.0 rc0 without this issue being present. The link from the Job Manager to download logs for a specific Task Manager instance does not include the ID of the Task Manager. The following screenshot shows the link: !http://imgur.com/dLhxALT.png! The following exception appears in the Job Manager logs after trying to retrieve the logs for a Task Manager (without the ID of the Task Manager given): {noformat} 2017-01-25 23:34:44,915 ERROR org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler - Fetching TaskManager log failed. java.lang.IllegalArgumentException: Argument bytes must by an array of 16 bytes at org.apache.flink.util.AbstractID.<init>(AbstractID.java:63) at org.apache.flink.runtime.instance.InstanceID.<init>(InstanceID.java:33) at org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler.respondAsLeader(TaskManagerLogHandler.java:170) at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:90) at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62) at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57) at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20) at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105) at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at 
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) at java.lang.Thread.run(Thread.java:745) {noformat}
[jira] [Created] (FLINK-5648) Task Manager ID missing from logs link in Job Manager UI
Scott Kidder created FLINK-5648: --- Summary: Task Manager ID missing from logs link in Job Manager UI Key: FLINK-5648 URL: https://issues.apache.org/jira/browse/FLINK-5648 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.2.0 Reporter: Scott Kidder This appears to be a regression introduced in Flink 1.2.0 rc2. I've been using 1.2.0 rc0 without this issue being present. The link from the Job Manager to download logs for a specific Task Manager instance does not include the ID of the Task Manager. The following screenshot shows the link: !http://imgur.com/dLhxALT.png! The following exception appears in the Job Manager logs after trying to retrieve the logs for a Task Manager (without the ID of the Task Manager given): {{ 2017-01-25 23:34:44,915 ERROR org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler - Fetching TaskManager log failed. java.lang.IllegalArgumentException: Argument bytes must by an array of 16 bytes at org.apache.flink.util.AbstractID.(AbstractID.java:63) at org.apache.flink.runtime.instance.InstanceID.(InstanceID.java:33) at org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler.respondAsLeader(TaskManagerLogHandler.java:170) at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:90) at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62) at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57) at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105) at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) at java.lang.Thread.run(Thread.java:745) }} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
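The `IllegalArgumentException` in the stack trace comes from `AbstractID`'s precondition that an ID deserializes to exactly 16 bytes; with the Task Manager ID missing from the URL, the handler parses an empty path segment. A minimal, self-contained sketch of that failure mode (the class and method names here are illustrative, not Flink's actual implementation):

```java
public class TaskManagerIdCheck {

    // Hypothetical simplification of the 16-byte precondition enforced by
    // Flink's AbstractID(byte[]) constructor.
    static byte[] parseId(String hex) {
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        if (bytes.length != 16) {
            // Same message (typo included) as the stack trace above.
            throw new IllegalArgumentException("Argument bytes must by an array of 16 bytes");
        }
        return bytes;
    }

    public static void main(String[] args) {
        // A full 32-character hex ID yields the required 16 bytes.
        System.out.println(parseId("0123456789abcdef0123456789abcdef").length);
        // An empty path segment (the ID missing from the broken link) does not.
        try {
            parseId("");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This is why the fix belongs in the web UI link generation rather than the handler: any non-32-character segment reaching `InstanceID` will fail the same check.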
[GitHub] flink issue #3189: [FLINK-5608] [webfrontend] Cancel button stays visible in...
Github user rehevkor5 commented on the issue: https://github.com/apache/flink/pull/3189 Thanks @uce that's helpful! Didn't think of putting screenshots in the PR. @zentol The global job state counts didn't seem like the most important thing to show when the browser is narrow, given that this view is about a particular job. So, I added these CSS classes to it ".hidden-xs .hidden-sm .hidden-md" the behavior of which is described on http://getbootstrap.com/css/#responsive-utilities I am not sure what width @uce used for the "wide" screenshots, but it probably was less than 1200px, otherwise it would show up. I can remove the ".hidden-md" class if you want it to show up in the 992px to 1200px range, too... however it does make it more likely for the other more job-specific elements (start/end, duration) to wrap & get lost. Perhaps it would be a good idea to reorder the elements? Put start/end & duration first, and overall job stats last so that it's the first one to wrap? Let me know if you have a preference. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5608) Cancel button not always visible
[ https://issues.apache.org/jira/browse/FLINK-5608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838835#comment-15838835 ] ASF GitHub Bot commented on FLINK-5608: --- Github user rehevkor5 commented on the issue: https://github.com/apache/flink/pull/3189 Thanks @uce that's helpful! Didn't think of putting screenshots in the PR. @zentol The global job state counts didn't seem like the most important thing to show when the browser is narrow, given that this view is about a particular job. So, I added these CSS classes to it ".hidden-xs .hidden-sm .hidden-md" the behavior of which is described on http://getbootstrap.com/css/#responsive-utilities I am not sure what width @uce used for the "wide" screenshots, but it probably was less than 1200px, otherwise it would show up. I can remove the ".hidden-md" class if you want it to show up in the 992px to 1200px range, too... however it does make it more likely for the other more job-specific elements (start/end, duration) to wrap & get lost. Perhaps it would be a good idea to reorder the elements? Put start/end & duration first, and overall job stats last so that it's the first one to wrap? Let me know if you have a preference. > Cancel button not always visible > > > Key: FLINK-5608 > URL: https://issues.apache.org/jira/browse/FLINK-5608 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.1.4 >Reporter: Shannon Carey >Assignee: Shannon Carey >Priority: Minor > > When the window is not wide enough, or when the job name is too long, the > "Cancel" button in the Job view of the web UI is not visible because it is > the first element that gets wrapped down and gets covered by the secondary > navbar (the tabs). This causes us to often need to resize the browser wider > than our monitor in order to use the cancel button. 
> In general, the use of Bootstrap's ".navbar-fixed-top" is problematic if the > content may wrap, especially if the content's horizontal width is not known & > fixed. The ".navbar-fixed-top" uses fixed positioning, and therefore any > unexpected change in height will result in overlap with the rest of the > normal-flow content in the page. The Bootstrap docs explain this in their > "Overflowing content" callout. > I am submitting a PR which does not attempt to resolve all issues with the > fixed navbar approach, but attempts to improve the situation by using less > horizontal space and by altering the layout approach of the Cancel button. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97898531 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. + private final Map>> familyMap = + new HashMap >>(); --- End diff -- could you just put `<>` here instead of >>? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
---
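The generic type arguments of `familyMap` were stripped by the mail archive, but the review suggestion is Java 7's diamond operator: when the left-hand side already spells out the type arguments, the right-hand side can use `<>` and let the compiler infer them. A minimal sketch, assuming a nested `Map`/`List` field like the one under review:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DiamondExample {

    public static Map<String, List<String>> makeFamilyMap() {
        // Java 7+ diamond operator: type arguments are inferred from the
        // target type, instead of repeating them as in the pre-Java-7 form
        // `new HashMap<String, List<String>>()`.
        Map<String, List<String>> familyMap = new HashMap<>();
        familyMap.put("f1", new ArrayList<>());
        return familyMap;
    }

    public static void main(String[] args) {
        System.out.println(makeFamilyMap()); // {f1=[]}
    }
}
```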
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97903962 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.HBaseTableSchema; +import org.apache.flink.addons.hbase.HBaseTableSource; +import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter { + + public static final byte[] ROW_1 = Bytes.toBytes("row1"); + public static final byte[] ROW_2 = Bytes.toBytes("row2"); + public static final byte[] ROW_3 = Bytes.toBytes("row3"); + public static final byte[] F_1 = Bytes.toBytes("f1"); + public static final byte[] F_2 = Bytes.toBytes("f2"); + public static final byte[] Q_1 = Bytes.toBytes("q1"); + public static final byte[] Q_2 = Bytes.toBytes("q2"); + public static final byte[] Q_3 = Bytes.toBytes("q3"); + + @BeforeClass + public static void activateHBaseCluster(){ + registerHBaseMiniClusterInClasspath(); + } + + @Test + public void testHBaseTableSourceWithSingleColumnFamily() throws Exception { + // create a table with single region + MapFunctionmapFunction = new MapFunction
() { + + @Override + public String map(Row value) throws Exception { + return value == null ? "null" : value.toString(); + } + }; + TableName tableName = TableName.valueOf("test"); + // no split keys + byte[][] famNames = new byte[1][]; + famNames[0] = F_1; + createTable(tableName, famNames, null); + // get the htable instance + HTable table = openTable(tableName); + List puts = new ArrayList(); + // add some data + Put put = new Put(ROW_1); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(100)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19991l)); + puts.add(put); + + put = new Put(ROW_2); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(101)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19992l)); + puts.add(put); + + put = new Put(ROW_3); + // add 3
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838814#comment-15838814 ] ASF GitHub Bot commented on FLINK-2168: --- Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97904966 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.HBaseTableSchema; +import org.apache.flink.addons.hbase.HBaseTableSource; +import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter { + + public static final byte[] ROW_1 = Bytes.toBytes("row1"); + public static final byte[] ROW_2 = Bytes.toBytes("row2"); + public static final byte[] ROW_3 = Bytes.toBytes("row3"); + public static final byte[] F_1 = Bytes.toBytes("f1"); + public static final byte[] F_2 = Bytes.toBytes("f2"); + public static final byte[] Q_1 = Bytes.toBytes("q1"); + public static final byte[] Q_2 = Bytes.toBytes("q2"); + public static final byte[] Q_3 = Bytes.toBytes("q3"); + + @BeforeClass + public static void activateHBaseCluster(){ + registerHBaseMiniClusterInClasspath(); + } + + @Test + public void testHBaseTableSourceWithSingleColumnFamily() throws Exception { + // create a table with single region + MapFunctionmapFunction = new MapFunction
() { + + @Override + public String map(Row value) throws Exception { + return value == null ? "null" : value.toString(); + } + }; + TableName tableName = TableName.valueOf("test"); + // no split keys + byte[][] famNames = new byte[1][]; + famNames[0] = F_1; + createTable(tableName, famNames, null); + // get the htable instance + HTable table = openTable(tableName); + List puts = new ArrayList(); + // add some data + Put put = new Put(ROW_1); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(100)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19991l)); + puts.add(put); + + put = new Put(ROW_2); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(101)); + //2nd qual is String +
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97897602 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. + private final Map>> familyMap = + new HashMap >>(); + + // Allowed types. This may change. 
+ // TODO : Check if the Date type should be the one in java.util or the one in java.sql + private static Class[] CLASS_TYPES = { + Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class, byte[].class + }; + /** +* Allows specifying the family and qualifier name along with the data type of the qualifier for an HBase table +* +* @param family the family name +* @param qualifier the qualifier name +* @param clazz the data type of the qualifier +*/ + public void addColumn(String family, String qualifier, Class clazz) { + Preconditions.checkNotNull(family, "family name"); + Preconditions.checkNotNull(family, "qualifier name"); --- End diff -- must be qualifier ---
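The review points out a copy-paste bug: the second `checkNotNull` re-validates `family` instead of `qualifier`, so a null qualifier would slip through. A sketch of the corrected checks, using `java.util.Objects.requireNonNull` as a stand-in for Flink's `Preconditions` so it compiles without Flink on the classpath:

```java
import java.util.Objects;

public class AddColumnCheck {

    // Corrected version of the null checks flagged in the review above.
    public static void addColumn(String family, String qualifier, Class<?> clazz) {
        Objects.requireNonNull(family, "family name");
        // The reviewed patch re-checked `family` here; the second check must
        // validate `qualifier` instead.
        Objects.requireNonNull(qualifier, "qualifier name");
        Objects.requireNonNull(clazz, "class type of the qualifier");
    }

    public static void main(String[] args) {
        addColumn("f1", "q1", Integer.class); // valid arguments pass silently
        try {
            addColumn("f1", null, Integer.class);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // "qualifier name"
        }
    }
}
```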
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97903708 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.HBaseTableSchema; +import org.apache.flink.addons.hbase.HBaseTableSource; +import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter { + + public static final byte[] ROW_1 = Bytes.toBytes("row1"); + public static final byte[] ROW_2 = Bytes.toBytes("row2"); + public static final byte[] ROW_3 = Bytes.toBytes("row3"); + public static final byte[] F_1 = Bytes.toBytes("f1"); + public static final byte[] F_2 = Bytes.toBytes("f2"); + public static final byte[] Q_1 = Bytes.toBytes("q1"); + public static final byte[] Q_2 = Bytes.toBytes("q2"); + public static final byte[] Q_3 = Bytes.toBytes("q3"); --- End diff -- why public? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838813#comment-15838813 ] ASF GitHub Bot commented on FLINK-2168: --- Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97900030 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. 
+ private final Map>> familyMap = + new HashMap >>(); + + // Allowed types. This may change. + // TODO : Check if the Date type should be the one in java.util or the one in java.sql --- End diff -- I think, if it's used only in table api would be good java.sql.Date cause calcite use sql type of date > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838818#comment-15838818 ] ASF GitHub Bot commented on FLINK-2168: --- Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97901774 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.conf.Configuration; + +/** + * Creates a table source that helps to scan data from an hbase table + * + * Note : the colNames are specified along with a familyName and they are seperated by a ':' + * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name + */ +// TODO : Implement ProjectableTableSource? +public class HBaseTableSource implements BatchTableSource { --- End diff -- will be `StreamTableSource` okay for HBase source? I think it can be implemented same as batch `getDataSet` > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838817#comment-15838817 ] ASF GitHub Bot commented on FLINK-2168: --- Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97898531 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. 
+ private final Map<String, List<Pair<byte[], TypeInformation<?>>>> familyMap = + new HashMap<String, List<Pair<byte[], TypeInformation<?>>>>(); --- End diff -- could you just put `<>` here instead of >>? > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
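The reviewer's diamond-operator suggestion can be shown with a self-contained sketch. In the snippet below, `Map.Entry` stands in for HBase's `Pair` and `Class<?>` for Flink's `TypeInformation<?>` so it compiles without Flink or HBase on the classpath; the map has the same shape as the `familyMap` in the patch.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DiamondExample {

    // Builds a map shaped like the familyMap in the patch:
    // family name -> list of (qualifier bytes, qualifier type).
    static Map<String, List<Map.Entry<byte[], Class<?>>>> buildFamilyMap() {
        // Pre-Java-7 form, as written in the patch: type arguments repeated on the right.
        Map<String, List<Map.Entry<byte[], Class<?>>>> verbose =
            new HashMap<String, List<Map.Entry<byte[], Class<?>>>>();

        // Diamond form suggested in the review: the compiler infers the arguments.
        Map<String, List<Map.Entry<byte[], Class<?>>>> familyMap = new HashMap<>();

        List<Map.Entry<byte[], Class<?>>> qualifiers = new ArrayList<>();
        qualifiers.add(new SimpleEntry<byte[], Class<?>>("q1".getBytes(), Integer.class));
        familyMap.put("cf1", qualifiers);
        return familyMap;
    }

    public static void main(String[] args) {
        // prints "Integer"
        System.out.println(buildFamilyMap().get("cf1").get(0).getValue().getSimpleName());
    }
}
```

Both declarations are equivalent at runtime; the diamond form simply avoids repeating the nested type arguments.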
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838816#comment-15838816 ] ASF GitHub Bot commented on FLINK-2168: --- Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97903708 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.HBaseTableSchema; +import org.apache.flink.addons.hbase.HBaseTableSource; +import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter { + + public static final byte[] ROW_1 = Bytes.toBytes("row1"); + public static final byte[] ROW_2 = Bytes.toBytes("row2"); + public static final byte[] ROW_3 = Bytes.toBytes("row3"); + public static final byte[] F_1 = Bytes.toBytes("f1"); + public static final byte[] F_2 = Bytes.toBytes("f2"); + public static final byte[] Q_1 = Bytes.toBytes("q1"); + public static final byte[] Q_2 = Bytes.toBytes("q2"); + public static final byte[] Q_3 = Bytes.toBytes("q3"); --- End diff -- why public? 
> Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838815#comment-15838815 ] ASF GitHub Bot commented on FLINK-2168: --- Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97903962 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.HBaseTableSchema; +import org.apache.flink.addons.hbase.HBaseTableSource; +import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter { + + public static final byte[] ROW_1 = Bytes.toBytes("row1"); + public static final byte[] ROW_2 = Bytes.toBytes("row2"); + public static final byte[] ROW_3 = Bytes.toBytes("row3"); + public static final byte[] F_1 = Bytes.toBytes("f1"); + public static final byte[] F_2 = Bytes.toBytes("f2"); + public static final byte[] Q_1 = Bytes.toBytes("q1"); + public static final byte[] Q_2 = Bytes.toBytes("q2"); + public static final byte[] Q_3 = Bytes.toBytes("q3"); + + @BeforeClass + public static void activateHBaseCluster(){ + registerHBaseMiniClusterInClasspath(); + } + + @Test + public void testHBaseTableSourceWithSingleColumnFamily() throws Exception { + // create a table with single region + MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+ + @Override + public String map(Row value) throws Exception { + return value == null ? "null" : value.toString(); + } + }; + TableName tableName = TableName.valueOf("test"); + // no split keys + byte[][] famNames = new byte[1][]; + famNames[0] = F_1; + createTable(tableName, famNames, null); + // get the htable instance + HTable table = openTable(tableName); + List<Put> puts = new ArrayList<Put>(); + // add some data + Put put = new Put(ROW_1); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(100)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19991l)); + puts.add(put); + + put = new Put(ROW_2); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(101)); + //2nd qual is String +
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838819#comment-15838819 ] ASF GitHub Bot commented on FLINK-2168: --- Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97897602 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. 
+ private final Map<String, List<Pair<byte[], TypeInformation<?>>>> familyMap = + new HashMap<String, List<Pair<byte[], TypeInformation<?>>>>(); + + // Allowed types. This may change. + // TODO : Check if the Date type should be the one in java.util or the one in java.sql + private static Class[] CLASS_TYPES = { + Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class, byte[].class + }; + /** +* Allows specifying the family and qualifier name along with the data type of the qualifier for an HBase table +* +* @param family the family name +* @param qualifier the qualifier name +* @param clazz the data type of the qualifier +*/ + public void addColumn(String family, String qualifier, Class clazz) { + Preconditions.checkNotNull(family, "family name"); + Preconditions.checkNotNull(family, "qualifier name"); --- End diff -- must be qualifier > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
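The "must be qualifier" comment flags a copy-paste bug: the second `checkNotNull` validates `family` twice, so a null `qualifier` is never rejected. A corrected sketch of the check, with `java.util.Objects.requireNonNull` standing in for Flink's `Preconditions.checkNotNull` so it runs without Flink on the classpath (the `validate` helper name is illustrative, not part of the patch):

```java
import java.util.Objects;

public class AddColumnCheck {

    // Corrected null checks: the second check must be on `qualifier`, not `family`.
    static String validate(String family, String qualifier) {
        Objects.requireNonNull(family, "family name");
        Objects.requireNonNull(qualifier, "qualifier name"); // the patch checked `family` twice
        return family + ":" + qualifier;
    }

    public static void main(String[] args) {
        System.out.println(validate("cf1", "q1")); // cf1:q1
        try {
            validate("cf1", null); // now correctly rejected
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage()); // rejected: qualifier name
        }
    }
}
```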
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97901774 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.conf.Configuration; + +/** + * Creates a table source that helps to scan data from an HBase table + * + * Note : the colNames are specified along with a familyName and they are separated by a ':' + * E.g., cf1:q1 - where cf1 is the familyName and q1 is the qualifier name + */ +// TODO : Implement ProjectableTableSource? +public class HBaseTableSource implements BatchTableSource<Row> { --- End diff -- Would `StreamTableSource` be okay for an HBase source?
I think it can be implemented the same as the batch `getDataSet` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
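The `family:qualifier` column-naming convention described in the `HBaseTableSource` javadoc above can be parsed in a few lines of plain Java. The helper below is illustrative only (the patch itself may split these names differently); it shows the convention's intent: everything before the first ':' is the column family, everything after is the qualifier.

```java
public class ColumnNameParser {

    // Splits a column name of the form "cf1:q1" into [family, qualifier],
    // per the convention described in the HBaseTableSource javadoc.
    static String[] parse(String colName) {
        int sep = colName.indexOf(':');
        if (sep < 0) {
            throw new IllegalArgumentException(
                "Column name must be of the form family:qualifier, got: " + colName);
        }
        return new String[] { colName.substring(0, sep), colName.substring(sep + 1) };
    }

    public static void main(String[] args) {
        String[] parts = parse("cf1:q1");
        System.out.println(parts[0] + " / " + parts[1]); // cf1 / q1
    }
}
```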
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97904966 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.HBaseTableSchema; +import org.apache.flink.addons.hbase.HBaseTableSource; +import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter { + + public static final byte[] ROW_1 = Bytes.toBytes("row1"); + public static final byte[] ROW_2 = Bytes.toBytes("row2"); + public static final byte[] ROW_3 = Bytes.toBytes("row3"); + public static final byte[] F_1 = Bytes.toBytes("f1"); + public static final byte[] F_2 = Bytes.toBytes("f2"); + public static final byte[] Q_1 = Bytes.toBytes("q1"); + public static final byte[] Q_2 = Bytes.toBytes("q2"); + public static final byte[] Q_3 = Bytes.toBytes("q3"); + + @BeforeClass + public static void activateHBaseCluster(){ + registerHBaseMiniClusterInClasspath(); + } + + @Test + public void testHBaseTableSourceWithSingleColumnFamily() throws Exception { + // create a table with single region + MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+ + @Override + public String map(Row value) throws Exception { + return value == null ? "null" : value.toString(); + } + }; + TableName tableName = TableName.valueOf("test"); + // no split keys + byte[][] famNames = new byte[1][]; + famNames[0] = F_1; + createTable(tableName, famNames, null); + // get the htable instance + HTable table = openTable(tableName); + List<Put> puts = new ArrayList<Put>(); + // add some data + Put put = new Put(ROW_1); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(100)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19991l)); + puts.add(put); + + put = new Put(ROW_2); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(101)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19992l)); + puts.add(put); + + put = new Put(ROW_3); + // add 3
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97900030 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. + private final Map<String, List<Pair<byte[], TypeInformation<?>>>> familyMap = + new HashMap<String, List<Pair<byte[], TypeInformation<?>>>>(); + + // Allowed types. This may change. 
+ // TODO : Check if the Date type should be the one in java.util or the one in java.sql --- End diff -- I think if it's used only in the Table API, java.sql.Date would be good, since Calcite uses the SQL date type. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
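The reviewer's point is that `java.sql.Date` models the SQL DATE type (a date without time-of-day) that Calcite works with, while still subclassing `java.util.Date`. A small standalone illustration of the distinction:

```java
import java.sql.Date;

public class SqlDateExample {

    // java.sql.Date normalizes to a date-only value whose toString() follows
    // the SQL DATE literal format, matching what Calcite expects.
    static Date parse(String isoDate) {
        return Date.valueOf(isoDate); // expects yyyy-[m]m-[d]d
    }

    public static void main(String[] args) {
        Date d = parse("2017-01-25");
        System.out.println(d);                           // 2017-01-25
        System.out.println(d instanceof java.util.Date); // true: still a java.util.Date
    }
}
```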
[jira] [Commented] (FLINK-5612) GlobPathFilter not-serializable exception
[ https://issues.apache.org/jira/browse/FLINK-5612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838707#comment-15838707 ] ASF GitHub Bot commented on FLINK-5612: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3206 Hi @StephanEwen, @zentol Thank you for your comments. I've updated the PR accordingly. > GlobPathFilter not-serializable exception > - > > Key: FLINK-5612 > URL: https://issues.apache.org/jira/browse/FLINK-5612 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.2.0, 1.3.0 >Reporter: Chesnay Schepler >Assignee: Ivan Mushketyk >Priority: Blocker > > A user reported on the mailing list a non-serializable exception when using > the GlobFilePathFilter. > It appears that the PathMatchers are all created as anonymous inner classes > and thus contain a reference to the encapsulating, non-serializable > FileSystem class. > We can fix this by moving the Matcher instantiation into filterPath(...). 
> {code} > public static void main(String[] args) throws Exception { > final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > final TextInputFormat format = new TextInputFormat(new Path("/temp")); > format.setFilesFilter(new GlobFilePathFilter( > Collections.singletonList("**"), > Arrays.asList("**/another_file.bin", "**/dataFile1.txt") > )); > DataSet<String> result = env.readFile(format,"/tmp"); > result.writeAsText("/temp/out"); > env.execute("GlobFilePathFilter-Test"); > } > {code} > {code} > Exception in thread "main" org.apache.flink.optimizer.CompilerException: > Error translating node 'Data Source "at > readFile(ExecutionEnvironment.java:520) > (org.apache.flink.api.java.io.TextInputFormat)" : NONE [[ GlobalProperties > [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, > grouped=null, unique=null] ]]': Could not write the user code wrapper class > org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : > java.io.NotSerializableException: sun.nio.fs.UnixFileSystem$3 > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:381) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:106) > at > org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:192) > at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:188) > at > org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91) > at com.apsaltis.EventDetectionJob.main(EventDetectionJob.java:75) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: > Could not write the user code wrapper class > 
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : > java.io.NotSerializableException: sun.nio.fs.UnixFileSystem$3 > at > org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:281) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:888) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:281) > ... 8 more > Caused by: java.io.NotSerializableException: sun.nio.fs.UnixFileSystem$3 > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at java.util.ArrayList.writeObject(ArrayList.java:747) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:483) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at >
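The fix proposed in the issue, moving the Matcher instantiation into filterPath(...), works because only the glob pattern strings then need to be serialized; the non-serializable anonymous `PathMatcher` (the `sun.nio.fs.UnixFileSystem$3` in the stack trace) is rebuilt lazily on the worker. A standalone sketch of that pattern; this is not the actual Flink patch, and the class and method names below are illustrative:

```java
import java.io.Serializable;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class LazyGlobFilter implements Serializable {

    private final String pattern;          // plain String: serializes fine
    private transient PathMatcher matcher; // rebuilt on first use after deserialization

    LazyGlobFilter(String pattern) {
        this.pattern = pattern;
    }

    // Creating the matcher lazily inside the filter method keeps the
    // non-serializable anonymous PathMatcher out of the serialized state.
    boolean filterPath(Path path) {
        if (matcher == null) {
            matcher = FileSystems.getDefault().getPathMatcher("glob:" + pattern);
        }
        return matcher.matches(path);
    }

    public static void main(String[] args) {
        // The matcher is an anonymous inner class of the FileSystem implementation
        // and is not Serializable, which is exactly what the bug report shows:
        PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:**/dataFile1.txt");
        System.out.println(m instanceof Serializable); // false

        LazyGlobFilter filter = new LazyGlobFilter("**/dataFile1.txt");
        System.out.println(filter.filterPath(Paths.get("tmp/dataFile1.txt"))); // true
    }
}
```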
[GitHub] flink issue #3206: [FLINK-5612] Fix GlobPathFilter not-serializable exceptio...
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3206 Hi @StephanEwen, @zentol Thank you for your comments. I've updated the PR accordingly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-5647) Fix RocksDB Backend Cleanup
Aljoscha Krettek created FLINK-5647: --- Summary: Fix RocksDB Backend Cleanup Key: FLINK-5647 URL: https://issues.apache.org/jira/browse/FLINK-5647 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.1.4 Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek The RocksDB backend on Flink 1.1.x does not properly clean up the directories that it uses. This can lead to overflowing disks when a lot of failure/recovery cycles happen. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5630) Followups to AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838472#comment-15838472 ] ASF GitHub Bot commented on FLINK-5630: --- Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/3207 > Followups to AggregationFunction > > > Key: FLINK-5630 > URL: https://issues.apache.org/jira/browse/FLINK-5630 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Various followup issues to the aggregation function, like > - Allowing different input/output types for the cases where an additional > window apply function is specified > - Adding the {{aggregate()}} methods to the Scala API > - Adding the window translation tests -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5630) Followups to AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838471#comment-15838471 ] ASF GitHub Bot commented on FLINK-5630: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3207 Manually merged to `master` in 1542260d52238e87de4fa040e6079465777e8263 > Followups to AggregationFunction > > > Key: FLINK-5630 > URL: https://issues.apache.org/jira/browse/FLINK-5630 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Various followup issues to the aggregation function, like > - Allowing different input/output types for the cases where an additional > window apply function is specified > - Adding the {{aggregate()}} methods to the Scala API > - Adding the window translation tests -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3207: [FLINK-5630] [streaming api] Followups to the AggregateFu...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3207 Manually merged to `master` in 1542260d52238e87de4fa040e6079465777e8263 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/3207 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-5632) Typo in StreamGraph
[ https://issues.apache.org/jira/browse/FLINK-5632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5632. --- > Typo in StreamGraph > --- > > Key: FLINK-5632 > URL: https://issues.apache.org/jira/browse/FLINK-5632 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Trivial > Fix For: 1.3.0 > > > Typo in StreamGraph field : virtuaPartitionNodes -> virtualPartitionNodes -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-5630) Followups to AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5630. --- > Followups to AggregationFunction > > > Key: FLINK-5630 > URL: https://issues.apache.org/jira/browse/FLINK-5630 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Various followup issues to the aggregation function, like > - Allowing different input/output types for the cases where an additional > window apply function is specified > - Adding the {{aggregate()}} methods to the Scala API > - Adding the window translation tests -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-5630) Followups to AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5630. - Resolution: Fixed Fixed in 1542260d52238e87de4fa040e6079465777e8263 > Followups to AggregationFunction > > > Key: FLINK-5630 > URL: https://issues.apache.org/jira/browse/FLINK-5630 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Various followup issues to the aggregation function, like > - Allowing different input/output types for the cases where an additional > window apply function is specified > - Adding the {{aggregate()}} methods to the Scala API > - Adding the window translation tests -- This message was sent by Atlassian JIRA (v6.3.4#6332)
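The first followup item for FLINK-5630, allowing different input and output types, is the IN/ACC/OUT shape of the aggregation: values of one type accumulate into an intermediate accumulator and produce a result of another type. A standalone sketch using a stand-in interface that mirrors that shape (the `Agg` interface below is illustrative and defined locally; it is not Flink's actual `AggregateFunction` class):

```java
// Stand-in mirroring the IN/ACC/OUT shape of an aggregate function.
interface Agg<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

public class AverageAggregate implements Agg<Integer, long[], Double> {

    // Accumulator: [sum, count]. Input (Integer) and output (Double) differ,
    // which is the "different input/output types" case mentioned in the issue.
    public long[] createAccumulator() { return new long[] {0L, 0L}; }

    public long[] add(Integer value, long[] acc) {
        acc[0] += value;
        acc[1] += 1;
        return acc;
    }

    public Double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    // merge combines partial accumulators, e.g. from merging session windows.
    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    public static void main(String[] args) {
        AverageAggregate avg = new AverageAggregate();
        long[] acc = avg.createAccumulator();
        for (int v : new int[] {1, 2, 3, 4}) {
            acc = avg.add(v, acc);
        }
        System.out.println(avg.getResult(acc)); // 2.5
    }
}
```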
[jira] [Commented] (FLINK-5632) Typo in StreamGraph
[ https://issues.apache.org/jira/browse/FLINK-5632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838468#comment-15838468 ] ASF GitHub Bot commented on FLINK-5632: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3203 > Typo in StreamGraph > --- > > Key: FLINK-5632 > URL: https://issues.apache.org/jira/browse/FLINK-5632 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Trivial > Fix For: 1.3.0 > > > Typo in StreamGraph field : virtuaPartitionNodes -> virtualPartitionNodes
[jira] [Resolved] (FLINK-5632) Typo in StreamGraph
[ https://issues.apache.org/jira/browse/FLINK-5632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5632. - Resolution: Fixed Fix Version/s: 1.3.0 Fixed via 6f5c7d8a68e3a1fa88902d8d314f350563b76752 > Typo in StreamGraph > --- > > Key: FLINK-5632 > URL: https://issues.apache.org/jira/browse/FLINK-5632 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Trivial > Fix For: 1.3.0 > > > Typo in StreamGraph field : virtuaPartitionNodes -> virtualPartitionNodes
[GitHub] flink pull request #3203: [FLINK-5632] [streaming] Typo in StreamGraph
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3203 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4912) Introduce RECONCILING state in ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-4912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838452#comment-15838452 ] ASF GitHub Bot commented on FLINK-4912: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3113 Considering the possible state transitions: ## ExecutionState - `RECONCILING` can only be entered from `CREATED` Simple: - `RECONCILING` can go to `RUNNING` if the task was reconciled - `RECONCILING` can go to `FAILED` if the task was not reconciled Complex: - For `RECONCILING` to go to `FINISHED`, `CANCELED`, it would mean that the TaskManager that has the task would report (when registering at the JobManager) a task that is no longer executing. To do that, the TaskManager would need to "remember" tasks that completed and for which it did not get an acknowledgement from the JobManager for the execution state update. Is that anticipated? ## JobStatus - `RECONCILING` can only be entered from `CREATED` Simple: - `RECONCILING` can go to `RUNNING` - if all TaskManagers report their status and tasks as running - `RECONCILING` can go to `FAILING` - if not all tasks were reported. Complex: - For reconciling to go into `FINISHED`, we'd need the `ExecutionState` to be able to go to `FINISHED`. What do you think about only doing the "simple" option in the first version? > Introduce RECONCILING state in ExecutionGraph > - > > Key: FLINK-4912 > URL: https://issues.apache.org/jira/browse/FLINK-4912 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen >Assignee: Zhijiang Wang > > This is part of the non-disruptive JobManager failure recovery. > I suggest adding a JobStatus and ExecutionState {{RECONCILING}}. > If a job is started on that JobManager for master recovery (tbd how to > determine that) the {{ExecutionGraph}} and the {{Execution}}s start in the > reconciling state. 
> From {{RECONCILING}}, tasks can go to {{RUNNING}} (execution reconciled with > TaskManager) or to {{FAILED}}.
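The "simple" transition rules discussed above can be sketched as a small table-driven state machine. This is a hypothetical Python illustration, not Flink's actual Java `ExecutionState` enum; the state names come from the thread, the structure is an assumption:

```python
# Illustrative sketch of the "simple" RECONCILING transition rules from the
# discussion above. Not Flink's actual ExecutionState implementation.

ALLOWED_TRANSITIONS = {
    # RECONCILING can only be entered from CREATED
    "CREATED": {"RECONCILING", "SCHEDULED", "CANCELED"},
    # Simple option: reconciled tasks go to RUNNING, unreconciled ones to FAILED
    "RECONCILING": {"RUNNING", "FAILED"},
}

def can_transition(current: str, target: str) -> bool:
    """Return True if the state machine permits the transition current -> target."""
    return target in ALLOWED_TRANSITIONS.get(current, set())
```

Under these rules, the "complex" cases (`RECONCILING` to `FINISHED` or `CANCELED`) are simply absent from the table, which is exactly the restriction the comment proposes for the first version.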
[GitHub] flink issue #3113: [FLINK-4912] Introduce RECONCILIATING state in ExecutionG...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3113 Considering the possible state transitions: ## ExecutionState - `RECONCILING` can only be entered from `CREATED` Simple: - `RECONCILING` can go to `RUNNING` if the task was reconciled - `RECONCILING` can go to `FAILED` if the task was not reconciled Complex: - For `RECONCILING` to go to `FINISHED`, `CANCELED`, it would mean that the TaskManager that has the task would report (when registering at the JobManager) a task that is no longer executing. To do that, the TaskManager would need to "remember" tasks that completed and for which it did not get an acknowledgement from the JobManager for the execution state update. Is that anticipated? ## JobStatus - `RECONCILING` can only be entered from `CREATED` Simple: - `RECONCILING` can go to `RUNNING` - if all TaskManagers report their status and tasks as running - `RECONCILING` can go to `FAILING` - if not all tasks were reported. Complex: - For reconciling to go into `FINISHED`, we'd need the `ExecutionState` to be able to go to `FINISHED`. What do you think about only doing the "simple" option in the first version?
[jira] [Commented] (FLINK-5092) Add integration with Sonarqube and code coverage
[ https://issues.apache.org/jira/browse/FLINK-5092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838448#comment-15838448 ] ASF GitHub Bot commented on FLINK-5092: --- Github user BorisOsipov commented on a diff in the pull request: https://github.com/apache/flink/pull/2836#discussion_r97863068 --- Diff: flink-contrib/flink-storm-examples/pom.xml --- @@ -364,6 +363,18 @@ under the License. + + shade-flink --- End diff -- Maven's philosophy is 'one artifact per project'. These projects don't follow this way and produce several jar artifacts :) The shade plugin should produce flink-storm-streaming_2.10-1.2-SNAPSHOT-shaded.jar but it doesn't. This is a problem for the maven install plugin, which wants to have such a jar by default, and it leads to failures in the install phase without such configuration changes. For this reason I added such changes. So I propose not to collect test coverage for these projects (flink-storm and streaming examples) at all. What do you think? > Add integration with Sonarqube and code coverage > > > Key: FLINK-5092 > URL: https://issues.apache.org/jira/browse/FLINK-5092 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Boris Osipov >Assignee: Boris Osipov > > It would be good to have the opportunity to generate test coverage reports > for Flink and analyze the code with SonarQube. > Parts of the task: > - add generation of test coverage reports for Flink with a new maven profile > - implement integration with https://analysis.apache.org/
[GitHub] flink pull request #2836: [FLINK-5092] Add maven profile with code coverage ...
Github user BorisOsipov commented on a diff in the pull request: https://github.com/apache/flink/pull/2836#discussion_r97863068 --- Diff: flink-contrib/flink-storm-examples/pom.xml --- @@ -364,6 +363,18 @@ under the License. + + shade-flink --- End diff -- Maven's philosophy is 'one artifact per project'. These projects don't follow this way and produce several jar artifacts :) The shade plugin should produce flink-storm-streaming_2.10-1.2-SNAPSHOT-shaded.jar but it doesn't. This is a problem for the maven install plugin, which wants to have such a jar by default, and it leads to failures in the install phase without such configuration changes. For this reason I added such changes. So I propose not to collect test coverage for these projects (flink-storm and streaming examples) at all. What do you think?
[jira] [Created] (FLINK-5646) REST api documentation missing details on jar upload
Cliff Resnick created FLINK-5646: Summary: REST api documentation missing details on jar upload Key: FLINK-5646 URL: https://issues.apache.org/jira/browse/FLINK-5646 Project: Flink Issue Type: Bug Components: Documentation Reporter: Cliff Resnick Priority: Minor The 1.2 release documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/rest_api.html) states "It is possible to upload, run, and list Flink programs via the REST APIs and web frontend". However, there is no documentation about uploading a jar via the REST API. There should be something to the effect of: "You can upload a jar file using an HTTP POST with the file data sent under the form field 'jarfile'."
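The missing documentation boils down to a multipart form POST with the jar under the form field 'jarfile', as the issue text says. A minimal Python sketch of building such a request body; the field name comes from the issue, everything else (filename, content type, target endpoint) is an illustrative assumption:

```python
# Hypothetical sketch: build a multipart/form-data body that uploads a job
# jar under the form field 'jarfile', as the issue proposes documenting.
import uuid

def build_upload_request(jar_bytes: bytes, filename: str = "job.jar"):
    """Return (content_type, body) for an HTTP POST of the jar file."""
    boundary = uuid.uuid4().hex
    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="jarfile"; filename="{filename}"\r\n'
        "Content-Type: application/x-java-archive\r\n\r\n"
    ).encode()
    tail = f"\r\n--{boundary}--\r\n".encode()
    return f"multipart/form-data; boundary={boundary}", head + jar_bytes + tail
```

The resulting (content_type, body) pair can then be POSTed with any HTTP client (e.g. `urllib.request`) to the web monitor's jar upload endpoint, assumed here to be something like `http://<jobmanager>:8081/jars/upload`.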
[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838406#comment-15838406 ] ASF GitHub Bot commented on FLINK-3318: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2361 Hi @mushketyk ! Yes. In this case we would expect to have everything apart from the "end" event in the result, right? > Add support for quantifiers to CEP's pattern API > > > Key: FLINK-3318 > URL: https://issues.apache.org/jira/browse/FLINK-3318 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Ivan Mushketyk >Priority: Minor > > It would be a good addition to extend the pattern API to support quantifiers > known from regular expressions (e.g. Kleene star, ?, +, or count bounds). > This would considerably enrich the set of supported patterns. > Implementing the count bounds could be done by unrolling the pattern state. > In order to support the Kleene star operator, the {{NFACompiler}} has to be > extended to insert epsilon-transitions between a Kleene star state and the > succeeding pattern state. In order to support {{?}}, one could insert two > paths from the preceding state, one which accepts the event and another which > directly goes into the next pattern state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
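The issue description's idea of implementing count bounds "by unrolling the pattern state" can be illustrated with a toy example. This is a hypothetical Python sketch, not Flink CEP's NFACompiler; the `unroll`/`matches` names and the `name#i` state labels are invented for illustration:

```python
# Toy sketch of "unrolling" a count-bounded pattern state into a chain of
# states, as the issue description suggests. Purely illustrative.

def unroll(name: str, count: int) -> list:
    """Expand a pattern state with a count bound into `count` chained states."""
    return [f"{name}#{i}" for i in range(1, count + 1)]

def matches(events, symbol, count) -> bool:
    """Walk the unrolled chain: accept when `count` consecutive occurrences
    of `symbol` appear in `events`; any other event resets the chain."""
    chain = unroll(symbol, count)
    idx = 0
    for ev in events:
        if ev == symbol:
            idx += 1
            if idx == len(chain):
                return True
        else:
            idx = 0
    return False
```

Kleene star and `?` would need the extra epsilon-transition machinery the description mentions, which a simple unrolled chain like this does not capture.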
[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2361 Hi @mushketyk ! Yes. In this case we would expect to have everything apart from the "end" event in the result, right?
[GitHub] flink issue #3113: [FLINK-4912] Introduce RECONCILIATING state in ExecutionG...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3113 Will merge this. To make it properly robust, I will add some tests that validate the state transitions of the state machine...
[jira] [Commented] (FLINK-4912) Introduce RECONCILING state in ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-4912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838403#comment-15838403 ] ASF GitHub Bot commented on FLINK-4912: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3113 Will merge this. To make it properly robust, I will add some tests that validate the state transitions of the state machine... > Introduce RECONCILING state in ExecutionGraph > - > > Key: FLINK-4912 > URL: https://issues.apache.org/jira/browse/FLINK-4912 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen >Assignee: Zhijiang Wang > > This is part of the non-disruptive JobManager failure recovery. > I suggest adding a JobStatus and ExecutionState {{RECONCILING}}. > If a job is started on that JobManager for master recovery (tbd how to > determine that) the {{ExecutionGraph}} and the {{Execution}}s start in the > reconciling state. > From {{RECONCILING}}, tasks can go to {{RUNNING}} (execution reconciled with > TaskManager) or to {{FAILED}}.
[GitHub] flink pull request #:
Github user mindprince commented on the pull request: https://github.com/apache/flink/commit/6342d6db1de5f38a921732e35abd83e6c5b9305a#commitcomment-20615018 In flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java on line 130: I thought `allowedLateness` affected when we purged windows. Wouldn't this result in keeping processing time windows around for longer than we should?
[jira] [Commented] (FLINK-5637) Default Flink configuration contains whitespace characters, causing parser WARNings
[ https://issues.apache.org/jira/browse/FLINK-5637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838325#comment-15838325 ] ASF GitHub Bot commented on FLINK-5637: --- Github user rmetzger closed the pull request at: https://github.com/apache/flink/pull/3216 > Default Flink configuration contains whitespace characters, causing parser > WARNings > --- > > Key: FLINK-5637 > URL: https://issues.apache.org/jira/browse/FLINK-5637 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0, 1.3.0 >Reporter: Robert Metzger >Assignee: Robert Metzger > Labels: starter > Fix For: 1.2.0, 1.3.0 > > > {code} > 2017-01-25 09:45:30,670 WARN > org.apache.flink.configuration.GlobalConfiguration- Error while > trying to split key and value in configuration file > /yarn/nm/usercache/robert/appcache/application_1485249546281_0018/container_1485249546281_0018_01_01/flink-conf.yaml: > > {code} > The whitespace is currently in line 67: > {code} > #== > > # The address under which the web-based runtime monitor listens. > {code} > I think we should add a test to the {{GlobalConfigurationTest}} that ensures > the configuration file we are shipping doesn't produce any WARNings by > default.
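The behavior described in the issue - a whitespace-only line in flink-conf.yaml triggering a "split key and value" warning - can be sketched with a tolerant parser that skips such lines instead. This is a hypothetical Python illustration of the fixed behavior, not Flink's actual `GlobalConfiguration` code:

```python
# Hypothetical sketch of key/value splitting for a flink-conf.yaml-style
# file: blank and whitespace-only lines are skipped (the fix), and lines
# that still cannot be split produce a warning (the reported symptom).
# Not Flink's actual GlobalConfiguration implementation.

def parse_flink_conf(lines):
    config, warnings = {}, []
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip whitespace-only lines and comments, no warning
        key, sep, value = line.partition(":")
        if not sep or not key.strip() or not value.strip():
            warnings.append(f"Error while trying to split key and value: {raw!r}")
            continue
        config[key.strip()] = value.strip()
    return config, warnings
```

A test in the spirit of the proposed `GlobalConfigurationTest` addition would then assert that parsing the shipped default configuration yields an empty warning list.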
[GitHub] flink pull request #3216: [FLINK-5637] Avoid warning while parsing YAML conf...
Github user rmetzger closed the pull request at: https://github.com/apache/flink/pull/3216